Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 23ECA10AA9 for ; Mon, 5 Jan 2015 20:10:32 +0000 (UTC) Received: (qmail 59574 invoked by uid 500); 5 Jan 2015 20:10:33 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 59472 invoked by uid 500); 5 Jan 2015 20:10:32 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 59438 invoked by uid 99); 5 Jan 2015 20:10:32 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 05 Jan 2015 20:10:32 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 8A155A40563; Mon, 5 Jan 2015 20:10:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ecn@apache.org To: commits@accumulo.apache.org Date: Mon, 05 Jan 2015 20:10:32 -0000 Message-Id: <1f428cde305d476aafa87babd766333d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/6] accumulo git commit: ACCUMULO-3462 reduce the opportunity for the queued tablet to get mis-marked Repository: accumulo Updated Branches: refs/heads/master 9022987e0 -> b90b123e1 http://git-wip-us.apache.org/repos/asf/accumulo/blob/07422efc/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java index ddae38a..4a99667 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java @@ -139,20 +139,20 @@ import org.apache.zookeeper.KeeperException.NoNodeException; * We need to be able to have the master tell a tabletServer to * close this file, and the tablet server to handle all pending client reads * before closing - * + * */ /** - * + * * this class just provides an interface to read from a MapFile mostly takes care of reporting start and end keys - * + * * need this because a single row extent can have multiple columns this manages all the columns (each handled by a store) for a single row-extent - * - * + * + * */ public class Tablet { - + enum MajorCompactionReason { // do not change the order, the order of this enum determines the order // in which queued major compactions are executed @@ -161,44 +161,44 @@ public class Tablet { NORMAL, IDLE } - + enum MinorCompactionReason { USER, SYSTEM, CLOSE } public class CommitSession { - + private int seq; private InMemoryMap memTable; private int commitsInProgress; private long maxCommittedTime = Long.MIN_VALUE; - + private CommitSession(int seq, InMemoryMap imm) { this.seq = seq; this.memTable = imm; commitsInProgress = 0; } - + public int getWALogSeq() { return seq; } - + private void decrementCommitsInProgress() { if (commitsInProgress < 1) throw new IllegalStateException("commitsInProgress = " + commitsInProgress); - + commitsInProgress--; if (commitsInProgress == 0) Tablet.this.notifyAll(); } - + private void incrementCommitsInProgress() { if (commitsInProgress < 0) throw new IllegalStateException("commitsInProgress = " + commitsInProgress); - + commitsInProgress++; } - + private void waitForCommitsToFinish() { while (commitsInProgress > 0) { try { @@ -208,105 +208,105 @@ public class Tablet { } } } - + public void abortCommit(List value) { Tablet.this.abortCommit(this, value); } - + public void commit(List mutations) { Tablet.this.commit(this, mutations); } - + public Tablet getTablet() { return Tablet.this; } - + public boolean beginUpdatingLogsUsed(ArrayList copy, boolean mincFinish) { return Tablet.this.beginUpdatingLogsUsed(memTable, copy, mincFinish); } - + public void finishUpdatingLogsUsed() { Tablet.this.finishUpdatingLogsUsed(); } - + public int getLogId() { return logId; } - + public KeyExtent getExtent() { return extent; } - + private void updateMaxCommittedTime(long time) { maxCommittedTime = Math.max(time, maxCommittedTime); } - + private long getMaxCommittedTime() { if (maxCommittedTime == Long.MIN_VALUE) throw new IllegalStateException("Tried to read max committed time when it was never set"); return maxCommittedTime; } - + } - + private class TabletMemory { private InMemoryMap memTable; private InMemoryMap otherMemTable; private InMemoryMap deletingMemTable; private int nextSeq = 1; private CommitSession commitSession; - + TabletMemory() { memTable = new InMemoryMap(tabletServer.getSystemConfiguration()); commitSession = new CommitSession(nextSeq, memTable); nextSeq += 2; } - + InMemoryMap getMemTable() { return memTable; } - + InMemoryMap getMinCMemTable() { return otherMemTable; } - + CommitSession prepareForMinC() { if (otherMemTable != null) { throw new IllegalStateException(); } - + if (deletingMemTable != null) { throw new IllegalStateException(); } - + otherMemTable = memTable; memTable = new InMemoryMap(tabletServer.getSystemConfiguration()); - + CommitSession oldCommitSession = commitSession; commitSession = new CommitSession(nextSeq, memTable); nextSeq += 2; - + tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), otherMemTable.estimatedSizeInBytes()); - + return oldCommitSession; } - + void finishedMinC() { - + if (otherMemTable == null) { throw new IllegalStateException(); } - + if (deletingMemTable != null) { throw new IllegalStateException(); } - + deletingMemTable = otherMemTable; - + otherMemTable = null; Tablet.this.notifyAll(); } - + void finalizeMinC() { try { deletingMemTable.delete(15000); @@ -315,22 +315,22 @@ public class Tablet { if (otherMemTable != null) { throw new IllegalStateException(); } - + if (deletingMemTable == null) { throw new IllegalStateException(); } - + deletingMemTable = null; - + tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), 0); } } } - + boolean memoryReservedForMinC() { return otherMemTable != null || deletingMemTable != null; } - + void waitForMinC() { while (otherMemTable != null || deletingMemTable != null) { try { @@ -340,21 +340,21 @@ public class Tablet { } } } - + void mutate(CommitSession cm, List mutations) { cm.memTable.mutate(mutations); } - + void updateMemoryUsageStats() { long other = 0; if (otherMemTable != null) other = otherMemTable.estimatedSizeInBytes(); else if (deletingMemTable != null) other = deletingMemTable.estimatedSizeInBytes(); - + tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other); } - + List getIterators() { List toReturn = new ArrayList(2); toReturn.add(memTable.skvIterator()); @@ -362,50 +362,50 @@ public class Tablet { toReturn.add(otherMemTable.skvIterator()); return toReturn; } - + void returnIterators(List iters) { for (MemoryIterator iter : iters) { iter.close(); } } - + public long getNumEntries() { if (otherMemTable != null) return memTable.getNumEntries() + otherMemTable.getNumEntries(); return memTable.getNumEntries(); } - + CommitSession getCommitSession() { return commitSession; } } - + private TabletMemory tabletMemory; - + private final TabletTime tabletTime; private long persistedTime; private final Object timeLock = new Object(); - + private final Path location; // absolute path of this tablets dir private TServerInstance lastLocation; - + private Configuration conf; private FileSystem fs; - + private TableConfiguration acuTableConf; - + private volatile boolean tableDirChecked = false; - + private AtomicLong dataSourceDeletions = new AtomicLong(0); private Set activeScans = new HashSet(); - + private volatile boolean closing = false; private boolean closed = false; private boolean closeComplete = false; - + private long lastFlushID = -1; private long lastCompactID = -1; - + private static class CompactionWaitInfo { long flushID = -1; long compactionID = -1; @@ -415,7 +415,7 @@ public class Tablet { private CompactionWaitInfo compactionWaitInfo = new CompactionWaitInfo(); private KeyExtent extent; - + private TabletResourceManager tabletResources; final private DatafileManager datafileManager; private volatile boolean majorCompactionInProgress = false; @@ -423,127 +423,127 @@ public class Tablet { private Set majorCompactionQueued = Collections.synchronizedSet(EnumSet.noneOf(MajorCompactionReason.class)); private volatile boolean minorCompactionInProgress = false; private volatile boolean minorCompactionWaitingToStart = false; - + private boolean updatingFlushID = false; - + private AtomicReference constraintChecker = new AtomicReference(); - + private final String tabletDirectory; - + private int writesInProgress = 0; - + private static final Logger log = Logger.getLogger(Tablet.class); public TabletStatsKeeper timer; - + private Rate queryRate = new Rate(0.2); private long queryCount = 0; - + private Rate queryByteRate = new Rate(0.2); private long queryBytes = 0; - + private Rate ingestRate = new Rate(0.2); private long ingestCount = 0; - + private Rate ingestByteRate = new Rate(0.2); private long ingestBytes = 0; - + private byte[] defaultSecurityLabel = new byte[0]; - + private long lastMinorCompactionFinishTime; private long lastMapFileImportTime; - + private volatile long numEntries; private volatile long numEntriesInMemory; - + // a count of the amount of data read by the iterators private AtomicLong scannedCount = new AtomicLong(0); private Rate scannedRate = new Rate(0.2); private ConfigurationObserver configObserver; - + private TabletServer tabletServer; - + private final int logId; // ensure we only have one reader/writer of our bulk file notes at at time public final Object bulkFileImportLock = new Object(); - + public int getLogId() { return logId; } - + public static class TabletClosedException extends RuntimeException { public TabletClosedException(Exception e) { super(e); } - + public TabletClosedException() { super(); } - + private static final long serialVersionUID = 1L; } - + String getNextMapFilename(String prefix) throws IOException { String extension = FileOperations.getNewFileExtension(tabletServer.getTableConfiguration(extent)); checkTabletDir(); return location.toString() + "/" + prefix + UniqueNameAllocator.getInstance().getNextName() + "." + extension; } - + private void checkTabletDir() throws IOException { if (!tableDirChecked) { checkTabletDir(this.location); tableDirChecked = true; } } - + private void checkTabletDir(Path tabletDir) throws IOException { - + FileStatus[] files = null; try { files = fs.listStatus(tabletDir); } catch (FileNotFoundException ex) { // ignored } - + if (files == null) { if (tabletDir.getName().startsWith("c-")) log.debug("Tablet " + extent + " had no dir, creating " + tabletDir); // its a clone dir... else log.warn("Tablet " + extent + " had no dir, creating " + tabletDir); - + fs.mkdirs(tabletDir); } } - + private static String rel2abs(String relPath, KeyExtent extent) { if (relPath.startsWith("../")) return ServerConstants.getTablesDir() + relPath.substring(2); else return ServerConstants.getTablesDir() + "/" + extent.getTableId() + relPath; } - + class DatafileManager { // access to datafilesizes needs to be synchronized: see CompactionRunner#getNumFiles final private Map datafileSizes = Collections.synchronizedMap(new TreeMap()); - + DatafileManager(SortedMap datafileSizes) { for (Entry datafiles : datafileSizes.entrySet()) this.datafileSizes.put(new Path(rel2abs(datafiles.getKey(), extent)), datafiles.getValue()); } - + Path mergingMinorCompactionFile = null; Set filesToDeleteAfterScan = new HashSet(); Map> scanFileReservations = new HashMap>(); MapCounter fileScanReferenceCounts = new MapCounter(); long nextScanReservationId = 0; boolean reservationsBlocked = false; - + Set majorCompactingFiles = new HashSet(); - + Pair> reserveFilesForScan() { synchronized (Tablet.this) { - + while (reservationsBlocked) { try { Tablet.this.wait(50); @@ -551,34 +551,34 @@ public class Tablet { log.warn(e, e); } } - + Set absFilePaths = new HashSet(datafileSizes.keySet()); - + long rid = nextScanReservationId++; - + scanFileReservations.put(rid, absFilePaths); - + Map ret = new HashMap(); - + for (Path path : absFilePaths) { fileScanReferenceCounts.increment(path, 1); ret.put(path.toString(), datafileSizes.get(path)); } - + return new Pair>(rid, ret); } } - + void returnFilesForScan(Long reservationId) { - + final Set filesToDelete = new HashSet(); - + synchronized (Tablet.this) { Set absFilePaths = scanFileReservations.remove(reservationId); - + if (absFilePaths == null) throw new IllegalArgumentException("Unknown scan reservation id " + reservationId); - + boolean notify = false; for (Path path : absFilePaths) { long refCount = fileScanReferenceCounts.decrement(path, 1); @@ -589,32 +589,32 @@ public class Tablet { } else if (refCount < 0) throw new IllegalStateException("Scan ref count for " + path + " is " + refCount); } - + if (notify) Tablet.this.notifyAll(); } - + if (filesToDelete.size() > 0) { log.debug("Removing scan refs from metadata " + extent + " " + abs2rel(filesToDelete)); MetadataTable.removeScanFiles(extent, abs2rel(filesToDelete), SecurityConstants.getSystemCredentials(), tabletServer.getLock()); } } - + private void removeFilesAfterScanRel(Set relPaths) { Set scanFiles = new HashSet(); - + for (String rpath : relPaths) scanFiles.add(new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId() + rpath)); - + removeFilesAfterScan(scanFiles); } - + private void removeFilesAfterScan(Set scanFiles) { if (scanFiles.size() == 0) return; - + Set filesToDelete = new HashSet(); - + synchronized (Tablet.this) { for (Path path : scanFiles) { if (fileScanReferenceCounts.get(path) == 0) @@ -623,26 +623,26 @@ public class Tablet { filesToDeleteAfterScan.add(path); } } - + if (filesToDelete.size() > 0) { log.debug("Removing scan refs from metadata " + extent + " " + abs2rel(filesToDelete)); MetadataTable.removeScanFiles(extent, abs2rel(filesToDelete), SecurityConstants.getSystemCredentials(), tabletServer.getLock()); } } - + private TreeSet waitForScansToFinish(Set pathsToWaitFor, boolean blockNewScans, long maxWaitTime) { long startTime = System.currentTimeMillis(); TreeSet inUse = new TreeSet(); - + Span waitForScans = Trace.start("waitForScans"); synchronized (Tablet.this) { if (blockNewScans) { if (reservationsBlocked) throw new IllegalStateException(); - + reservationsBlocked = true; } - + for (Path path : pathsToWaitFor) { while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis() - startTime < maxWaitTime) { try { @@ -652,43 +652,43 @@ public class Tablet { } } } - + for (Path path : pathsToWaitFor) { if (fileScanReferenceCounts.get(path) > 0) inUse.add(path); } - + if (blockNewScans) { reservationsBlocked = false; Tablet.this.notifyAll(); } - + } waitForScans.stop(); return inUse; } - + public void importMapFiles(long tid, Map pathsString, boolean setTime) throws IOException { - + String bulkDir = null; - + Map paths = new HashMap(); for (Entry entry : pathsString.entrySet()) paths.put(new Path(entry.getKey()), entry.getValue()); - + for (Path tpath : paths.keySet()) { - + if (!tpath.getParent().getParent().equals(new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId()))) { throw new IOException("Map file " + tpath + " not in table dir " + ServerConstants.getTablesDir() + "/" + extent.getTableId()); } - + if (bulkDir == null) bulkDir = tpath.getParent().toString(); else if (!bulkDir.equals(tpath.getParent().toString())) throw new IllegalArgumentException("bulk files in different dirs " + bulkDir + " " + tpath); - + } - + if (extent.isRootTablet()) { throw new IllegalArgumentException("Can not import files to root tablet"); } @@ -706,7 +706,7 @@ public class Tablet { for (String file : files) if (paths.keySet().remove(new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId() + file))) log.debug("Ignoring request to re-import a file already imported: " + extent + ": " + file); - + if (paths.size() > 0) { long bulkTime = Long.MIN_VALUE; if (setTime) { @@ -722,40 +722,40 @@ public class Tablet { synchronized (timeLock) { if (bulkTime > persistedTime) persistedTime = bulkTime; - + MetadataTable.updateTabletDataFile(tid, extent, abs2rel(paths), tabletTime.getMetadataValue(persistedTime), auths, tabletServer.getLock()); } } } - + synchronized (Tablet.this) { for (Entry tpath : paths.entrySet()) { if (datafileSizes.containsKey(tpath.getKey())) { log.error("Adding file that is already in set " + tpath.getKey()); } datafileSizes.put(tpath.getKey(), tpath.getValue()); - + } - + tabletResources.importedMapFiles(); - + computeNumEntries(); } - + for (Entry entry : paths.entrySet()) { log.log(TLevel.TABLET_HIST, extent + " import " + abs2rel(entry.getKey()) + " " + entry.getValue()); } } - + String reserveMergingMinorCompactionFile() { if (mergingMinorCompactionFile != null) throw new IllegalStateException("Tried to reserve merging minor compaction file when already reserved : " + mergingMinorCompactionFile); - + if (extent.isRootTablet()) return null; - + int maxFiles = acuTableConf.getMaxFilesPerTablet(); - + // when a major compaction is running and we are at max files, write out // one extra file... want to avoid the case where major compaction is // compacting everything except for the largest file, and therefore the @@ -764,45 +764,45 @@ public class Tablet { // are canceled if (majorCompactingFiles.size() > 0 && datafileSizes.size() == maxFiles) return null; - + if (datafileSizes.size() >= maxFiles) { // find the smallest file - + long min = Long.MAX_VALUE; Path minName = null; - + for (Entry entry : datafileSizes.entrySet()) { if (entry.getValue().getSize() < min && !majorCompactingFiles.contains(entry.getKey())) { min = entry.getValue().getSize(); minName = entry.getKey(); } } - + if (minName == null) return null; - + mergingMinorCompactionFile = minName; return minName.toString(); } - + return null; } - + void unreserveMergingMinorCompactionFile(Path file) { if ((file == null && mergingMinorCompactionFile != null) || (file != null && mergingMinorCompactionFile == null) || (file != null && mergingMinorCompactionFile != null && !file.equals(mergingMinorCompactionFile))) throw new IllegalStateException("Disagreement " + file + " " + mergingMinorCompactionFile); - + mergingMinorCompactionFile = null; } - + void bringMinorCompactionOnline(String tmpDatafile, String newDatafile, String absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId) { bringMinorCompactionOnline(new Path(tmpDatafile), new Path(newDatafile), absMergeFile == null ? null : new Path(absMergeFile), dfv, commitSession, flushId); } - + void bringMinorCompactionOnline(Path tmpDatafile, Path newDatafile, Path absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId) { - + IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance(); if (extent.isRootTablet()) { try { @@ -813,7 +813,7 @@ public class Tablet { throw new IllegalStateException("Can not bring major compaction online, lock not held", e); } } - + // rename before putting in metadata table, so files in metadata table should // always exist do { @@ -825,7 +825,7 @@ public class Tablet { log.warn("Target map file already exist " + newDatafile); fs.delete(newDatafile, true); } - + rename(fs, tmpDatafile, newDatafile); } break; @@ -834,9 +834,9 @@ public class Tablet { UtilWaitThread.sleep(60 * 1000); } } while (true); - + long t1, t2; - + // the code below always assumes merged files are in use by scans... this must be done // because the in memory list of files is not updated until after the metadata table // therefore the file is available to scans until memory is updated, but want to ensure @@ -849,12 +849,12 @@ public class Tablet { Set filesInUseByScans = Collections.emptySet(); if (absMergeFile != null) filesInUseByScans = Collections.singleton(absMergeFile); - + // very important to write delete entries outside of log lock, because // this !METADATA write does not go up... it goes sideways or to itself if (absMergeFile != null) MetadataTable.addDeleteEntries(extent, Collections.singleton(abs2rel(absMergeFile)), SecurityConstants.getSystemCredentials()); - + Set unusedWalLogs = beginClearingUnusedLogs(); try { // the order of writing to !METADATA and walog is important in the face of machine/process failures @@ -862,25 +862,25 @@ public class Tablet { // data could be lost... the minor compaction start even should be written before the following metadata // write is made TCredentials creds = SecurityConstants.getSystemCredentials(); - + synchronized (timeLock) { if (commitSession.getMaxCommittedTime() > persistedTime) persistedTime = commitSession.getMaxCommittedTime(); - + String time = tabletTime.getMetadataValue(persistedTime); MetadataTable.updateTabletDataFile(extent, abs2rel(newDatafile), abs2rel(absMergeFile), dfv, time, creds, abs2rel(filesInUseByScans), tabletServer.getClientAddressString(), tabletServer.getLock(), unusedWalLogs, lastLocation, flushId); } - + } finally { finishClearingUnusedLogs(); } - + do { try { // the purpose of making this update use the new commit session, instead of the old one passed in, // is because the new one will reference the logs used by current memory... - + tabletServer.minorCompactionFinished(tabletMemory.getCommitSession(), newDatafile.toString(), commitSession.getWALogSeq() + 2); break; } catch (IOException e) { @@ -888,37 +888,37 @@ public class Tablet { UtilWaitThread.sleep(1 * 1000); } } while (true); - + synchronized (Tablet.this) { lastLocation = null; - + t1 = System.currentTimeMillis(); if (datafileSizes.containsKey(newDatafile)) { log.error("Adding file that is already in set " + newDatafile); } - + if (dfv.getNumEntries() > 0) { datafileSizes.put(newDatafile, dfv); } - + if (absMergeFile != null) { datafileSizes.remove(absMergeFile); } - + unreserveMergingMinorCompactionFile(absMergeFile); - + dataSourceDeletions.incrementAndGet(); tabletMemory.finishedMinC(); - + lastFlushID = flushId; - + computeNumEntries(); t2 = System.currentTimeMillis(); } - + // must do this after list of files in memory is updated above removeFilesAfterScan(filesInUseByScans); - + if (absMergeFile != null) log.log(TLevel.TABLET_HIST, extent + " MinC [" + abs2rel(absMergeFile) + ",memory] -> " + abs2rel(newDatafile)); else @@ -928,95 +928,95 @@ public class Tablet { log.debug(String.format("Minor Compaction wrote out file larger than split threshold. split threshold = %,d file size = %,d", acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD), dfv.getSize())); } - + } - + private Map abs2rel(Map paths) { TreeMap relMap = new TreeMap(); - + for (Entry entry : paths.entrySet()) relMap.put(abs2rel(entry.getKey()), entry.getValue()); - + return relMap; } - + private Set abs2rel(Set absPaths) { Set relativePaths = new TreeSet(); for (Path absPath : absPaths) relativePaths.add(abs2rel(absPath)); - + return relativePaths; } - + private Set string2path(Set strings) { Set paths = new HashSet(); for (String path : strings) paths.add(new Path(path)); - + return paths; } - + private String abs2rel(Path absPath) { if (absPath == null) return null; - + if (absPath.getParent().getParent().getName().equals(extent.getTableId().toString())) return "/" + absPath.getParent().getName() + "/" + absPath.getName(); else return "../" + absPath.getParent().getParent().getName() + "/" + absPath.getParent().getName() + "/" + absPath.getName(); } - + public void reserveMajorCompactingFiles(Set files) { if (majorCompactingFiles.size() != 0) throw new IllegalStateException("Major compacting files not empty " + majorCompactingFiles); - + Set mcf = string2path(files); if (mergingMinorCompactionFile != null && mcf.contains(mergingMinorCompactionFile)) throw new IllegalStateException("Major compaction tried to resrve file in use by minor compaction " + mergingMinorCompactionFile); - + majorCompactingFiles.addAll(mcf); } - + public void clearMajorCompactingFile() { majorCompactingFiles.clear(); } - + void bringMajorCompactionOnline(Set oldDatafiles, String tmpDatafile, String newDatafile, Long compactionId, DataFileValue dfv) throws IOException { bringMajorCompactionOnline(string2path(oldDatafiles), new Path(tmpDatafile), new Path(newDatafile), compactionId, dfv); } - + void bringMajorCompactionOnline(Set oldDatafiles, Path tmpDatafile, Path newDatafile, Long compactionId, DataFileValue dfv) throws IOException { long t1, t2; - + if (!extent.isRootTablet()) { - + if (fs.exists(newDatafile)) { log.error("Target map file already exist " + newDatafile, new Exception()); throw new IllegalStateException("Target map file already exist " + newDatafile); } - + // rename before putting in metadata table, so files in metadata table should // always exist rename(fs, tmpDatafile, newDatafile); - + if (dfv.getNumEntries() == 0) { fs.delete(newDatafile, true); } } - + TServerInstance lastLocation = null; synchronized (Tablet.this) { - + t1 = System.currentTimeMillis(); - + IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance(); - + dataSourceDeletions.incrementAndGet(); - + if (extent.isRootTablet()) { - + waitForScansToFinish(oldDatafiles, true, Long.MAX_VALUE); - + try { if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) { throw new IllegalStateException(); @@ -1024,25 +1024,25 @@ public class Tablet { } catch (Exception e) { throw new IllegalStateException("Can not bring major compaction online, lock not held", e); } - + // mark files as ready for deletion, but // do not delete them until we successfully // rename the compacted map file, in case // the system goes down - + String compactName = newDatafile.getName(); - + for (Path path : oldDatafiles) { rename(fs, path, new Path(location + "/delete+" + compactName + "+" + path.getName())); } - + if (fs.exists(newDatafile)) { log.error("Target map file already exist " + newDatafile, new Exception()); throw new IllegalStateException("Target map file already exist " + newDatafile); } - + rename(fs, tmpDatafile, newDatafile); - + // start deleting files, if we do not finish they will be cleaned // up later Trash trash = new Trash(fs, fs.getConf()); @@ -1052,7 +1052,7 @@ public class Tablet { fs.delete(deleteFile, true); } } - + // atomically remove old files and add new file for (Path oldDatafile : oldDatafiles) { if (!datafileSizes.containsKey(oldDatafile)) { @@ -1061,29 +1061,29 @@ public class Tablet { datafileSizes.remove(oldDatafile); majorCompactingFiles.remove(oldDatafile); } - + if (datafileSizes.containsKey(newDatafile)) { log.error("Adding file that is already in set " + newDatafile); } - + if (dfv.getNumEntries() > 0) { datafileSizes.put(newDatafile, dfv); } - + // could be used by a follow on compaction in a multipass compaction majorCompactingFiles.add(newDatafile); - + computeNumEntries(); - + lastLocation = Tablet.this.lastLocation; Tablet.this.lastLocation = null; - + if (compactionId != null) lastCompactID = compactionId; - + t2 = System.currentTimeMillis(); } - + if (!extent.isRootTablet()) { Set filesInUseByScans = waitForScansToFinish(oldDatafiles, false, 10000); if (filesInUseByScans.size() > 0) @@ -1092,37 +1092,37 @@ public class Tablet { SecurityConstants.getSystemCredentials(), tabletServer.getClientAddressString(), lastLocation, tabletServer.getLock()); removeFilesAfterScan(filesInUseByScans); } - + log.debug(String.format("MajC finish lock %.2f secs", (t2 - t1) / 1000.0)); log.log(TLevel.TABLET_HIST, extent + " MajC " + abs2rel(oldDatafiles) + " --> " + abs2rel(newDatafile)); } - + public SortedMap getDatafileSizesRel() { synchronized (Tablet.this) { TreeMap files = new TreeMap(); Set> es = datafileSizes.entrySet(); - + for (Entry entry : es) { files.put(abs2rel(entry.getKey()), entry.getValue()); } - + return Collections.unmodifiableSortedMap(files); } } - + public SortedMap getDatafileSizes() { synchronized (Tablet.this) { TreeMap files = new TreeMap(); Set> es = datafileSizes.entrySet(); - + for (Entry entry : es) { files.put(entry.getKey().toString(), entry.getValue()); } - + return Collections.unmodifiableSortedMap(files); } } - + public Set getFiles() { synchronized (Tablet.this) { HashSet files = new HashSet(); @@ -1132,38 +1132,38 @@ public class Tablet { return Collections.unmodifiableSet(files); } } - + } - + public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap tabletsKeyValues) throws IOException { this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), tabletsKeyValues); splitCreationTime = 0; } - + public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap datafiles, String time, long initFlushID, long initCompactID) throws IOException { this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), datafiles, time, initFlushID, initCompactID); splitCreationTime = System.currentTimeMillis(); } - + private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, SortedMap tabletsKeyValues) throws IOException { this(tabletServer, location, extent, trm, conf, TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration())), tabletsKeyValues); } - + static private final List EMPTY = Collections.emptyList(); - + private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, SortedMap datafiles, String time, long initFlushID, long initCompactID) throws IOException { this(tabletServer, location, extent, trm, conf, TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration())), EMPTY, datafiles, time, null, new HashSet(), initFlushID, initCompactID); } - + private static String lookupTime(AccumuloConfiguration conf, KeyExtent extent, SortedMap tabletsKeyValues) { SortedMap entries; - + if (extent.isRootTablet()) { return null; } else { @@ -1175,20 +1175,20 @@ public class Tablet { } } } - + // log.debug("extent : "+extent+" entries : "+entries); - + if (entries.size() == 1) return entries.values().iterator().next().toString(); return null; } - + private static SortedMap lookupDatafiles(AccumuloConfiguration conf, Text locText, FileSystem fs, KeyExtent extent, SortedMap tabletsKeyValues) throws IOException { Path location = new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId().toString() + locText.toString()); - + TreeMap datafiles = new TreeMap(); - + if (extent.isRootTablet()) { // the meta0 tablet // cleanUpFiles() has special handling for delete. files FileStatus[] files = fs.listStatus(location); @@ -1203,49 +1203,49 @@ public class Tablet { datafiles.put(locText.toString() + "/" + filename, dfv); } } else { - + SortedMap datafilesMetadata; - + Text rowName = extent.getMetadataEntry(); - + ScannerImpl mdScanner = new ScannerImpl(HdfsZooInstance.getInstance(), SecurityConstants.getSystemCredentials(), Constants.METADATA_TABLE_ID, Constants.NO_AUTHS); - + // Commented out because when no data file is present, each tablet will scan through metadata table and return nothing // reduced batch size to improve performance // changed here after endKeys were implemented from 10 to 1000 mdScanner.setBatchSize(1000); - + // leave these in, again, now using endKey for safety mdScanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY); - + mdScanner.setRange(new Range(rowName)); - + datafilesMetadata = new TreeMap(); - + for (Entry entry : mdScanner) { - + if (entry.getKey().compareRow(rowName) != 0) { break; } - + datafilesMetadata.put(new Key(entry.getKey()), new Value(entry.getValue())); } - + Iterator> dfmdIter = datafilesMetadata.entrySet().iterator(); - + while (dfmdIter.hasNext()) { Entry entry = dfmdIter.next(); - + datafiles.put(entry.getKey().getColumnQualifier().toString(), new DataFileValue(entry.getValue().get())); } } return datafiles; } - + private static List lookupLogEntries(KeyExtent ke, SortedMap tabletsKeyValues) { List logEntries = new ArrayList(); - + if (ke.isMeta()) { try { logEntries = MetadataTable.getLogEntries(SecurityConstants.getSystemCredentials(), ke); @@ -1264,14 +1264,14 @@ public class Tablet { } } } - + log.debug("got " + logEntries + " for logs for " + ke); return logEntries; } - + private static Set lookupScanFiles(KeyExtent extent, SortedMap tabletsKeyValues) { HashSet scanFiles = new HashSet(); - + Text row = extent.getMetadataEntry(); for (Entry entry : tabletsKeyValues.entrySet()) { Key key = entry.getKey(); @@ -1279,10 +1279,10 @@ public class Tablet { scanFiles.add(key.getColumnQualifier().toString()); } } - + return scanFiles; } - + private static long lookupFlushID(KeyExtent extent, SortedMap tabletsKeyValues) { Text row = extent.getMetadataEntry(); for (Entry entry : tabletsKeyValues.entrySet()) { @@ -1290,10 +1290,10 @@ public class Tablet { if (key.getRow().equals(row) && Constants.METADATA_FLUSH_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier())) return Long.parseLong(entry.getValue().toString()); } - + return -1; } - + private static long lookupCompactID(KeyExtent extent, SortedMap tabletsKeyValues) { Text row = extent.getMetadataEntry(); for (Entry entry : tabletsKeyValues.entrySet()) { @@ -1301,17 +1301,17 @@ public class Tablet { if (key.getRow().equals(row) && Constants.METADATA_COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier())) return Long.parseLong(entry.getValue().toString()); } - + return -1; } - + private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, FileSystem fs, SortedMap tabletsKeyValues) throws IOException { this(tabletServer, location, extent, trm, conf, fs, lookupLogEntries(extent, tabletsKeyValues), lookupDatafiles(tabletServer.getSystemConfiguration(), location, fs, extent, tabletsKeyValues), lookupTime(tabletServer.getSystemConfiguration(), extent, tabletsKeyValues), lookupLastServer(extent, tabletsKeyValues), lookupScanFiles(extent, tabletsKeyValues), lookupFlushID(extent, tabletsKeyValues), lookupCompactID(extent, tabletsKeyValues)); } - + private static TServerInstance lookupLastServer(KeyExtent extent, SortedMap tabletsKeyValues) { for (Entry entry : tabletsKeyValues.entrySet()) { if (entry.getKey().getColumnFamily().compareTo(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY) == 0) { @@ -1320,7 +1320,7 @@ public class Tablet { } return null; } - + /** * yet another constructor - this one allows us to avoid costly lookups into the Metadata table if we already know the files we need - as at split time */ @@ -1332,68 +1332,70 @@ public class Tablet { this.tabletDirectory = location.toString(); this.conf = conf; this.acuTableConf = tabletServer.getTableConfiguration(extent); - + this.fs = fs; this.extent = extent; this.tabletResources = trm; - + this.lastFlushID = initFlushID; this.lastCompactID = initCompactID; - + if (extent.isRootTablet()) { - + long rtime = Long.MIN_VALUE; for (String path : datafiles.keySet()) { String filename = new Path(path).getName(); - + FileSKVIterator reader = FileOperations.getInstance().openReader(this.location + "/" + filename, true, fs, fs.getConf(), tabletServer.getTableConfiguration(extent)); long maxTime = -1; try { - + while (reader.hasTop()) { maxTime = Math.max(maxTime, reader.getTopKey().getTimestamp()); reader.next(); } - + } finally { reader.close(); } - + if (maxTime > rtime) { time = TabletTime.LOGICAL_TIME_ID + "" + maxTime; rtime = maxTime; } } } - + this.tabletServer = tabletServer; this.logId = tabletServer.createLogId(extent); - + this.timer = new TabletStatsKeeper(); - + setupDefaultSecurityLabels(extent); - + tabletMemory = new TabletMemory(); tabletTime = TabletTime.getInstance(time); persistedTime = tabletTime.getTime(); - + acuTableConf.addObserver(configObserver = new ConfigurationObserver() { - + private void reloadConstraints() { constraintChecker.set(new ConstraintChecker(getTableConfiguration())); } - + + @Override public void propertiesChanged() { reloadConstraints(); - + try { setupDefaultSecurityLabels(extent); } catch (Exception e) { log.error("Failed to reload default security labels for extent: " + extent.toString()); } } - + + @Override public void propertyChanged(String prop) { if (prop.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey())) reloadConstraints(); @@ -1405,19 +1407,20 @@ public class Tablet { log.error("Failed to reload default security labels for extent: " + extent.toString()); } } - + } - + + @Override public void sessionExpired() { log.debug("Session expired, no longer updating per table props..."); } - + }); // Force a load of any per-table properties configObserver.propertiesChanged(); - + tabletResources.setTablet(this, acuTableConf); - + if (!logEntries.isEmpty()) { log.info("Starting Write-Ahead Log recovery for " + this.extent); final long[] count = new long[2]; @@ -1427,8 +1430,9 @@ public class Tablet { Set absPaths = new HashSet(); for (String relPath : datafiles.keySet()) absPaths.add(rel2abs(relPath, extent)); - + tabletServer.recover(this, logEntries, absPaths, new MutationReceiver() { + @Override public void receive(Mutation m) { // LogReader.printMutation(m); Collection muts = m.getUpdates(); @@ -1443,17 +1447,17 @@ public class Tablet { count[0]++; } }); - + if (count[1] != Long.MIN_VALUE) { tabletTime.useMaxTimeFromWALog(count[1]); } commitSession.updateMaxCommittedTime(tabletTime.getTime()); - + if (count[0] == 0) { MetadataTable.removeUnusedWALEntries(extent, logEntries, tabletServer.getLock()); logEntries.clear(); } - + } catch (Throwable t) { if (acuTableConf.getBoolean(Property.TABLE_FAILURES_IGNORE)) { log.warn("Error recovering from log files: ", t); @@ -1469,11 +1473,11 @@ public class Tablet { currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.server, parts[1])); } } - + log.info("Write-Ahead Log recovery complete for " + this.extent + " (" + count[0] + " mutations applied, " + tabletMemory.getNumEntries() + " entries created)"); } - + String contextName = acuTableConf.get(Property.TABLE_CLASSPATH); if (contextName != null && !contextName.equals("")) { // initialize context classloader, instead of possibly waiting for it to initialize for a scan @@ -1484,11 +1488,11 @@ public class Tablet { // do this last after tablet is completely setup because it // could cause major compaction to start datafileManager = new DatafileManager(datafiles); - + computeNumEntries(); - + datafileManager.removeFilesAfterScanRel(scanFiles); - + // look for hints of a failure on the previous tablet server if (!logEntries.isEmpty() || needsMajorCompaction(MajorCompactionReason.NORMAL)) { // look for any temp files hanging around @@ -1500,10 +1504,10 @@ public class Tablet { // look for any temp files hanging around removeOldTemporaryFiles(); } - + log.log(TLevel.TABLET_HIST, extent + " opened "); } - + private void removeOldTemporaryFiles() { // remove any temporary files created by a previous tablet server try { @@ -1532,18 +1536,18 @@ public class Tablet { } } } - + private static Collection cleanUpFiles(FileSystem fs, FileStatus[] files, Path location, boolean deleteTmp) throws IOException { /* * called in constructor and before major compactions */ Collection goodFiles = new ArrayList(files.length); - + for (FileStatus file : files) { - + String path = file.getPath().toString(); String filename = file.getPath().getName(); - + // check for incomplete major compaction, this should only occur // for root tablet if (filename.startsWith("delete+")) { @@ -1555,35 +1559,35 @@ public class Tablet { continue; } // compaction did not finish, so put files back - + // reset path and filename for rest of loop filename = filename.split("\\+", 3)[2]; path = location + "/" + filename; - + rename(fs, file.getPath(), new Path(path)); } - + if (filename.endsWith("_tmp")) { if (deleteTmp) { log.warn("cleaning up old tmp file: " + path); if (!fs.delete(file.getPath(), true)) log.warn("Delete of tmp file: " + file.getPath().toString() + " return false"); - + } continue; } - + if (!filename.startsWith(Constants.MAPFILE_EXTENSION + "_") && !FileOperations.getValidExtensions().contains(filename.split("\\.")[1])) { log.error("unknown file in tablet: " + path); continue; } - + goodFiles.add(path); } - + return goodFiles; } - + public static class KVEntry extends KeyValue { private static final long serialVersionUID = 1L; @@ -1594,58 +1598,58 @@ public class Tablet { int numBytes() { return getKey().getSize() + getValue().get().length; } - + int estimateMemoryUsed() { return getKey().getSize() + getValue().get().length + (9 * 32); // overhead is 32 per object } } - + private LookupResult lookup(SortedKeyValueIterator mmfi, List ranges, HashSet columnSet, ArrayList results, long maxResultsSize) throws IOException { - + LookupResult lookupResult = new LookupResult(); - + boolean exceededMemoryUsage = false; boolean tabletClosed = false; - + Set cfset = null; if (columnSet.size() > 0) cfset = LocalityGroupUtil.families(columnSet); - + for (Range range : ranges) { - + if (exceededMemoryUsage || tabletClosed) { lookupResult.unfinishedRanges.add(range); continue; } - + int entriesAdded = 0; - + try { if (cfset != null) mmfi.seek(range, cfset, true); else mmfi.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false); - + while (mmfi.hasTop()) { Key key = mmfi.getTopKey(); - + KVEntry kve = new KVEntry(key, mmfi.getTopValue()); results.add(kve); entriesAdded++; lookupResult.bytesAdded += kve.estimateMemoryUsed(); lookupResult.dataSize += kve.numBytes(); - + exceededMemoryUsage = lookupResult.bytesAdded > maxResultsSize; - + if (exceededMemoryUsage) { addUnfinishedRange(lookupResult, range, key, false); break; } - + mmfi.next(); } - + } catch (TooManyFilesException tmfe) { // treat this as a closed tablet, and let the client retry log.warn("Tablet " + getExtent() + " has too many files, batch lookup can not run"); @@ -1671,61 +1675,61 @@ public class Tablet { handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, entriesAdded); tabletClosed = true; } - + } - + return lookupResult; } - + private void handleTabletClosedDuringScan(ArrayList results, LookupResult lookupResult, boolean exceededMemoryUsage, Range range, int entriesAdded) { if (exceededMemoryUsage) throw new IllegalStateException("tablet should not exceed memory usage or close, not both"); - + if (entriesAdded > 0) addUnfinishedRange(lookupResult, range, results.get(results.size() - 1).getKey(), false); else lookupResult.unfinishedRanges.add(range); - + lookupResult.closed = true; } - + private void addUnfinishedRange(LookupResult lookupResult, Range range, Key key, boolean inclusiveStartKey) { if (range.getEndKey() == null || key.compareTo(range.getEndKey()) < 0) { Range nlur = new Range(new Key(key), inclusiveStartKey, range.getEndKey(), range.isEndKeyInclusive()); lookupResult.unfinishedRanges.add(nlur); } } - + public static interface KVReceiver { void receive(List matches) throws IOException; } - + class LookupResult { List unfinishedRanges = new ArrayList(); long bytesAdded = 0; long dataSize = 0; boolean closed = false; } - + public LookupResult lookup(List ranges, HashSet columns, Authorizations authorizations, ArrayList results, long maxResultSize, List ssiList, Map> ssio, AtomicBoolean interruptFlag) throws IOException { - + if (ranges.size() == 0) { return new LookupResult(); } - + ranges = Range.mergeOverlapping(ranges); Collections.sort(ranges); - + Range tabletRange = extent.toDataRange(); for (Range range : ranges) { // do a test to see if this range falls within the tablet, if it does not // then clip will throw an exception tabletRange.clip(range); } - + ScanDataSource dataSource = new ScanDataSource(authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag); - + LookupResult result = null; try { @@ -1739,7 +1743,7 @@ public class Tablet { // code in finally block because always want // to return mapfiles, even when exception is thrown dataSource.close(false); - + synchronized (this) { queryCount += results.size(); if (result != null) @@ -1747,74 +1751,74 @@ public class Tablet { } } } - + private Batch nextBatch(SortedKeyValueIterator iter, Range range, int num, HashSet columns) throws IOException { - + // log.info("In nextBatch.."); - + List results = new ArrayList(); Key key = null; - + Value value; long resultSize = 0L; long resultBytes = 0L; - + long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM); - + if (columns.size() == 0) { iter.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false); } else { iter.seek(range, LocalityGroupUtil.families(columns), true); } - + Key continueKey = null; boolean skipContinueKey = false; - + boolean endOfTabletReached = false; while (iter.hasTop()) { - - value = (Value) iter.getTopValue(); + + value = iter.getTopValue(); key = iter.getTopKey(); - + KVEntry kvEntry = new KVEntry(key, value); // copies key and value results.add(kvEntry); resultSize += kvEntry.estimateMemoryUsed(); resultBytes += kvEntry.numBytes(); - + if (resultSize >= maxResultsSize || results.size() >= num) { continueKey = new Key(key); skipContinueKey = true; break; } - + iter.next(); } - + if (iter.hasTop() == false) { endOfTabletReached = true; } - + Batch retBatch = new Batch(); retBatch.numBytes = resultBytes; - + if (!endOfTabletReached) { retBatch.continueKey = continueKey; retBatch.skipContinueKey = skipContinueKey; } else { retBatch.continueKey = null; } - + if (endOfTabletReached && results.size() == 0) retBatch.results = null; else retBatch.results = results; - + return retBatch; } - + /** * Determine if a JVM shutdown is in progress. - * + * */ private boolean shutdownInProgress() { try { @@ -1825,63 +1829,63 @@ public class Tablet { } catch (IllegalStateException ise) { return true; } - + return false; } - + private class Batch { public boolean skipContinueKey; public List results; public Key continueKey; public long numBytes; } - + Scanner createScanner(Range range, int num, HashSet columns, Authorizations authorizations, List ssiList, Map> ssio, boolean isolated, AtomicBoolean interruptFlag) { // do a test to see if this range falls within the tablet, if it does not // then clip will throw an exception extent.toDataRange().clip(range); - + ScanOptions opts = new ScanOptions(num, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, isolated); return new Scanner(range, opts); } - + class ScanBatch { boolean more; List results; - + ScanBatch(List results, boolean more) { this.results = results; this.more = more; } } - + class Scanner { - + private ScanOptions options; private Range range; private SortedKeyValueIterator isolatedIter; private ScanDataSource isolatedDataSource; private boolean sawException = false; private boolean scanClosed = false; - + Scanner(Range range, ScanOptions options) { this.range = range; this.options = options; } - + synchronized ScanBatch read() throws IOException, TabletClosedException { - + if (sawException) throw new IllegalStateException("Tried to use scanner after exception occurred."); - + if (scanClosed) throw new IllegalStateException("Tried to use scanner after it was closed."); - + Batch results = null; - + ScanDataSource dataSource; - + if (options.isolated) { if (isolatedDataSource == null) isolatedDataSource = new ScanDataSource(options); @@ -1889,11 +1893,11 @@ public class Tablet { } else { dataSource = new ScanDataSource(options); } - + try { - + SortedKeyValueIterator iter; - + if (options.isolated) { if (isolatedIter == null) isolatedIter = new SourceSwitchingIterator(dataSource, true); @@ -1903,9 +1907,9 @@ public class Tablet { } else { iter = new SourceSwitchingIterator(dataSource, false); } - + results = nextBatch(iter, range, options.num, options.columnSet); - + if (results.results == null) { range = null; return new ScanBatch(new ArrayList(), false); @@ -1915,7 +1919,7 @@ public class Tablet { range = new Range(results.continueKey, !results.skipContinueKey, range.getEndKey(), range.isEndKeyInclusive()); return new ScanBatch(results.results, true); } - + } catch (IterationInterruptedException iie) { sawException = true; if (isClosed()) @@ -1927,7 +1931,7 @@ public class Tablet { log.debug("IOException while shutdown in progress ", ioe); throw new TabletClosedException(ioe); // assume IOException was caused by execution of HDFS shutdown hook } - + sawException = true; dataSource.close(true); throw ioe; @@ -1941,7 +1945,7 @@ public class Tablet { dataSource.close(false); else if (dataSource.fileManager != null) dataSource.fileManager.detach(); - + synchronized (Tablet.this) { if (results != null && results.results != null) { long more = results.results.size(); @@ -1951,7 +1955,7 @@ public class Tablet { } } } - + // close and read are synchronized because can not call close on the data source while it is in use // this cloud lead to the case where file iterators that are in use by a thread are returned // to the pool... this would be bad @@ -1964,9 +1968,9 @@ public class Tablet { } } } - + static class ScanOptions { - + // scan options Authorizations authorizations; byte[] defaultLabels; @@ -1976,7 +1980,7 @@ public class Tablet { AtomicBoolean interruptFlag; int num; boolean isolated; - + ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, HashSet columnSet, List ssiList, Map> ssio, AtomicBoolean interruptFlag, boolean isolated) { this.num = num; @@ -1988,11 +1992,11 @@ public class Tablet { this.interruptFlag = interruptFlag; this.isolated = isolated; } - + } - + class ScanDataSource implements DataSource { - + // data source state private ScanFileManager fileManager; private SortedKeyValueIterator iter; @@ -2001,22 +2005,22 @@ public class Tablet { private long fileReservationId; private AtomicBoolean interruptFlag; private StatsIterator statsIterator; - + ScanOptions options; - + ScanDataSource(Authorizations authorizations, byte[] defaultLabels, HashSet columnSet, List ssiList, Map> ssio, AtomicBoolean interruptFlag) { expectedDeletionCount = dataSourceDeletions.get(); this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false); this.interruptFlag = interruptFlag; } - + ScanDataSource(ScanOptions options) { expectedDeletionCount = dataSourceDeletions.get(); this.options = options; this.interruptFlag = options.interruptFlag; } - + @Override public DataSource getNewDataSource() { if (!isCurrent()) { @@ -2027,122 +2031,122 @@ public class Tablet { datafileManager.returnFilesForScan(fileReservationId); fileReservationId = -1; } - + if (fileManager != null) fileManager.releaseOpenFiles(false); - + expectedDeletionCount = dataSourceDeletions.get(); iter = null; - + return this; } else return this; } - + @Override public boolean isCurrent() { return expectedDeletionCount == dataSourceDeletions.get(); } - + @Override public SortedKeyValueIterator iterator() throws IOException { if (iter == null) iter = createIterator(); return iter; } - + private SortedKeyValueIterator createIterator() throws IOException { - + Map files; - + synchronized (Tablet.this) { - + if (memIters != null) throw new IllegalStateException("Tried to create new scan iterator w/o releasing memory"); - + if (Tablet.this.closed) throw new TabletClosedException(); - + if (interruptFlag.get()) throw new IterationInterruptedException(extent.toString() + " " + interruptFlag.hashCode()); - + // only acquire the file manager when we know the tablet is open if (fileManager == null) { fileManager = tabletResources.newScanFileManager(); activeScans.add(this); } - + if (fileManager.getNumOpenFiles() != 0) throw new IllegalStateException("Tried to create new scan iterator w/o releasing files"); - + // set this before trying to get iterators in case // getIterators() throws an exception expectedDeletionCount = dataSourceDeletions.get(); - + memIters = tabletMemory.getIterators(); Pair> reservation = datafileManager.reserveFilesForScan(); fileReservationId = reservation.getFirst(); files = reservation.getSecond(); } - + Collection mapfiles = fileManager.openFiles(files, options.isolated); - + List> iters = new ArrayList>(mapfiles.size() + memIters.size()); - + iters.addAll(mapfiles); iters.addAll(memIters); - + for (SortedKeyValueIterator skvi : iters) ((InterruptibleIterator) skvi).setInterruptFlag(interruptFlag); - + MultiIterator multiIter = new MultiIterator(iters, extent); - + TabletIteratorEnvironment iterEnv = new TabletIteratorEnvironment(IteratorScope.scan, acuTableConf, fileManager, files); - + statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, scannedCount); - + DeletingIterator delIter = new DeletingIterator(statsIterator, false); - + ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter); - + ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, options.columnSet); - + VisibilityFilter visFilter = new VisibilityFilter(colFilter, options.authorizations, options.defaultLabels); - + return iterEnv.getTopLevelIterator(IteratorUtil .loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.ssiList, options.ssio, iterEnv)); } - + private void close(boolean sawErrors) { - + if (memIters != null) { tabletMemory.returnIterators(memIters); memIters = null; datafileManager.returnFilesForScan(fileReservationId); fileReservationId = -1; } - + synchronized (Tablet.this) { activeScans.remove(this); if (activeScans.size() == 0) Tablet.this.notifyAll(); } - + if (fileManager != null) { fileManager.releaseOpenFiles(sawErrors); fileManager = null; } - + if (statsIterator != null) { statsIterator.report(); } } - + public void interrupt() { interruptFlag.set(true); } - + @Override public DataSource getDeepCopyDataSource(IteratorEnvironment env) { throw new UnsupportedOperationException(); @@ -2152,28 +2156,28 @@ public class Tablet { public void setInterruptFlag(AtomicBoolean flag) { throw new UnsupportedOperationException(); } - + } - + private DataFileValue minorCompact(Configuration conf, FileSystem fs, InMemoryMap memTable, String tmpDatafile, String newDatafile, String mergeFile, boolean hasQueueTime, long queued, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) { boolean failed = false; long start = System.currentTimeMillis(); timer.incrementStatusMinor(); - + long count = 0; - + try { Span span = Trace.start("write"); count = memTable.getNumEntries(); - + DataFileValue dfv = null; if (mergeFile != null) dfv = datafileManager.getDatafileSizes().get(mergeFile); - + MinorCompactor compactor = new MinorCompactor(conf, fs, memTable, mergeFile, dfv, tmpDatafile, acuTableConf, extent, mincReason); CompactionStats stats = compactor.call(); - + span.stop(); span = Trace.start("bringOnline"); datafileManager.bringMinorCompactionOnline(tmpDatafile, newDatafile, mergeFile, new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()), @@ -2193,7 +2197,7 @@ public class Tablet { } catch (Throwable t) { log.error("Failed to free tablet memory", t); } - + if (!failed) { lastMinorCompactionFinishTime = System.currentTimeMillis(); } @@ -2207,16 +2211,16 @@ public class Tablet { timer.updateTime(Operation.MINOR, start, count, failed); } } - + private class MinorCompactionTask implements Runnable { - + private long queued; private CommitSession commitSession; private DataFileValue stats; private String mergeFile; private long flushId; private MinorCompactionReason mincReason; - + MinorCompactionTask(String mergeFile, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) { queued = System.currentTimeMillis(); minorCompactionWaitingToStart = true; @@ -2225,7 +2229,8 @@ public class Tablet { this.flushId = flushId; this.mincReason = mincReason; } - + + @Override public void run() { minorCompactionWaitingToStart = false; minorCompactionInProgress = true; @@ -2256,7 +2261,7 @@ public class Tablet { this.stats = minorCompact(conf, fs, tabletMemory.getMinCMemTable(), newMapfileLocation + "_tmp", newMapfileLocation, mergeFile, true, queued, commitSession, flushId, mincReason); span.stop(); - + if (needsSplit()) { tabletServer.executeSplit(Tablet.this); } else { @@ -2274,36 +2279,36 @@ public class Tablet { } } } - + private synchronized MinorCompactionTask prepareForMinC(long flushId, MinorCompactionReason mincReason) { CommitSession oldCommitSession = tabletMemory.prepareForMinC(); otherLogs = currentLogs; currentLogs = new HashSet(); - + String mergeFile = datafileManager.reserveMergingMinorCompactionFile(); - + return new MinorCompactionTask(mergeFile, oldCommitSession, flushId, mincReason); - + } - + void flush(long tableFlushID) { boolean updateMetadata = false; boolean initiateMinor = false; - + try { - + synchronized (this) { - + // only want one thing at a time to update flush ID to ensure that metadata table and tablet in memory state are consistent if (updatingFlushID) return; - + if (lastFlushID >= tableFlushID) return; - + if (closing || closed || tabletMemory.memoryReservedForMinC()) return; - + if (tabletMemory.getMemTable().getNumEntries() == 0) { lastFlushID = tableFlushID; updatingFlushID = true; @@ -2311,7 +2316,7 @@ public class Tablet { } else initiateMinor = true; } - + if (updateMetadata) { TCredentials creds = SecurityConstants.getSystemCredentials(); // if multiple threads were allowed to update this outside of a sync block, then it would be @@ -2319,7 +2324,7 @@ public class Tablet { MetadataTable.updateTabletFlushID(extent, tableFlushID, creds, tabletServer.getLock()); } else if (initiateMinor) initiateMinorCompaction(tableFlushID, MinorCompactionReason.USER); - + } finally { if (updateMetadata) { synchronized (this) { @@ -2328,9 +2333,9 @@ public class Tablet { } } } - + } - + boolean initiateMinorCompaction(MinorCompactionReason mincReason) { if (isClosed()) { // don't bother trying to get flush id if closed... could be closed after this check but that is ok... just trying to cut down on uneeded log messages.... @@ -2347,7 +2352,7 @@ public class Tablet { } return initiateMinorCompaction(flushId, mincReason); } - + boolean minorCompactNow(MinorCompactionReason mincReason) { long flushId; try { @@ -2370,22 +2375,22 @@ public class Tablet { tabletResources.executeMinorCompaction(mct); return true; } - + private MinorCompactionTask createMinorCompactionTask(long flushId, MinorCompactionReason mincReason) { MinorCompactionTask mct; long t1, t2; - + StringBuilder logMessage = null; - + try { synchronized (this) { t1 = System.currentTimeMillis(); - + if (closing || closed || majorCompactionWaitingToStart || tabletMemory.memoryReservedForMinC() || tabletMemory.getMemTable().getNumEntries() == 0 || updatingFlushID) { - + logMessage = new StringBuilder(); - + logMessage.append(extent.toString()); logMessage.append(" closing " + closing); logMessage.append(" closed " + closed); @@ -2395,7 +2400,7 @@ public class Tablet { if (tabletMemory != null && tabletMemory.getMemTable() != null) logMessage.append(" tabletMemory.getMemTable().getNumEntries() " + tabletMemory.getMemTable().getNumEntries()); logMessage.append(" updatingFlushID " + updatingFlushID); - + return null; } // We're still recovering log entries @@ -2405,7 +2410,7 @@ public class Tablet { logMessage.append(" datafileManager " + datafileManager); return null; } - + mct = prepareForMinC(flushId, mincReason); t2 = System.currentTimeMillis(); } @@ -2414,11 +2419,11 @@ public class Tablet { if (logMessage != null && log.isDebugEnabled()) log.debug(logMessage); } - + log.debug(String.format("MinC initiate lock %.2f secs", (t2 - t1) / 1000.0)); return mct; } - + long getFlushID() throws NoNodeException { try { String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + extent.getTableId() @@ -2436,11 +2441,11 @@ public class Tablet { } } } - + long getCompactionCancelID() { String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + extent.getTableId() + Constants.ZTABLE_COMPACT_CANCEL_ID; - + try { return Long.parseLong(new String(ZooReaderWriter.getRetryingInstance().getData(zTablePath, null), UTF_8)); } catch (KeeperException e) { @@ -2454,32 +2459,32 @@ public class Tablet { try { String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + extent.getTableId() + Constants.ZTABLE_COMPACT_ID; - + String[] tokens = new String(ZooReaderWriter.getRetryingInstance().getData(zTablePath, null), UTF_8).split(","); long compactID = Long.parseLong(tokens[0]); - + CompactionIterators iters = new CompactionIterators(); if (tokens.length > 1) { Hex hex = new Hex(); ByteArrayInputStream bais = new ByteArrayInputStream(hex.decode(tokens[1].split("=")[1].getBytes(UTF_8))); DataInputStream dis = new DataInputStream(bais); - + try { iters.readFields(dis); } catch (IOException e) { throw new RuntimeException(e); } - + KeyExtent ke = new KeyExtent(extent.getTableId(), iters.getEndRow(), iters.getStartRow()); - + if (!ke.overlaps(extent)) { // only use iterators if compaction range overlaps iters = new CompactionIterators(); } } - + return new Pair>(compactID, iters.getIterators()); } catch (InterruptedException e) { throw new RuntimeException(e); @@ -2495,62 +2500,62 @@ public class Tablet { throw new RuntimeException(e); } } - + public synchronized void waitForMinC() { tabletMemory.waitForMinC(); } - + static class TConstraintViolationException extends Exception { private static final long serialVersionUID = 1L; private Violations violations; private List violators; private List nonViolators; private CommitSession commitSession; - + TConstraintViolationException(Violations violations, List violators, List nonViolators, CommitSession commitSession) { this.violations = violations; this.violators = violators; this.nonViolators = nonViolators; this.commitSession = commitSession; } - + Violations getViolations() { return violations; } - + List getViolators() { return violators; } - + List getNonViolators() { return nonViolators; } - + CommitSession getCommitSession() { return commitSession; } } - + private synchronized CommitSession finishPreparingMutations(long time) { if (writesInProgress < 0) { throw new IllegalStateException("waitingForLogs < 0 " + writesInProgress); } - + if (closed || tabletMemory == null) { // log.debug("tablet closed, can't commit"); return null; } - + writesInProgress++; CommitSession commitSession = tabletMemory.getCommitSession(); commitSession.incrementCommitsInProgress(); commitSession.updateMaxCommittedTime(time); return commitSession; } - + public void checkConstraints() { ConstraintChecker cc = constraintChecker.get(); - + if (cc.classLoaderChanged()) { ConstraintChecker ncc = new ConstraintChecker(getTableConfiguration()); constraintChecker.compareAndSet(cc, ncc); @@ -2558,9 +2563,9 @@ public class Tablet { } public CommitSession prepareMutationsForCommit(TservConstraintEnv cenv, List mutations) throws TConstraintViolationException { - + ConstraintChecker cc = constraintChecker.get(); - + List violators = null; Violations violations = new Violations(); cenv.setExtent(extent); @@ -2573,22 +2578,22 @@ public class Tablet { violators.add(mutation); } } - + long time = tabletTime.setUpdateTimes(mutations); - + if (!violations.isEmpty()) { - + HashSet violatorsSet = new HashSet(violators); ArrayList nonViolators = new ArrayList(); - + for (Mutation mutation : mutations) { if (!violatorsSet.contains(mutation)) { nonViolators.add(mutation); } } - + CommitSession commitSession = null; - + if (nonViolators.size() > 0) { // if everything is a violation, then it is expected that // code calling this will not log or commit @@ -2596,66 +2601,66 @@ public class Tablet { if (commitSession == null) return null; } - + throw new TConstraintViolationException(violations, violators, nonViolators, commitSession); } - + return finishPreparingMutations(time); } - + public synchronized void abortCommit(CommitSession commitSession, List value) { if (writesInProgress <= 0) { throw new IllegalStateException("waitingForLogs <= 0 " + writesInProgress); } - + if (closeComplete || tabletMemory == null) { throw new IllegalStateException("aborting commit when tablet is closed"); } - + commitSession.decrementCommitsInProgress(); writesInProgress--; if (writesInProgress == 0) this.notifyAll(); } - + public void commit(CommitSession commitSession, List mutations) { - + int totalCount = 0; long totalBytes = 0; - + // write the mutation to the in memory table for (Mutation mutation : mutations) { totalCount += mutation.size(); totalBytes += mutation.numBytes(); } - + tabletMemory.mutate(commitSession, mutations); - + synchronized (this) { if (writesInProgress < 1) { throw new IllegalStateException("commiting mutations after logging, but not waiting for any log messages"); } - + if (closed && closeComplete) { throw new IllegalStateException("tablet closed with outstanding messages to the logger"); } - + tabletMemory.updateMemoryUsageStats(); - + // decrement here in case an exception is thrown below writesInProgress--; if (writesInProgress == 0) this.notifyAll(); - + commitSession.decrementCommitsInProgress(); - + numEntries += totalCount; numEntriesInMemory += totalCount; ingestCount += totalCount; ingestBytes += totalBytes; } } - + /** * Closes the mapfiles associated with a Tablet. If saveState is true, a minor compaction is performed. */ @@ -2663,17 +2668,17 @@ public class Tablet { initiateClose(saveState, false, false); completeClose(saveState, true); } - + void initiateClose(boolean saveState, boolean queueMinC, boolean disableWrites) { - + if (!saveState && queueMinC) { throw new IllegalArgumentException("Not saving state on close and requesting minor compactions queue does not make sense"); } - + log.debug("initiateClose(saveState=" + saveState + " queueMinC=" + queueMinC + " disableWrites=" + disableWrites + ") " + getExtent()); - + MinorCompactionTask mct = null; - + synchronized (this) { if (closed || closing || closeComplete) { String msg = "Tablet " + getExtent() + " already"; @@ -2685,15 +2690,15 @@ public class Tablet { msg += " closeComplete"; throw new IllegalStateException(msg); } - + // enter the closing state, no splits, minor, or major compactions can start // should cause running major compactions to stop closing = true; this.notifyAll(); - + // determines if inserts and queries can still continue while minor compacting closed = disableWrites; - + // wait for major compactions to finish, setting closing to // true should cause any running major compactions to abort while (majorCompactionInProgress) { @@ -2703,7 +2708,7 @@ public class Tablet { log.error(e.toString()); } } - + while (updatingFlushID) { try { this.wait(50); @@ -2715,50 +2720,50 @@ public class Tablet { if (!saveState || tabletMemory.getMemTable().getNumEntries() == 0) { return; } - + tabletMemory.waitForMinC(); - + try { mct = prepareForMinC(getFlushID(), MinorCompactionReason.CLOSE); } catch (NoNodeException e) { throw new RuntimeException(e); } - + if (queueMinC) { tabletResources.executeMinorCompaction(mct); return; } - + } - + // do minor compaction outside of synch block so that tablet can be read and written to while // compaction runs mct.run(); } - + private boolean closeCompleting = false; - + synchronized void completeClose(boolean saveState, boolean completeClose) throws IOException { - + if (!closing || closeComplete || closeCompleting) { throw new IllegalStateException("closing = " + closing + " closed = " + closed + " closeComplete = " + closeComplete + " closeCompleting = " + closeCompleting); } - + log.debug("completeClose(saveState=" + saveState + " completeClose=" + completeClose + ") " + getExtent()); - + // ensure this method is only called once, also guards against multiple // threads entering the method at the same time closeCompleting = true; closed = true; - + // modify dataSourceDeletions so scans will try to switch data sources and fail because the tablet is closed dataSourceDeletions.incrementAndGet(); - + for (ScanDataSource activeScan : activeScans) { activeScan.interrupt(); } - + // wait for reads and writes to complete while (writesInProgress > 0 || activeScans.size() > 0) { try { @@ -2767,9 +2772,9 @@ public class Tablet { log.error(e.toString()); } } - + tabletMemory.waitForMinC(); - + if (saveState && tabletMemory.getMemTable().getNumEntries() > 0) { try { prepareForMinC(getFlushID(), MinorCompactionReason.CLOSE).run(); @@ -2777,7 +2782,7 @@ public class Tablet { throw new RuntimeException(e); } } - + if (saveState) { // at this point all tablet data is flushed, so do a consistency check RuntimeException err = null; @@ -2796,48 +2801,48 @@ public class Tablet { log.error("Tablet closed consistency check has failed for " + this.extent + " giving up and closing"); } } - + try { tabletMemory.getMemTable().delete(0); } catch (Throwable t) { log.error("Failed to delete mem table : " + t.getMessage(), t); } - + tabletMemory = null; - + // close map files tabletResources.close(); - + log.log(TLevel.TABLET_HIST, extent + " closed"); - + acuTableConf.removeObserver(configObserver); - + closeComplete = completeClose; } - + private void closeConsistencyCheck() { - + if (tabletMemory.getMemTable().getNumEntries() != 0) { String msg = "Closed tablet " + extent + " has " + tabletMemory.getMemTable().getNumEntries() + " entries in memory"; log.error(msg); throw new RuntimeException(msg); } - + if (tabletMemory.memoryReservedForMinC()) { String msg = "Closed tablet " + extent + " has minor compacting memory"; log.error(msg); throw new RuntimeException(msg); } - + try { Pair,SortedMap> fileLog = MetadataTable.getFileAndLogEntries(SecurityConstants.getSystemCredentials(), extent); - + if (fileLog.getFirst().size() != 0) { String msg = "Closed tablet " + extent + " has walog entries in !METADATA " + fileLog.getFirst(); log.error(msg); throw new RuntimeException(msg); } - + if (extent.isRootTablet()) { if (!fileLog.getSecond().keySet().equals(datafileManager.getDatafileSizesRel().keySet())) { String msg = "Data file in !METADATA differ from in memory data " + extent + " " + fileLog.getSecond().keySet() + " " @@ -2853,59 +2858,60 @@ public class Tablet { throw new RuntimeException(msg); } } - + } catch (Exception e) { String msg = "Failed to do close consistency check for tablet " + extent; log.error(msg, e); throw new RuntimeException(msg, e); - + } - + if (otherLogs.size() != 0 || currentLogs.size() != 0) { String msg = "Closed tablet " + extent + " has walog entries in memory currentLogs = " + currentLogs + " otherLogs = " + otherLogs; log.error(msg); throw new RuntimeException(msg); } - + // TODO check lastFlushID and lostCompactID - ACCUMULO-1290 } - + /** * Returns a Path object representing the tablet's location on the DFS. - * + * * @return location */ public Path getLocation() { return location; } - + private class CompactionRunner implements Runnable, Comparable { - + long queued; long start; boolean failed = false; private MajorCompactionReason reason; - + public CompactionRunner(MajorCompactionReason reason) { queued = System.currentTimeMillis(); this.reason = reason; } - + + @Override public void run() { CompactionStats majCStats = null; - + if (tabletServer.isMajorCompactionDisabled()) { // this will make compaction task that were queued when shutdown was // initiated exit majorCompactionQueued.remove(reason); return; } - + try { timer.incrementStatusMajor(); start = System.currentTimeMillis(); majCStats = majorCompact(reason); - + // if there is more work to be done, queue another major compaction synchronized (Tablet.this) { if (reason == MajorCompactionReason.NORMAL && needsMajorCompaction(reason)) @@ -2919,12 +2925,12 @@ public class Tablet { if (majCStats != null) { count = majCStats.getEntriesRead(); } - + timer.updateTime(Operation.MAJOR, queued, start, count, failed); } } - - // We used to synchronize on the Tablet before fetching this information, + + // We used to synchronize on the Tablet before fetching this information, // but this method is called by the compaction queue thread to re-order the compactions. // The compaction queue holds a lock during this sort. // A tablet lock can be held while putting itself on the queue, so we can't lock the tablet @@ -2933,40 +2939,40 @@ public class Tablet { private int getNumFiles() { return datafileManager.datafileSizes.size(); } - + @Override public int compareTo(CompactionRunner o) { int cmp = reason.compareTo(o.reason); if (cmp != 0) return cmp; - + if (reason == MajorCompactionReason.USER || reason == MajorCompactionReason.CHOP) { // for these types of compactions want to do the oldest first cmp = (int) (queued - o.queued); if (cmp != 0) r