Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BB57910C83 for ; Wed, 5 Jun 2013 17:19:54 +0000 (UTC) Received: (qmail 34114 invoked by uid 500); 5 Jun 2013 17:19:54 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 34082 invoked by uid 500); 5 Jun 2013 17:19:54 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 33817 invoked by uid 99); 5 Jun 2013 17:19:51 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Jun 2013 17:19:51 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Jun 2013 17:19:48 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id D5ABB23888E3; Wed, 5 Jun 2013 17:19:27 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1489969 [2/4] - in /accumulo/branches/ACCUMULO-118: core/src/main/java/org/apache/accumulo/core/util/ proxy/src/test/java/org/apache/accumulo/proxy/ server/src/main/java/org/apache/accumulo/server/ server/src/main/java/org/apache/accumulo/... Date: Wed, 05 Jun 2013 17:19:26 -0000 To: commits@accumulo.apache.org From: ecn@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130605171927.D5ABB23888E3@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1489969&r1=1489968&r2=1489969&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Wed Jun 5 17:19:25 2013 @@ -63,7 +63,6 @@ import org.apache.accumulo.core.data.thr import org.apache.accumulo.core.data.thrift.MapFileInfo; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; -import org.apache.accumulo.core.file.FileUtil; import org.apache.accumulo.core.iterators.IterationInterruptedException; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil; @@ -91,9 +90,11 @@ import org.apache.accumulo.core.util.Uti import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.client.HdfsZooInstance; -import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.constraints.ConstraintChecker; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.server.fs.FileSystem; +import org.apache.accumulo.server.fs.FileSystemImpl; import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.master.tableOps.CompactRange.CompactionIterators; import org.apache.accumulo.server.problems.ProblemReport; @@ -111,7 +112,7 @@ import org.apache.accumulo.server.tablet import org.apache.accumulo.server.tabletserver.log.MutationReceiver; import org.apache.accumulo.server.tabletserver.mastermessage.TabletStatusMessage; import org.apache.accumulo.server.tabletserver.metrics.TabletServerMinCMetrics; -import org.apache.accumulo.server.trace.TraceFileSystem; +import org.apache.accumulo.server.util.FileUtil; import org.apache.accumulo.server.util.MapCounter; import org.apache.accumulo.server.util.MetadataTable; import org.apache.accumulo.server.util.MetadataTable.LogEntry; @@ -124,9 +125,7 @@ import org.apache.commons.codec.DecoderE import org.apache.commons.codec.binary.Hex; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.Trash; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; import org.apache.zookeeper.KeeperException; @@ -474,10 +473,10 @@ public class Tablet { private static final long serialVersionUID = 1L; } - String getNextMapFilename(String prefix) throws IOException { + FileRef getNextMapFilename(String prefix) throws IOException { String extension = FileOperations.getNewFileExtension(tabletServer.getTableConfiguration(extent)); checkTabletDir(); - return location.toString() + "/" + prefix + UniqueNameAllocator.getInstance().getNextName() + "." + extension; + return new FileRef(location.toString() + "/" + prefix + UniqueNameAllocator.getInstance().getNextName() + "." + extension); } private void checkTabletDir() throws IOException { @@ -506,32 +505,25 @@ public class Tablet { } } - private static String rel2abs(String relPath, KeyExtent extent) { - if (relPath.startsWith("../")) - return ServerConstants.getTablesDir() + relPath.substring(2); - else - return ServerConstants.getTablesDir() + "/" + extent.getTableId() + relPath; - } - class DatafileManager { // access to datafilesizes needs to be synchronized: see CompactionRunner#getNumFiles - final private Map datafileSizes = Collections.synchronizedMap(new TreeMap()); + final private Map datafileSizes = Collections.synchronizedMap(new TreeMap()); - DatafileManager(SortedMap datafileSizes) { - for (Entry datafiles : datafileSizes.entrySet()) - this.datafileSizes.put(new Path(rel2abs(datafiles.getKey(), extent)), datafiles.getValue()); + DatafileManager(SortedMap datafileSizes) { + for (Entry datafiles : datafileSizes.entrySet()) + this.datafileSizes.put(datafiles.getKey(), datafiles.getValue()); } - Path mergingMinorCompactionFile = null; - Set filesToDeleteAfterScan = new HashSet(); - Map> scanFileReservations = new HashMap>(); - MapCounter fileScanReferenceCounts = new MapCounter(); + FileRef mergingMinorCompactionFile = null; + Set filesToDeleteAfterScan = new HashSet(); + Map> scanFileReservations = new HashMap>(); + MapCounter fileScanReferenceCounts = new MapCounter(); long nextScanReservationId = 0; boolean reservationsBlocked = false; - Set majorCompactingFiles = new HashSet(); + Set majorCompactingFiles = new HashSet(); - Pair> reserveFilesForScan() { + Pair> reserveFilesForScan() { synchronized (Tablet.this) { while (reservationsBlocked) { @@ -542,35 +534,35 @@ public class Tablet { } } - Set absFilePaths = new HashSet(datafileSizes.keySet()); + Set absFilePaths = new HashSet(datafileSizes.keySet()); long rid = nextScanReservationId++; scanFileReservations.put(rid, absFilePaths); - Map ret = new HashMap(); + Map ret = new HashMap(); - for (Path path : absFilePaths) { + for (FileRef path : absFilePaths) { fileScanReferenceCounts.increment(path, 1); - ret.put(path.toString(), datafileSizes.get(path)); + ret.put(path, datafileSizes.get(path)); } - return new Pair>(rid, ret); + return new Pair>(rid, ret); } } void returnFilesForScan(Long reservationId) { - final Set filesToDelete = new HashSet(); + final Set filesToDelete = new HashSet(); synchronized (Tablet.this) { - Set absFilePaths = scanFileReservations.remove(reservationId); + Set absFilePaths = scanFileReservations.remove(reservationId); if (absFilePaths == null) throw new IllegalArgumentException("Unknown scan reservation id " + reservationId); boolean notify = false; - for (Path path : absFilePaths) { + for (FileRef path : absFilePaths) { long refCount = fileScanReferenceCounts.decrement(path, 1); if (refCount == 0) { if (filesToDeleteAfterScan.remove(path)) @@ -585,28 +577,19 @@ public class Tablet { } if (filesToDelete.size() > 0) { - log.debug("Removing scan refs from metadata " + extent + " " + abs2rel(filesToDelete)); - MetadataTable.removeScanFiles(extent, abs2rel(filesToDelete), SecurityConstants.getSystemCredentials(), tabletServer.getLock()); + log.debug("Removing scan refs from metadata " + extent + " " + filesToDelete); + MetadataTable.removeScanFiles(extent, filesToDelete, SecurityConstants.getSystemCredentials(), tabletServer.getLock()); } } - private void removeFilesAfterScanRel(Set relPaths) { - Set scanFiles = new HashSet(); - - for (String rpath : relPaths) - scanFiles.add(new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId() + rpath)); - - removeFilesAfterScan(scanFiles); - } - - private void removeFilesAfterScan(Set scanFiles) { + private void removeFilesAfterScan(Set scanFiles) { if (scanFiles.size() == 0) return; - Set filesToDelete = new HashSet(); + Set filesToDelete = new HashSet(); synchronized (Tablet.this) { - for (Path path : scanFiles) { + for (FileRef path : scanFiles) { if (fileScanReferenceCounts.get(path) == 0) filesToDelete.add(path); else @@ -615,14 +598,14 @@ public class Tablet { } if (filesToDelete.size() > 0) { - log.debug("Removing scan refs from metadata " + extent + " " + abs2rel(filesToDelete)); - MetadataTable.removeScanFiles(extent, abs2rel(filesToDelete), SecurityConstants.getSystemCredentials(), tabletServer.getLock()); + log.debug("Removing scan refs from metadata " + extent + " " + filesToDelete); + MetadataTable.removeScanFiles(extent, filesToDelete, SecurityConstants.getSystemCredentials(), tabletServer.getLock()); } } - private TreeSet waitForScansToFinish(Set pathsToWaitFor, boolean blockNewScans, long maxWaitTime) { + private TreeSet waitForScansToFinish(Set pathsToWaitFor, boolean blockNewScans, long maxWaitTime) { long startTime = System.currentTimeMillis(); - TreeSet inUse = new TreeSet(); + TreeSet inUse = new TreeSet(); Span waitForScans = Trace.start("waitForScans"); synchronized (Tablet.this) { @@ -633,7 +616,7 @@ public class Tablet { reservationsBlocked = true; } - for (Path path : pathsToWaitFor) { + for (FileRef path : pathsToWaitFor) { while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis() - startTime < maxWaitTime) { try { Tablet.this.wait(100); @@ -643,7 +626,7 @@ public class Tablet { } } - for (Path path : pathsToWaitFor) { + for (FileRef path : pathsToWaitFor) { if (fileScanReferenceCounts.get(path) > 0) inUse.add(path); } @@ -658,23 +641,24 @@ public class Tablet { return inUse; } - public void importMapFiles(long tid, Map pathsString, boolean setTime) throws IOException { + public void importMapFiles(long tid, Map pathsString, boolean setTime) throws IOException { String bulkDir = null; - Map paths = new HashMap(); - for (Entry entry : pathsString.entrySet()) - paths.put(new Path(entry.getKey()), entry.getValue()); - - for (Path tpath : paths.keySet()) { - - if (!tpath.getParent().getParent().equals(new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId()))) { - throw new IOException("Map file " + tpath + " not in table dir " + ServerConstants.getTablesDir() + "/" + extent.getTableId()); - } + Map paths = new HashMap(); + for (Entry entry : pathsString.entrySet()) + paths.put(entry.getKey(), entry.getValue()); + + for (FileRef tpath : paths.keySet()) { + + // TODO ACCUMULO-118 +// if (!tpath.getParent().getParent().equals(new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId()))) { +// throw new IOException("Map file " + tpath + " not in table dir " + ServerConstants.getTablesDir() + "/" + extent.getTableId()); +// } if (bulkDir == null) - bulkDir = tpath.getParent().toString(); - else if (!bulkDir.equals(tpath.getParent().toString())) + bulkDir = tpath.path().getParent().toString(); + else if (!bulkDir.equals(tpath.path().getParent().toString())) throw new IllegalArgumentException("bulk files in different dirs " + bulkDir + " " + tpath); } @@ -692,9 +676,10 @@ public class Tablet { throw new IOException(ex); } // Remove any bulk files we've previously loaded and compacted away - List files = MetadataTable.getBulkFilesLoaded(conn, extent, tid); - for (String file : files) - if (paths.keySet().remove(new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId() + file))) + List files = MetadataTable.getBulkFilesLoaded(conn, extent, tid); + + for (FileRef file : files) + if (paths.keySet().remove(file.path())) log.debug("Ignoring request to re-import a file already imported: " + extent + ": " + file); if (paths.size() > 0) { @@ -713,13 +698,13 @@ public class Tablet { if (bulkTime > persistedTime) persistedTime = bulkTime; - MetadataTable.updateTabletDataFile(tid, extent, abs2rel(paths), tabletTime.getMetadataValue(persistedTime), auths, tabletServer.getLock()); + MetadataTable.updateTabletDataFile(tid, extent, paths, tabletTime.getMetadataValue(persistedTime), auths, tabletServer.getLock()); } } } synchronized (Tablet.this) { - for (Entry tpath : paths.entrySet()) { + for (Entry tpath : paths.entrySet()) { if (datafileSizes.containsKey(tpath.getKey())) { log.error("Adding file that is already in set " + tpath.getKey()); } @@ -732,12 +717,12 @@ public class Tablet { computeNumEntries(); } - for (Path tpath : paths.keySet()) { - log.log(TLevel.TABLET_HIST, extent + " import " + abs2rel(tpath) + " " + paths.get(tpath)); + for (FileRef tpath : paths.keySet()) { + log.log(TLevel.TABLET_HIST, extent + " import " + tpath + " " + paths.get(tpath)); } } - String reserveMergingMinorCompactionFile() { + FileRef reserveMergingMinorCompactionFile() { if (mergingMinorCompactionFile != null) throw new IllegalStateException("Tried to reserve merging minor compaction file when already reserved : " + mergingMinorCompactionFile); @@ -759,9 +744,9 @@ public class Tablet { // find the smallest file long min = Long.MAX_VALUE; - Path minName = null; + FileRef minName = null; - for (Entry entry : datafileSizes.entrySet()) { + for (Entry entry : datafileSizes.entrySet()) { if (entry.getValue().getSize() < min && !majorCompactingFiles.contains(entry.getKey())) { min = entry.getValue().getSize(); minName = entry.getKey(); @@ -772,13 +757,13 @@ public class Tablet { return null; mergingMinorCompactionFile = minName; - return minName.toString(); + return minName; } return null; } - void unreserveMergingMinorCompactionFile(Path file) { + void unreserveMergingMinorCompactionFile(FileRef file) { if ((file == null && mergingMinorCompactionFile != null) || (file != null && mergingMinorCompactionFile == null) || (file != null && mergingMinorCompactionFile != null && !file.equals(mergingMinorCompactionFile))) throw new IllegalStateException("Disagreement " + file + " " + mergingMinorCompactionFile); @@ -786,12 +771,7 @@ public class Tablet { mergingMinorCompactionFile = null; } - void bringMinorCompactionOnline(String tmpDatafile, String newDatafile, String absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId) { - bringMinorCompactionOnline(new Path(tmpDatafile), new Path(newDatafile), absMergeFile == null ? null : new Path(absMergeFile), dfv, commitSession, - flushId); - } - - void bringMinorCompactionOnline(Path tmpDatafile, Path newDatafile, Path absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId) { + void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId) { IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance(); if (extent.isRootTablet()) { @@ -809,20 +789,20 @@ public class Tablet { do { try { if (dfv.getNumEntries() == 0) { - fs.delete(tmpDatafile, true); + fs.deleteRecursively(tmpDatafile.path()); } else { - if (fs.exists(newDatafile)) { + if (fs.exists(newDatafile.path())) { log.warn("Target map file already exist " + newDatafile); - fs.delete(newDatafile, true); + fs.deleteRecursively(newDatafile.path()); } - if (!fs.rename(tmpDatafile, newDatafile)) { + if (!fs.rename(tmpDatafile.path(), newDatafile.path())) { throw new IOException("rename fails"); } } break; } catch (IOException ioe) { - log.warn("Tablet " + extent + " failed to rename " + abs2rel(newDatafile) + " after MinC, will retry in 60 secs...", ioe); + log.warn("Tablet " + extent + " failed to rename " + newDatafile + " after MinC, will retry in 60 secs...", ioe); UtilWaitThread.sleep(60 * 1000); } } while (true); @@ -838,14 +818,14 @@ public class Tablet { // here, but that was incorrect because a scan could start after waiting but before // memory was updated... assuming the file is always in use by scans leads to // one uneeded metadata update when it was not actually in use - Set filesInUseByScans = Collections.emptySet(); + Set filesInUseByScans = Collections.emptySet(); if (absMergeFile != null) filesInUseByScans = Collections.singleton(absMergeFile); // very important to write delete entries outside of log lock, because // this !METADATA write does not go up... it goes sideways or to itself if (absMergeFile != null) - MetadataTable.addDeleteEntries(extent, Collections.singleton(abs2rel(absMergeFile)), SecurityConstants.getSystemCredentials()); + MetadataTable.addDeleteEntries(extent, Collections.singleton(absMergeFile), SecurityConstants.getSystemCredentials()); Set unusedWalLogs = beginClearingUnusedLogs(); try { @@ -860,7 +840,7 @@ public class Tablet { persistedTime = commitSession.getMaxCommittedTime(); String time = tabletTime.getMetadataValue(persistedTime); - MetadataTable.updateTabletDataFile(extent, abs2rel(newDatafile), abs2rel(absMergeFile), dfv, time, creds, abs2rel(filesInUseByScans), + MetadataTable.updateTabletDataFile(extent, newDatafile, absMergeFile, dfv, time, creds, filesInUseByScans, tabletServer.getClientAddressString(), tabletServer.getLock(), unusedWalLogs, lastLocation, flushId); } @@ -912,9 +892,9 @@ public class Tablet { removeFilesAfterScan(filesInUseByScans); if (absMergeFile != null) - log.log(TLevel.TABLET_HIST, extent + " MinC [" + abs2rel(absMergeFile) + ",memory] -> " + abs2rel(newDatafile)); + log.log(TLevel.TABLET_HIST, extent + " MinC [" + absMergeFile + ",memory] -> " + newDatafile); else - log.log(TLevel.TABLET_HIST, extent + " MinC [memory] -> " + abs2rel(newDatafile)); + log.log(TLevel.TABLET_HIST, extent + " MinC [memory] -> " + newDatafile); log.debug(String.format("MinC finish lock %.2f secs %s", (t2 - t1) / 1000.0, getExtent().toString())); if (dfv.getSize() > acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD)) { log.debug(String.format("Minor Compaction wrote out file larger than split threshold. split threshold = %,d file size = %,d", @@ -923,77 +903,37 @@ public class Tablet { } - private Map abs2rel(Map paths) { - TreeMap relMap = new TreeMap(); - - for (Entry entry : paths.entrySet()) - relMap.put(abs2rel(entry.getKey()), entry.getValue()); - - return relMap; - } - - private Set abs2rel(Set absPaths) { - Set relativePaths = new TreeSet(); - for (Path absPath : absPaths) - relativePaths.add(abs2rel(absPath)); - - return relativePaths; - } - - private Set string2path(Set strings) { - Set paths = new HashSet(); - for (String path : strings) - paths.add(new Path(path)); - - return paths; - } - - private String abs2rel(Path absPath) { - if (absPath == null) - return null; - - if (absPath.getParent().getParent().getName().equals(extent.getTableId().toString())) - return "/" + absPath.getParent().getName() + "/" + absPath.getName(); - else - return "../" + absPath.getParent().getParent().getName() + "/" + absPath.getParent().getName() + "/" + absPath.getName(); - } - - public void reserveMajorCompactingFiles(Set files) { + public void reserveMajorCompactingFiles(Set files) { if (majorCompactingFiles.size() != 0) throw new IllegalStateException("Major compacting files not empty " + majorCompactingFiles); - Set mcf = string2path(files); - if (mergingMinorCompactionFile != null && mcf.contains(mergingMinorCompactionFile)) + if (mergingMinorCompactionFile != null && files.contains(mergingMinorCompactionFile)) throw new IllegalStateException("Major compaction tried to resrve file in use by minor compaction " + mergingMinorCompactionFile); - majorCompactingFiles.addAll(mcf); + majorCompactingFiles.addAll(files); } public void clearMajorCompactingFile() { majorCompactingFiles.clear(); } - void bringMajorCompactionOnline(Set oldDatafiles, String tmpDatafile, String newDatafile, Long compactionId, DataFileValue dfv) throws IOException { - bringMajorCompactionOnline(string2path(oldDatafiles), new Path(tmpDatafile), new Path(newDatafile), compactionId, dfv); - } - - void bringMajorCompactionOnline(Set oldDatafiles, Path tmpDatafile, Path newDatafile, Long compactionId, DataFileValue dfv) throws IOException { + void bringMajorCompactionOnline(Set oldDatafiles, FileRef tmpDatafile, FileRef newDatafile, Long compactionId, DataFileValue dfv) throws IOException { long t1, t2; if (!extent.isRootTablet()) { - if (fs.exists(newDatafile)) { + if (fs.exists(newDatafile.path())) { log.error("Target map file already exist " + newDatafile, new Exception()); throw new IllegalStateException("Target map file already exist " + newDatafile); } // rename before putting in metadata table, so files in metadata table should // always exist - if (!fs.rename(tmpDatafile, newDatafile)) + if (!fs.rename(tmpDatafile.path(), newDatafile.path())) log.warn("Rename of " + tmpDatafile + " to " + newDatafile + " returned false"); if (dfv.getNumEntries() == 0) { - fs.delete(newDatafile, true); + fs.deleteRecursively(newDatafile.path()); } } @@ -1023,32 +963,33 @@ public class Tablet { // rename the compacted map file, in case // the system goes down - String compactName = newDatafile.getName(); + String compactName = newDatafile.path().getName(); - for (Path path : oldDatafiles) { + for (FileRef ref : oldDatafiles) { + Path path = ref.path(); fs.rename(path, new Path(location + "/delete+" + compactName + "+" + path.getName())); } - if (fs.exists(newDatafile)) { + if (fs.exists(newDatafile.path())) { log.error("Target map file already exist " + newDatafile, new Exception()); throw new IllegalStateException("Target map file already exist " + newDatafile); } - if (!fs.rename(tmpDatafile, newDatafile)) + if (!fs.rename(tmpDatafile.path(), newDatafile.path())) log.warn("Rename of " + tmpDatafile + " to " + newDatafile + " returned false"); // start deleting files, if we do not finish they will be cleaned // up later - Trash trash = new Trash(fs, fs.getConf()); - for (Path path : oldDatafiles) { + for (FileRef ref : oldDatafiles) { + Path path = ref.path(); Path deleteFile = new Path(location + "/delete+" + compactName + "+" + path.getName()); - if (!trash.moveToTrash(deleteFile)) - fs.delete(deleteFile, true); + if (!fs.moveToTrash(deleteFile)) + fs.deleteRecursively(deleteFile); } } // atomically remove old files and add new file - for (Path oldDatafile : oldDatafiles) { + for (FileRef oldDatafile : oldDatafiles) { if (!datafileSizes.containsKey(oldDatafile)) { log.error("file does not exist in set " + oldDatafile); } @@ -1079,50 +1020,28 @@ public class Tablet { } if (!extent.isRootTablet()) { - Set filesInUseByScans = waitForScansToFinish(oldDatafiles, false, 10000); + Set filesInUseByScans = waitForScansToFinish(oldDatafiles, false, 10000); if (filesInUseByScans.size() > 0) - log.debug("Adding scan refs to metadata " + extent + " " + abs2rel(filesInUseByScans)); - MetadataTable.replaceDatafiles(extent, abs2rel(oldDatafiles), abs2rel(filesInUseByScans), abs2rel(newDatafile), compactionId, dfv, + log.debug("Adding scan refs to metadata " + extent + " " + filesInUseByScans); + MetadataTable.replaceDatafiles(extent, oldDatafiles, filesInUseByScans, newDatafile, compactionId, dfv, SecurityConstants.getSystemCredentials(), tabletServer.getClientAddressString(), lastLocation, tabletServer.getLock()); removeFilesAfterScan(filesInUseByScans); } log.debug(String.format("MajC finish lock %.2f secs", (t2 - t1) / 1000.0)); - log.log(TLevel.TABLET_HIST, extent + " MajC " + abs2rel(oldDatafiles) + " --> " + abs2rel(newDatafile)); + log.log(TLevel.TABLET_HIST, extent + " MajC " + oldDatafiles + " --> " + newDatafile); } - public SortedMap getDatafileSizesRel() { + public SortedMap getDatafileSizes() { synchronized (Tablet.this) { - TreeMap files = new TreeMap(); - Set> es = datafileSizes.entrySet(); - - for (Entry entry : es) { - files.put(abs2rel(entry.getKey()), entry.getValue()); - } - - return Collections.unmodifiableSortedMap(files); - } - } - - public SortedMap getDatafileSizes() { - synchronized (Tablet.this) { - TreeMap files = new TreeMap(); - Set> es = datafileSizes.entrySet(); - - for (Entry entry : es) { - files.put(entry.getKey().toString(), entry.getValue()); - } - - return Collections.unmodifiableSortedMap(files); + TreeMap copy = new TreeMap(datafileSizes); + return Collections.unmodifiableSortedMap(copy); } } - public Set getFiles() { + public Set getFiles() { synchronized (Tablet.this) { - HashSet files = new HashSet(); - for (Path path : datafileSizes.keySet()) { - files.add(path.toString()); - } + HashSet files = new HashSet(datafileSizes.keySet()); return Collections.unmodifiableSet(files); } } @@ -1135,7 +1054,7 @@ public class Tablet { splitCreationTime = 0; } - public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap datafiles, String time, + public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap datafiles, String time, long initFlushID, long initCompactID) throws IOException { this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), datafiles, time, initFlushID, initCompactID); splitCreationTime = System.currentTimeMillis(); @@ -1143,16 +1062,16 @@ public class Tablet { private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, SortedMap tabletsKeyValues) throws IOException { - this(tabletServer, location, extent, trm, conf, TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration())), + this(tabletServer, location, extent, trm, conf, FileSystemImpl.get(), tabletsKeyValues); } static private final List EMPTY = Collections.emptyList(); private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, - SortedMap datafiles, String time, long initFlushID, long initCompactID) throws IOException { - this(tabletServer, location, extent, trm, conf, TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration())), EMPTY, - datafiles, time, null, new HashSet(), initFlushID, initCompactID); + SortedMap datafiles, String time, long initFlushID, long initCompactID) throws IOException { + this(tabletServer, location, extent, trm, conf, FileSystemImpl.get(), EMPTY, + datafiles, time, null, new HashSet(), initFlushID, initCompactID); } private static String lookupTime(AccumuloConfiguration conf, KeyExtent extent, SortedMap tabletsKeyValues) { @@ -1177,29 +1096,26 @@ public class Tablet { return null; } - private static SortedMap lookupDatafiles(AccumuloConfiguration conf, Text locText, FileSystem fs, KeyExtent extent, + private static SortedMap lookupDatafiles(AccumuloConfiguration conf, Text locText, FileSystem fs, KeyExtent extent, SortedMap tabletsKeyValues) throws IOException { - Path location = new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId().toString() + locText.toString()); - TreeMap datafiles = new TreeMap(); + TreeMap datafiles = new TreeMap(); if (extent.isRootTablet()) { // the meta0 tablet + Path location = new Path(ServerConstants.getRootTabletDir()); + location = location.makeQualified(fs.getDefaultNamespace()); // cleanUpFiles() has special handling for delete. files FileStatus[] files = fs.listStatus(location); - Path[] paths = new Path[files.length]; - for (int i = 0; i < files.length; i++) { - paths[i] = files[i].getPath(); - } - Collection goodPaths = cleanUpFiles(fs, files, location, true); - for (String path : goodPaths) { - String filename = new Path(path).getName(); + Collection goodPaths = cleanUpFiles(fs, files, true); + for (String good : goodPaths) { + Path path = new Path(good); + String filename = path.getName(); + FileRef ref = new FileRef(location.toString() + "/" + filename, path); DataFileValue dfv = new DataFileValue(0, 0); - datafiles.put(locText.toString() + "/" + filename, dfv); + datafiles.put(ref, dfv); } } else { - SortedMap datafilesMetadata; - Text rowName = extent.getMetadataEntry(); ScannerImpl mdScanner = new ScannerImpl(HdfsZooInstance.getInstance(), SecurityConstants.getSystemCredentials(), Constants.METADATA_TABLE_ID, @@ -1215,23 +1131,14 @@ public class Tablet { mdScanner.setRange(new Range(rowName)); - datafilesMetadata = new TreeMap(); - for (Entry entry : mdScanner) { if (entry.getKey().compareRow(rowName) != 0) { break; } - datafilesMetadata.put(new Key(entry.getKey()), new Value(entry.getValue())); - } - - Iterator> dfmdIter = datafilesMetadata.entrySet().iterator(); - - while (dfmdIter.hasNext()) { - Entry entry = dfmdIter.next(); - - datafiles.put(entry.getKey().getColumnQualifier().toString(), new DataFileValue(entry.getValue().get())); + FileRef ref = new FileRef(entry.getKey().getColumnQualifier().toString(), new Path(fs.getFullPath(entry.getKey()))); + datafiles.put(ref, new DataFileValue(entry.getValue().get())); } } return datafiles; @@ -1263,14 +1170,16 @@ public class Tablet { return logEntries; } - private static Set lookupScanFiles(KeyExtent extent, SortedMap tabletsKeyValues) { - HashSet scanFiles = new HashSet(); + private static Set lookupScanFiles(KeyExtent extent, SortedMap tabletsKeyValues, FileSystem fs) throws IOException { + HashSet scanFiles = new HashSet(); Text row = extent.getMetadataEntry(); for (Entry entry : tabletsKeyValues.entrySet()) { Key key = entry.getKey(); if (key.getRow().equals(row) && key.getColumnFamily().equals(Constants.METADATA_SCANFILE_COLUMN_FAMILY)) { - scanFiles.add(key.getColumnQualifier().toString()); + String meta = key.getColumnQualifier().toString(); + String path = fs.getFullPath(ServerConstants.getTablesDirs(), meta); + scanFiles.add(new FileRef(meta, new Path(path))); } } @@ -1303,7 +1212,7 @@ public class Tablet { SortedMap tabletsKeyValues) throws IOException { this(tabletServer, location, extent, trm, conf, fs, lookupLogEntries(extent, tabletsKeyValues), lookupDatafiles(tabletServer.getSystemConfiguration(), location, fs, extent, tabletsKeyValues), lookupTime(tabletServer.getSystemConfiguration(), extent, tabletsKeyValues), lookupLastServer(extent, - tabletsKeyValues), lookupScanFiles(extent, tabletsKeyValues), lookupFlushID(extent, tabletsKeyValues), lookupCompactID(extent, tabletsKeyValues)); + tabletsKeyValues), lookupScanFiles(extent, tabletsKeyValues, fs), lookupFlushID(extent, tabletsKeyValues), lookupCompactID(extent, tabletsKeyValues)); } private static TServerInstance lookupLastServer(KeyExtent extent, SortedMap tabletsKeyValues) { @@ -1319,9 +1228,14 @@ public class Tablet { * yet another constructor - this one allows us to avoid costly lookups into the Metadata table if we already know the files we need - as at split time */ private Tablet(final TabletServer tabletServer, final Text location, final KeyExtent extent, final TabletResourceManager trm, final Configuration conf, - final FileSystem fs, final List logEntries, final SortedMap datafiles, String time, final TServerInstance lastLocation, - Set scanFiles, long initFlushID, long initCompactID) throws IOException { - this.location = new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId().toString() + location.toString()); + final FileSystem fs, final List logEntries, final SortedMap datafiles, String time, final TServerInstance lastLocation, + Set scanFiles, long initFlushID, long initCompactID) throws IOException { + if (location.find(":") >= 0) { + this.location = new Path(location.toString()); + } else { + this.location = new Path(ServerConstants.getTablesDirs()[0] + "/" + extent.getTableId().toString() + location.toString()); + } + this.location = this.location.makeQualified(fs.getFileSystemByPath(this.location)); this.lastLocation = lastLocation; this.tabletDirectory = location.toString(); this.conf = conf; @@ -1337,10 +1251,10 @@ public class Tablet { if (extent.isRootTablet()) { long rtime = Long.MIN_VALUE; - for (String path : datafiles.keySet()) { - String filename = new Path(path).getName(); - - FileSKVIterator reader = FileOperations.getInstance().openReader(this.location + "/" + filename, true, fs, fs.getConf(), + for (FileRef ref : datafiles.keySet()) { + Path path = ref.path(); + org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(path); + FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), tabletServer.getTableConfiguration(extent)); long maxTime = -1; try { @@ -1419,8 +1333,8 @@ public class Tablet { count[1] = Long.MIN_VALUE; try { Set absPaths = new HashSet(); - for (String relPath : datafiles.keySet()) - absPaths.add(rel2abs(relPath, extent)); + for (FileRef ref : datafiles.keySet()) + absPaths.add(ref.path().toString()); tabletServer.recover(this.tabletServer.getFileSystem(), this, logEntries, absPaths, new MutationReceiver() { public void receive(Mutation m) { @@ -1483,7 +1397,7 @@ public class Tablet { computeNumEntries(); - datafileManager.removeFilesAfterScanRel(scanFiles); + datafileManager.removeFilesAfterScan(scanFiles); log.log(TLevel.TABLET_HIST, extent + " opened "); } @@ -1502,7 +1416,7 @@ public class Tablet { } } - private static Collection cleanUpFiles(FileSystem fs, FileStatus[] files, Path location, boolean deleteTmp) throws IOException { + private static Collection cleanUpFiles(FileSystem fs, FileStatus[] files, boolean deleteTmp) throws IOException { /* * called in constructor and before major compactions */ @@ -1516,10 +1430,10 @@ public class Tablet { // check for incomplete major compaction, this should only occur // for root tablet if (filename.startsWith("delete+")) { - String expectedCompactedFile = location.toString() + "/" + filename.split("\\+")[1]; + String expectedCompactedFile = path.substring(0, path.lastIndexOf("/delete+")) + "/" + filename.split("\\+")[1]; if (fs.exists(new Path(expectedCompactedFile))) { // compaction finished, but did not finish deleting compacted files.. so delete it - if (!fs.delete(file.getPath(), true)) + if (!fs.deleteRecursively(file.getPath())) log.warn("Delete of file: " + file.getPath().toString() + " return false"); continue; } @@ -1527,7 +1441,7 @@ public class Tablet { // reset path and filename for rest of loop filename = filename.split("\\+", 3)[2]; - path = location + "/" + filename; + path = path.substring(0, path.lastIndexOf("/delete+")) + "/" + filename; if (!fs.rename(file.getPath(), new Path(path))) log.warn("Rename of " + file.getPath().toString() + " to " + path + " returned false"); @@ -1536,7 +1450,7 @@ public class Tablet { if (filename.endsWith("_tmp")) { if (deleteTmp) { log.warn("cleaning up old tmp file: " + path); - if (!fs.delete(file.getPath(), true)) + if (!fs.deleteRecursively(file.getPath())) log.warn("Delete of tmp file: " + file.getPath().toString() + " return false"); } @@ -2025,7 +1939,7 @@ public class Tablet { private SortedKeyValueIterator createIterator() throws IOException { - Map files; + Map files; synchronized (Tablet.this) { @@ -2052,7 +1966,7 @@ public class Tablet { expectedDeletionCount = dataSourceDeletions.get(); memIters = tabletMemory.getIterators(); - Pair> reservation = datafileManager.reserveFilesForScan(); + Pair> reservation = datafileManager.reserveFilesForScan(); fileReservationId = reservation.getFirst(); files = reservation.getSecond(); } @@ -2122,7 +2036,7 @@ public class Tablet { } - private DataFileValue minorCompact(Configuration conf, FileSystem fs, InMemoryMap memTable, String tmpDatafile, String newDatafile, String mergeFile, + private DataFileValue minorCompact(Configuration conf, FileSystem fs, InMemoryMap memTable, FileRef tmpDatafile, FileRef newDatafile, FileRef mergeFile, boolean hasQueueTime, long queued, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) { boolean failed = false; long start = System.currentTimeMillis(); @@ -2180,11 +2094,11 @@ public class Tablet { private long queued; private CommitSession commitSession; private DataFileValue stats; - private String mergeFile; + private FileRef mergeFile; private long flushId; private MinorCompactionReason mincReason; - MinorCompactionTask(String mergeFile, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) { + MinorCompactionTask(FileRef mergeFile, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) { queued = System.currentTimeMillis(); minorCompactionWaitingToStart = true; this.commitSession = commitSession; @@ -2198,7 +2112,8 @@ public class Tablet { minorCompactionInProgress = true; Span minorCompaction = Trace.on("minorCompaction"); try { - String newMapfileLocation = getNextMapFilename(mergeFile == null ? "F" : "M"); + FileRef newMapfileLocation = getNextMapFilename(mergeFile == null ? "F" : "M"); + FileRef tmpFileRef = new FileRef(newMapfileLocation.path() + "_tmp"); Span span = Trace.start("waitForCommits"); synchronized (Tablet.this) { commitSession.waitForCommitsToFinish(); @@ -2212,7 +2127,7 @@ public class Tablet { // writing the minor compaction finish event, then the start event+filename in metadata table will // prevent recovery of duplicate data... the minor compaction start event could be written at any time // before the metadata write for the minor compaction - tabletServer.minorCompactionStarted(commitSession, commitSession.getWALogSeq() + 1, newMapfileLocation); + tabletServer.minorCompactionStarted(commitSession, commitSession.getWALogSeq() + 1, newMapfileLocation.path().toString()); break; } catch (IOException e) { log.warn("Failed to write to write ahead log " + e.getMessage(), e); @@ -2220,7 +2135,7 @@ public class Tablet { } span.stop(); span = Trace.start("compact"); - this.stats = minorCompact(conf, fs, tabletMemory.getMinCMemTable(), newMapfileLocation + "_tmp", newMapfileLocation, mergeFile, true, queued, + this.stats = minorCompact(conf, fs, tabletMemory.getMinCMemTable(), tmpFileRef, newMapfileLocation, mergeFile, true, queued, commitSession, flushId, mincReason); span.stop(); @@ -2247,7 +2162,7 @@ public class Tablet { otherLogs = currentLogs; currentLogs = new HashSet(); - String mergeFile = datafileManager.reserveMergingMinorCompactionFile(); + FileRef mergeFile = datafileManager.reserveMergingMinorCompactionFile(); return new MinorCompactionTask(mergeFile, oldCommitSession, flushId, mincReason); @@ -2797,7 +2712,7 @@ public class Tablet { } try { - Pair,SortedMap> fileLog = MetadataTable.getFileAndLogEntries(SecurityConstants.getSystemCredentials(), extent); + Pair,SortedMap> fileLog = MetadataTable.getFileAndLogEntries(SecurityConstants.getSystemCredentials(), extent); if (fileLog.getFirst().size() != 0) { String msg = "Closed tablet " + extent + " has walog entries in !METADATA " + fileLog.getFirst(); @@ -2806,16 +2721,16 @@ public class Tablet { } if (extent.isRootTablet()) { - if (!fileLog.getSecond().keySet().equals(datafileManager.getDatafileSizesRel().keySet())) { + if (!fileLog.getSecond().keySet().equals(datafileManager.getDatafileSizes().keySet())) { String msg = "Data file in !METADATA differ from in memory data " + extent + " " + fileLog.getSecond().keySet() + " " - + datafileManager.getDatafileSizesRel().keySet(); + + datafileManager.getDatafileSizes().keySet(); log.error(msg); throw new RuntimeException(msg); } } else { - if (!fileLog.getSecond().equals(datafileManager.getDatafileSizesRel())) { + if (!fileLog.getSecond().equals(datafileManager.getDatafileSizes())) { String msg = "Data file in !METADATA differ from in memory data " + extent + " " + fileLog.getSecond() + " " - + datafileManager.getDatafileSizesRel(); + + datafileManager.getDatafileSizes(); log.error(msg); throw new RuntimeException(msg); } @@ -2944,15 +2859,15 @@ public class Tablet { } private class CompactionTuple { - private Map filesToCompact; + private Map filesToCompact; private boolean compactAll; - public CompactionTuple(Map filesToCompact, boolean doAll) { + public CompactionTuple(Map filesToCompact, boolean doAll) { this.filesToCompact = filesToCompact; compactAll = doAll; } - public Map getFilesToCompact() { + public Map getFilesToCompact() { return filesToCompact; } @@ -2965,10 +2880,10 @@ public class Tablet { * Returns list of files that need to be compacted by major compactor */ - private CompactionTuple getFilesToCompact(MajorCompactionReason reason, Map> falks) { - SortedMap files = datafileManager.getDatafileSizes(); + private CompactionTuple getFilesToCompact(MajorCompactionReason reason, Map> falks) { + SortedMap files = datafileManager.getDatafileSizes(); - Map toCompact; + Map toCompact; if (reason == MajorCompactionReason.CHOP) { toCompact = findChopFiles(files, falks); } else { @@ -2979,14 +2894,15 @@ public class Tablet { return new CompactionTuple(toCompact, toCompact.size() == files.size()); } - private Map> getFirstAndLastKeys(SortedMap files) throws IOException { + private Map> getFirstAndLastKeys(SortedMap files) throws IOException { FileOperations fileFactory = FileOperations.getInstance(); - Map> falks = new HashMap>(); + Map> falks = new HashMap>(); - for (Entry entry : files.entrySet()) { - String file = entry.getKey(); - FileSKVIterator openReader = fileFactory.openReader(file, true, fs, conf, acuTableConf); + for (Entry entry : files.entrySet()) { + FileRef file = entry.getKey(); + org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(file.path()); + FileSKVIterator openReader = fileFactory.openReader(file.path().toString(), true, ns, ns.getConf(), acuTableConf); try { Key first = openReader.getFirstKey(); Key last = openReader.getLastKey(); @@ -2998,12 +2914,12 @@ public class Tablet { return falks; } - private Map findChopFiles(SortedMap files, Map> falks) { + private Map findChopFiles(SortedMap files, Map> falks) { - Map result = new HashMap(); + Map result = new HashMap(); - for (Entry entry : files.entrySet()) { - String file = entry.getKey(); + for (Entry entry : files.entrySet()) { + FileRef file = entry.getKey(); Pair pair = falks.get(file); if (pair == null) { @@ -3056,14 +2972,14 @@ public class Tablet { } } - private SplitRowSpec findSplitRow(Collection files) { + private SplitRowSpec findSplitRow(Collection files) { // never split the root tablet // check if we already decided that we can never split // check to see if we're big enough to split long splitThreshold = acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD); - if (location.toString().equals(ServerConstants.getRootTabletDir()) || estimateTabletSize() <= splitThreshold) { + if (extent.isRootTablet() || estimateTabletSize() <= splitThreshold) { return null; } @@ -3174,11 +3090,11 @@ public class Tablet { long t1, t2, t3; // acquire first and last key info outside of tablet lock - Map> falks = null; + Map> falks = null; if (reason == MajorCompactionReason.CHOP) falks = getFirstAndLastKeys(datafileManager.getDatafileSizes()); - Map filesToCompact; + Map filesToCompact; int maxFilesToCompact = acuTableConf.getCount(Property.TSERV_MAJC_THREAD_MAXOPEN); @@ -3204,7 +3120,7 @@ public class Tablet { // otherwise deleted compacted files could possible be brought back // at some point if the file they were compacted to was legitimately // removed by a major compaction - cleanUpFiles(fs, fs.listStatus(this.location), this.location, false); + cleanUpFiles(fs, fs.listStatus(this.location), false); } // getFilesToCompact() and cleanUpFiles() both @@ -3272,10 +3188,10 @@ public class Tablet { numToCompact = filesToCompact.size() - maxFilesToCompact + 1; } - Set smallestFiles = removeSmallest(filesToCompact, numToCompact); + Set smallestFiles = removeSmallest(filesToCompact, numToCompact); - String fileName = getNextMapFilename((filesToCompact.size() == 0 && !propogateDeletes) ? "A" : "C"); - String compactTmpName = fileName + "_tmp"; + FileRef fileName = getNextMapFilename((filesToCompact.size() == 0 && !propogateDeletes) ? "A" : "C"); + FileRef compactTmpName = new FileRef(fileName.path().toString() + "_tmp"); Span span = Trace.start("compactFiles"); try { @@ -3292,14 +3208,13 @@ public class Tablet { } }; - HashMap copy = new HashMap(datafileManager.getDatafileSizes()); + HashMap copy = new HashMap(datafileManager.getDatafileSizes()); if (!copy.keySet().containsAll(smallestFiles)) throw new IllegalStateException("Cannot find data file values for " + smallestFiles); copy.keySet().retainAll(smallestFiles); - log.debug("Starting MajC " + extent + " (" + reason + ") " + datafileManager.abs2rel(datafileManager.string2path(copy.keySet())) + " --> " - + datafileManager.abs2rel(new Path(compactTmpName)) + " " + compactionIterators); + log.debug("Starting MajC " + extent + " (" + reason + ") " + copy.keySet() + " --> " + compactTmpName + " " + compactionIterators); // always propagate deletes, unless last batch Compactor compactor = new Compactor(conf, fs, copy, null, compactTmpName, filesToCompact.size() == 0 ? propogateDeletes : true, acuTableConf, extent, @@ -3335,12 +3250,12 @@ public class Tablet { } } - private Set removeSmallest(Map filesToCompact, int maxFilesToCompact) { + private Set removeSmallest(Map filesToCompact, int maxFilesToCompact) { // ensure this method works properly when multiple files have the same size - PriorityQueue> fileHeap = new PriorityQueue>(filesToCompact.size(), new Comparator>() { + PriorityQueue> fileHeap = new PriorityQueue>(filesToCompact.size(), new Comparator>() { @Override - public int compare(Pair o1, Pair o2) { + public int compare(Pair o1, Pair o2) { if (o1.getSecond() == o2.getSecond()) return o1.getFirst().compareTo(o2.getFirst()); if (o1.getSecond() < o2.getSecond()) @@ -3349,14 +3264,14 @@ public class Tablet { } }); - for (Iterator> iterator = filesToCompact.entrySet().iterator(); iterator.hasNext();) { - Entry entry = (Entry) iterator.next(); - fileHeap.add(new Pair(entry.getKey(), entry.getValue())); + for (Iterator> iterator = filesToCompact.entrySet().iterator(); iterator.hasNext();) { + Entry entry = (Entry) iterator.next(); + fileHeap.add(new Pair(entry.getKey(), entry.getValue())); } - Set smallestFiles = new HashSet(); + Set smallestFiles = new HashSet(); while (smallestFiles.size() < maxFilesToCompact && fileHeap.size() > 0) { - Pair pair = fileHeap.remove(); + Pair pair = fileHeap.remove(); filesToCompact.remove(pair.getFirst()); smallestFiles.add(pair.getFirst()); } @@ -3493,12 +3408,12 @@ public class Tablet { static class SplitInfo { String dir; - SortedMap datafiles; + SortedMap datafiles; String time; long initFlushID; long initCompactID; - SplitInfo(String d, SortedMap dfv, String time, long initFlushID, long initCompactID) { + SplitInfo(String d, SortedMap dfv, String time, long initFlushID, long initCompactID) { this.dir = d; this.datafiles = dfv; this.time = time; @@ -3532,16 +3447,9 @@ public class Tablet { // this info is used for optimization... it is ok if map files are missing // from the set... can still query and insert into the tablet while this // map file operation is happening - Map firstAndLastRowsAbs = FileUtil.tryToGetFirstAndLastRows(fs, + Map firstAndLastRows = FileUtil.tryToGetFirstAndLastRows(fs, tabletServer.getSystemConfiguration(), datafileManager.getFiles()); - // convert absolute paths to relative paths - Map firstAndLastRows = new HashMap(); - - for (Entry entry : firstAndLastRowsAbs.entrySet()) { - firstAndLastRows.put(datafileManager.abs2rel(new Path(entry.getKey())), entry.getValue()); - } - synchronized (this) { // java needs tuples ... TreeMap newTablets = new TreeMap(); @@ -3573,14 +3481,14 @@ public class Tablet { KeyExtent low = new KeyExtent(extent.getTableId(), midRow, extent.getPrevEndRow()); KeyExtent high = new KeyExtent(extent.getTableId(), extent.getEndRow(), midRow); - String lowDirectory = TabletOperations.createTabletDirectory(fs, location.getParent().toString(), midRow); + String lowDirectory = TabletOperations.createTabletDirectory(fs, extent.getTableId().toString(), midRow); // write new tablet information to MetadataTable - SortedMap lowDatafileSizes = new TreeMap(); - SortedMap highDatafileSizes = new TreeMap(); - List highDatafilesToRemove = new ArrayList(); + SortedMap lowDatafileSizes = new TreeMap(); + SortedMap highDatafileSizes = new TreeMap(); + List highDatafilesToRemove = new ArrayList(); - MetadataTable.splitDatafiles(extent.getTableId(), midRow, splitRatio, firstAndLastRows, datafileManager.getDatafileSizesRel(), lowDatafileSizes, + MetadataTable.splitDatafiles(extent.getTableId(), midRow, splitRatio, firstAndLastRows, datafileManager.getDatafileSizes(), lowDatafileSizes, highDatafileSizes, highDatafilesToRemove); log.debug("Files for low split " + low + " " + lowDatafileSizes.keySet()); @@ -3591,7 +3499,7 @@ public class Tablet { // it is possible that some of the bulk loading flags will be deleted after being read below because the bulk load // finishes.... therefore split could propogate load flags for a finished bulk load... there is a special iterator // on the !METADATA table to clean up this type of garbage - Map bulkLoadedFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(), extent); + Map bulkLoadedFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(), extent); MetadataTable.splitTablet(high, extent.getPrevEndRow(), splitRatio, SecurityConstants.getSystemCredentials(), tabletServer.getLock()); MetadataTable.addNewTablet(low, lowDirectory, tabletServer.getTabletSession(), lowDatafileSizes, bulkLoadedFiles, @@ -3613,7 +3521,7 @@ public class Tablet { } } - public SortedMap getDatafiles() { + public SortedMap getDatafiles() { return datafileManager.getDatafileSizes(); } @@ -3658,10 +3566,10 @@ public class Tablet { return splitCreationTime; } - public void importMapFiles(long tid, Map fileMap, boolean setTime) throws IOException { - Map entries = new HashMap(fileMap.size()); + public void importMapFiles(long tid, Map fileMap, boolean setTime) throws IOException { + Map entries = new HashMap(fileMap.size()); - for (String path : fileMap.keySet()) { + for (FileRef path : fileMap.keySet()) { MapFileInfo mfi = fileMap.get(path); entries.put(path, new DataFileValue(mfi.estimatedSize, 0l)); } Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletIteratorEnvironment.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletIteratorEnvironment.java?rev=1489969&r1=1489968&r2=1489969&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletIteratorEnvironment.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletIteratorEnvironment.java Wed Jun 5 17:19:25 2013 @@ -29,7 +29,9 @@ import org.apache.accumulo.core.iterator import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.system.MultiIterator; import org.apache.accumulo.core.util.MetadataTable.DataFileValue; +import org.apache.accumulo.server.fs.FileRef; import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager; +import org.apache.hadoop.fs.Path; public class TabletIteratorEnvironment implements IteratorEnvironment { @@ -38,7 +40,7 @@ public class TabletIteratorEnvironment i private final boolean fullMajorCompaction; private final AccumuloConfiguration config; private final ArrayList> topLevelIterators = new ArrayList>(); - private Map files; + private Map files; TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config) { if (scope == IteratorScope.majc) @@ -50,7 +52,7 @@ public class TabletIteratorEnvironment i this.fullMajorCompaction = false; } - TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config, ScanFileManager trm, Map files) { + TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config, ScanFileManager trm, Map files) { if (scope == IteratorScope.majc) throw new IllegalArgumentException("must set if compaction is full"); @@ -90,7 +92,8 @@ public class TabletIteratorEnvironment i @Override public SortedKeyValueIterator reserveMapFileReader(String mapFileName) throws IOException { - return trm.openFiles(Collections.singletonMap(mapFileName, files.get(mapFileName)), false).get(0); + FileRef ref = new FileRef(mapFileName, new Path(mapFileName)); + return trm.openFiles(Collections.singletonMap(ref, files.get(ref)), false).get(0); } @Override Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1489969&r1=1489968&r2=1489969&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Wed Jun 5 17:19:25 2013 @@ -139,6 +139,7 @@ import org.apache.accumulo.server.client import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.data.ServerMutation; +import org.apache.accumulo.server.fs.FileRef; import org.apache.accumulo.server.fs.FileSystem; import org.apache.accumulo.server.fs.FileSystemImpl; import org.apache.accumulo.server.master.state.Assignment; @@ -878,6 +879,11 @@ public class TabletServer extends Abstra for (Entry> entry : files.entrySet()) { TKeyExtent tke = entry.getKey(); Map fileMap = entry.getValue(); + Map fileRefMap = new HashMap(); + for (Entry mapping : fileMap.entrySet()) { + org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(mapping.getKey()); + fileRefMap.put(new FileRef(mapping.getKey(), ns.makeQualified(new Path(mapping.getKey()))), mapping.getValue()); + } Tablet importTablet = onlineTablets.get(new KeyExtent(tke)); @@ -885,7 +891,7 @@ public class TabletServer extends Abstra failures.add(tke); } else { try { - importTablet.importMapFiles(tid, fileMap, setTime); + importTablet.importMapFiles(tid, fileRefMap, setTime); } catch (IOException ioe) { log.info("files " + fileMap.keySet() + " not imported to " + new KeyExtent(tke) + ": " + ioe.getMessage()); failures.add(tke); @@ -2913,7 +2919,13 @@ public class TabletServer extends Abstra SortedMap> tabletEntries; tabletEntries = MetadataTable.getTabletEntries(tabletsKeyValues, columnsToFetch); - KeyExtent fke = MetadataTable.fixSplit(metadataEntry, tabletEntries.get(metadataEntry), instance, SecurityConstants.getSystemCredentials(), lock); + KeyExtent fke; + try { + fke = MetadataTable.fixSplit(metadataEntry, tabletEntries.get(metadataEntry), instance, SecurityConstants.getSystemCredentials(), lock); + } catch (IOException e) { + log.error("Error fixing split " + metadataEntry); + throw new AccumuloException(e.toString()); + } if (!fke.equals(extent)) { return new Pair(null, fke); @@ -3243,7 +3255,7 @@ public class TabletServer extends Abstra String recovery = null; for (String log : entry.logSet) { String[] parts = log.split("/"); // "host:port/filename" - log = ServerConstants.getRecoveryDir() + "/" + parts[1]; + log = fs.getFullPath(ServerConstants.getRecoveryDirs(), parts[1]); Path finished = new Path(log + "/finished"); TabletServer.log.info("Looking for " + finished); if (fs.exists(finished)) { Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java?rev=1489969&r1=1489968&r2=1489969&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java Wed Jun 5 17:19:25 2013 @@ -48,6 +48,7 @@ import org.apache.accumulo.core.util.Met import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.fs.FileRef; import org.apache.accumulo.server.fs.FileSystem; import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager; import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason; @@ -455,10 +456,10 @@ public class TabletServerResourceManager } private class MapFileInfo { - private final String path; + private final FileRef path; private final long size; - MapFileInfo(String path, long size) { + MapFileInfo(FileRef path, long size) { this.path = path; this.size = size; } @@ -544,10 +545,10 @@ public class TabletServerResourceManager // BEGIN methods that Tablets call to make decisions about major compaction // when too many files are open, we may want tablets to compact down // to one map file - Map findMapFilesToCompact(SortedMap tabletFiles, MajorCompactionReason reason) { + Map findMapFilesToCompact(SortedMap tabletFiles, MajorCompactionReason reason) { if (reason == MajorCompactionReason.USER) { - Map files = new HashMap(); - for (Entry entry : tabletFiles.entrySet()) { + Map files = new HashMap(); + for (Entry entry : tabletFiles.entrySet()) { files.put(entry.getKey(), entry.getValue().getSize()); } return files; @@ -572,7 +573,7 @@ public class TabletServerResourceManager int maxFilesToCompact = tableConf.getCount(Property.TSERV_MAJC_THREAD_MAXOPEN); int maxFilesPerTablet = tableConf.getMaxFilesPerTablet(); - for (Entry entry : tabletFiles.entrySet()) { + for (Entry entry : tabletFiles.entrySet()) { candidateFiles.add(new MapFileInfo(entry.getKey(), entry.getValue().getSize())); } @@ -581,7 +582,7 @@ public class TabletServerResourceManager totalSize += mfi.size; } - Map files = new HashMap(); + Map files = new HashMap(); while (candidateFiles.size() > 1) { MapFileInfo max = candidateFiles.last(); @@ -607,12 +608,12 @@ public class TabletServerResourceManager if (files.size() < totalFilesToCompact) { - TreeMap tfc = new TreeMap(tabletFiles); + TreeMap tfc = new TreeMap(tabletFiles); tfc.keySet().removeAll(files.keySet()); // put data in candidateFiles to sort it candidateFiles.clear(); - for (Entry entry : tfc.entrySet()) + for (Entry entry : tfc.entrySet()) candidateFiles.add(new MapFileInfo(entry.getKey(), entry.getValue().getSize())); for (MapFileInfo mfi : candidateFiles) { @@ -628,7 +629,7 @@ public class TabletServerResourceManager return files; } - boolean needsMajorCompaction(SortedMap tabletFiles, MajorCompactionReason reason) { + boolean needsMajorCompaction(SortedMap tabletFiles, MajorCompactionReason reason) { if (closed) return false;// throw new IOException("closed"); Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java?rev=1489969&r1=1489968&r2=1489969&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java Wed Jun 5 17:19:25 2013 @@ -113,20 +113,22 @@ public class AddFilesWithMissingEntries final String tableId = ke.getTableId().toString(); final Text row = ke.getMetadataEntry(); log.info(row.toString()); - final Path path = new Path(ServerConstants.getTablesDir() + "/" + tableId + directory); - for (FileStatus file : fs.listStatus(path)) { - if (file.getPath().getName().endsWith("_tmp") || file.getPath().getName().endsWith("_tmp.rf")) - continue; - final String filename = directory + "/" + file.getPath().getName(); - if (!knownFiles.contains(filename)) { - count++; - final Mutation m = new Mutation(row); - String size = Long.toString(file.getLen()); - String entries = "1"; // lie - String value = size + "," + entries; - m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(filename), new Value(value.getBytes())); - if (update) { - writer.getBatchWriter(Constants.METADATA_TABLE_NAME).addMutation(m); + for (String dir : ServerConstants.getTablesDirs()) { + final Path path = new Path(dir + "/" + tableId + directory); + for (FileStatus file : fs.listStatus(path)) { + if (file.getPath().getName().endsWith("_tmp") || file.getPath().getName().endsWith("_tmp.rf")) + continue; + final String filename = directory + "/" + file.getPath().getName(); + if (!knownFiles.contains(filename)) { + count++; + final Mutation m = new Mutation(row); + String size = Long.toString(file.getLen()); + String entries = "1"; // lie + String value = size + "," + entries; + m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(filename), new Value(value.getBytes())); + if (update) { + writer.getBatchWriter(Constants.METADATA_TABLE_NAME).addMutation(m); + } } } } Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java?rev=1489969&r1=1489968&r2=1489969&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java Wed Jun 5 17:19:25 2013 @@ -24,7 +24,6 @@ import java.util.Set; import java.util.TreeSet; import org.apache.accumulo.core.Constants; -import org.apache.accumulo.server.cli.ClientOpts; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.impl.Writer; @@ -34,9 +33,10 @@ import org.apache.accumulo.core.data.Mut import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.CredentialHelper; import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException; -import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.server.cli.ClientOpts; import org.apache.accumulo.server.conf.ServerConfiguration; -import org.apache.hadoop.fs.FileSystem; +import org.apache.accumulo.server.fs.FileSystem; +import org.apache.accumulo.server.fs.FileSystemImpl; import org.apache.hadoop.io.Text; import com.beust.jcommander.Parameter; @@ -192,7 +192,7 @@ public class CheckForMetadataProblems { Opts opts = new Opts(); opts.parseArgs(CheckForMetadataProblems.class.getName(), args); - FileSystem fs = FileSystem.get(CachedConfiguration.getInstance()); + FileSystem fs = FileSystemImpl.get(); checkMetadataTableEntries(opts, fs); opts.stopTracing(); if (sawProblems)