From: stack@apache.org
To: hadoop-commits@lucene.apache.org
Subject: svn commit: r601383 - in /lucene/hadoop/trunk/src/contrib/hbase: ./ src/java/org/apache/hadoop/hbase/
Date: Wed, 05 Dec 2007 16:06:26 -0000
Message-Id: <20071205160627.AD62B1A9832@eris.apache.org>

Author: stack
Date: Wed Dec 5 08:06:25 2007
New Revision: 601383

URL: http://svn.apache.org/viewvc?rev=601383&view=rev
Log:
HADOOP-2357 Compaction cleanup; less deleting + prevent possible file leaks

Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=601383&r1=601382&r2=601383&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Wed Dec 5 08:06:25 2007
@@ -91,6 +91,7 @@
                (Edward Yoon via Stack)
    HADOOP-2299 Support inclusive scans (Bryan Duxbury via Stack)
    HADOOP-2333 Client side retries happen at the wrong level
+   HADOOP-2357 Compaction cleanup; less deleting + prevent possible file leaks
 
 Release 0.15.1

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java?rev=601383&r1=601382&r2=601383&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java Wed Dec 5 08:06:25 2007
@@ -151,11 +151,12 @@
       try {
        for (int i = 0; i < logfiles.length; i++) {
          if (LOG.isDebugEnabled()) {
-           LOG.debug("Splitting " + logfiles[i]);
+           LOG.debug("Splitting " + i + " of " + logfiles.length + ": " +
+             logfiles[i]);
          }
          // Check for empty file.
          if (fs.getFileStatus(logfiles[i]).getLen() <= 0) {
-           LOG.warn("Skipping " + logfiles[i].toString() +
+           LOG.info("Skipping " + logfiles[i].toString() +
                " because zero length");
            continue;
          }
@@ -164,25 +165,28 @@
          try {
            HLogKey key = new HLogKey();
            HLogEdit val = new HLogEdit();
-           while (in.next(key, val)) {
+           int count = 0;
+           for (; in.next(key, val); count++) {
              Text regionName = key.getRegionName();
              SequenceFile.Writer w = logWriters.get(regionName);
              if (w == null) {
                Path logfile = new Path(HRegion.getRegionDir(rootDir,
                  HRegionInfo.encodeRegionName(regionName)),
                  HREGION_OLDLOGFILE_NAME);
-               if (LOG.isDebugEnabled()) {
-                 LOG.debug("getting new log file writer for path " + logfile);
+               LOG.debug("Creating new log file writer for path " + logfile);
              }
              w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class,
                HLogEdit.class);
              logWriters.put(regionName, w);
            }
-           if (LOG.isDebugEnabled()) {
-             LOG.debug("Edit " + key.toString() + "=" + val.toString());
+           if (count % 100 == 0 && count > 0 && LOG.isDebugEnabled()) {
+             LOG.debug("Applied " + count + " edits");
            }
            w.append(key, val);
+         }
+         if (LOG.isDebugEnabled()) {
+           LOG.debug("Applied " + count + " total edits");
          }
        } finally {
          in.close();
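The new split loop above opens one writer per region lazily, the first time an edit for that region shows up, and reports progress every 100 edits instead of logging each edit. Below is a minimal, self-contained Java sketch of that pattern, not part of the commit: plain java.io writers stand in for SequenceFile.Writer, the Edit class is a hypothetical stand-in for HLogKey/HLogEdit, and the "oldlogfile.log" name is a placeholder for the HREGION_OLDLOGFILE_NAME constant.

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: illustrates the lazy per-region writer and the periodic
// progress logging used by the new split loop; it is not the HBase code.
public class SplitLogSketch {

  // Hypothetical stand-in for an HLog entry: the region it belongs to plus its payload.
  static class Edit {
    final String regionName;
    final String payload;
    Edit(String regionName, String payload) {
      this.regionName = regionName;
      this.payload = payload;
    }
  }

  static void split(Path outputRoot, List<Edit> edits) throws IOException {
    Map<String, BufferedWriter> writers = new HashMap<String, BufferedWriter>();
    int count = 0;
    try {
      for (Edit e : edits) {
        BufferedWriter w = writers.get(e.regionName);
        if (w == null) {
          // Writer is only created the first time an edit for this region is seen.
          Path dir = outputRoot.resolve(e.regionName);
          Files.createDirectories(dir);
          w = new BufferedWriter(new FileWriter(dir.resolve("oldlogfile.log").toFile()));
          writers.put(e.regionName, w);
        }
        w.write(e.payload);
        w.newLine();
        count++;
        if (count % 100 == 0) {
          System.out.println("Applied " + count + " edits");
        }
      }
      System.out.println("Applied " + count + " total edits");
    } finally {
      // Release the per-region writers whether or not the loop completed.
      for (BufferedWriter w : writers.values()) {
        w.close();
      }
    }
  }
}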
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=601383&r1=601382&r2=601383&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Wed Dec 5 08:06:25 2007
@@ -245,9 +245,9 @@
          String serverName = Writables.bytesToString(results.get(COL_SERVER));
          long startCode = Writables.bytesToLong(results.get(COL_STARTCODE));
          if (LOG.isDebugEnabled()) {
-           LOG.debug(Thread.currentThread().getName() + " scanner: " +
-             Long.valueOf(scannerId) + " regioninfo: {" + info.toString() +
-             "}, server: " + serverName + ", startCode: " + startCode);
+           LOG.debug(Thread.currentThread().getName() + " regioninfo: {" +
+             info.toString() + "}, server: " + serverName + ", startCode: " +
+             startCode);
          }
 
          // Note Region has been assigned.
@@ -447,9 +447,7 @@
        storedInfo = serversToServerInfo.get(serverName);
        deadServer = deadServers.contains(serverName);
      }
-     if (LOG.isDebugEnabled()) {
-       LOG.debug("Checking " + info.getRegionName() + " is assigned");
-     }
+
      /*
       * If the server is not dead and either:
       * the stored info is not null and the start code does not match

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=601383&r1=601382&r2=601383&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Wed Dec 5 08:06:25 2007
@@ -476,7 +476,9 @@
     return this.conf;
   }
 
-  /** @return region directory Path */
+  /** @return region directory Path
+   * @see HRegion#getRegionDir(Path, String)
+   */
   public Path getRegionDir() {
     return this.regiondir;
   }
@@ -878,11 +880,6 @@
    */
   private boolean internalFlushcache(long startTime) throws IOException {
     if (startTime == -1) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not flushing cache for region " +
-          regionInfo.getRegionName() +
-          ": snapshotMemcaches() determined that there was nothing to do");
-      }
       return false;
     }
@@ -1633,13 +1630,17 @@
    *
    * @param fs the file system object
    * @param baseDirectory base directory for HBase
-   * @param name region file name
+   * @param name region file name ENCODED!
    * @throws IOException
    * @return True if deleted.
+   * @see HRegionInfo#encodeRegionName(Text)
    */
   static boolean deleteRegion(FileSystem fs, Path baseDirectory, String name)
   throws IOException {
     Path p = HRegion.getRegionDir(fs.makeQualified(baseDirectory), name);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("DELETING region " + p.toString());
+    }
     return fs.delete(p);
   }
@@ -1647,8 +1648,9 @@
    * Computes the Path of the HRegion
    *
    * @param dir hbase home directory
-   * @param name region file name
+   * @param name region file name ENCODED!
    * @return Path of HRegion directory
+   * @see HRegionInfo#encodeRegionName(Text)
    */
   public static Path getRegionDir(final Path dir, final String name) {
     return new Path(dir, new Path(HREGIONDIR_PREFIX + name));
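The HRegion javadoc edits above stress that getRegionDir() and deleteRegion() take the ENCODED region name, and deleteRegion() now logs what it is about to remove. A rough, self-contained sketch of that directory convention follows; it assumes java.nio.file in place of Hadoop's FileSystem/Path, a stand-in encoder for HRegionInfo.encodeRegionName(Text), and a placeholder value for the HREGIONDIR_PREFIX constant.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Sketch only: the region-directory convention the javadoc changes point at.
// The encoder and the directory prefix value here are placeholders.
public class RegionDirSketch {

  static final String HREGIONDIR_PREFIX = "hregion_"; // placeholder value

  // Stand-in for HRegionInfo.encodeRegionName(Text): any stable,
  // filesystem-safe mapping of the region name will do for the sketch.
  static String encodeRegionName(String regionName) {
    return Integer.toHexString(regionName.hashCode() & 0x7fffffff);
  }

  static Path getRegionDir(Path hbaseRootDir, String encodedName) {
    return hbaseRootDir.resolve(HREGIONDIR_PREFIX + encodedName);
  }

  static boolean deleteRegion(Path hbaseRootDir, String encodedName) throws IOException {
    Path p = getRegionDir(hbaseRootDir, encodedName);
    System.out.println("DELETING region " + p); // same intent as the new LOG.debug
    if (!Files.exists(p)) {
      return false;
    }
    try (Stream<Path> walk = Files.walk(p)) { // recursive delete, like fs.delete(p)
      walk.sorted(Comparator.reverseOrder()).forEach(q -> q.toFile().delete());
    }
    return !Files.exists(p);
  }

  public static void main(String[] args) throws IOException {
    Path root = Files.createTempDirectory("hbase-root");
    String encoded = encodeRegionName("table,startrow,1196870785000");
    Files.createDirectories(getRegionDir(root, encoded).resolve("family"));
    System.out.println("deleted=" + deleteRegion(root, encoded));
  }
}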
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?rev=601383&r1=601382&r2=601383&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Wed Dec 5 08:06:25 2007
@@ -542,7 +542,8 @@
       HBaseConfiguration conf) throws IOException {
     this.dir = dir;
-    this.compactionDir = new Path(dir, "compaction.dir");
+    this.compactionDir = new Path(HRegion.getRegionDir(dir, encodedName),
+      "compaction.dir");
     this.regionName = regionName;
     this.encodedRegionName = encodedName;
     this.family = family;
@@ -603,16 +604,7 @@
     // means it was built prior to the previous run of HStore, and so it cannot
     // contain any updates also contained in the log.
-    long maxSeqID = -1;
-    for (HStoreFile hsf: hstoreFiles) {
-      long seqid = hsf.loadInfo(fs);
-      if(seqid > 0) {
-        if(seqid > maxSeqID) {
-          maxSeqID = seqid;
-        }
-      }
-    }
-    this.maxSeqId = maxSeqID;
+    this.maxSeqId = getMaxSequenceId(hstoreFiles);
     if (LOG.isDebugEnabled()) {
       LOG.debug("maximum sequence id for hstore " + storeName + " is " +
         this.maxSeqId);
@@ -641,6 +633,25 @@
     }
   }
 
+  /*
+   * @param hstoreFiles
+   * @return Maximum sequence number found or -1.
+   * @throws IOException
+   */
+  private long getMaxSequenceId(final List<HStoreFile> hstoreFiles)
+  throws IOException {
+    long maxSeqID = -1;
+    for (HStoreFile hsf : hstoreFiles) {
+      long seqid = hsf.loadInfo(fs);
+      if (seqid > 0) {
+        if (seqid > maxSeqID) {
+          maxSeqID = seqid;
+        }
+      }
+    }
+    return maxSeqID;
+  }
+
   long getMaxSequenceId() {
     return this.maxSeqId;
   }
@@ -670,16 +681,17 @@
     try {
       HLogKey key = new HLogKey();
       HLogEdit val = new HLogEdit();
+      long skippedEdits = 0;
       while (login.next(key, val)) {
         maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
         if (key.getLogSeqNum() <= maxSeqID) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Skipping edit <" + key.toString() + "=" +
-              val.toString() + "> key sequence: " + key.getLogSeqNum() +
-              " max sequence: " + maxSeqID);
-          }
+          skippedEdits++;
           continue;
         }
+        if (skippedEdits > 0 && LOG.isDebugEnabled()) {
+          LOG.debug("Skipped " + skippedEdits +
+            " edits because sequence id <= " + maxSeqID);
+        }
         // Check this edit is for me. Also, guard against writing
         // METACOLUMN info such as HBASE::CACHEFLUSH entries
         Text column = val.getColumn();
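The maximum-sequence-id scan that used to be inlined in the constructor is now the getMaxSequenceId() helper above, which the constructor and (further down) compact() both reuse. A small self-contained sketch of the helper's contract, largest positive sequence id or -1 when none is found; StoreFileInfo is a hypothetical stand-in for HStoreFile and its loadInfo(fs) call.

import java.util.Arrays;
import java.util.List;

// Sketch only: the contract of the extracted getMaxSequenceId() helper.
public class MaxSequenceIdSketch {

  static class StoreFileInfo {
    final long sequenceId; // what HStoreFile.loadInfo(fs) would return
    StoreFileInfo(long sequenceId) {
      this.sequenceId = sequenceId;
    }
  }

  static long getMaxSequenceId(List<StoreFileInfo> storeFiles) {
    long maxSeqId = -1;
    for (StoreFileInfo f : storeFiles) {
      long seqId = f.sequenceId;
      if (seqId > 0 && seqId > maxSeqId) { // ignore missing/invalid ids (<= 0)
        maxSeqId = seqId;
      }
    }
    return maxSeqId;
  }

  public static void main(String[] args) {
    System.out.println(getMaxSequenceId(Arrays.asList(
        new StoreFileInfo(12), new StoreFileInfo(-1), new StoreFileInfo(47)))); // 47
    System.out.println(getMaxSequenceId(Arrays.asList(new StoreFileInfo(0))));  // -1
  }
}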
@@ -977,119 +989,88 @@
    * @return true if compaction completed successfully
    */
   boolean compact() throws IOException {
-    long maxId = -1;
     synchronized (compactLock) {
-      Path curCompactStore = HStoreFile.getHStoreDir(this.compactionDir,
-        encodedRegionName, familyName);
+      Path curCompactStore = getCompactionDir();
       if (LOG.isDebugEnabled()) {
-        LOG.debug("started compaction of " + storefiles.size() + " files in " +
-          curCompactStore.toString());
+        LOG.debug("started compaction of " + storefiles.size() +
+          " files using " + curCompactStore.toString());
       }
       if (this.fs.exists(curCompactStore)) {
-        LOG.warn("Cleaning up a previous incomplete compaction at " +
-          curCompactStore.toString());
-        if (!this.fs.delete(curCompactStore)) {
-          LOG.warn("Deleted returned false on " + curCompactStore.toString());
+        // Clean out its content in prep. for this new compaction.  Has either
+        // aborted previous compaction or it has content of a previous
+        // compaction.
+        Path [] toRemove = this.fs.listPaths(new Path [] {curCompactStore});
+        for (int i = 0; i < toRemove.length; i++) {
+          this.fs.delete(toRemove[i]);
         }
       }
-      try {
-        // Storefiles are keyed by sequence id. The oldest file comes first.
-        // We need to return out of here a List that has the newest file as
-        // first.
-        List<HStoreFile> filesToCompact =
-          new ArrayList<HStoreFile>(this.storefiles.values());
-        Collections.reverse(filesToCompact);
-
-        HStoreFile compactedOutputFile = new HStoreFile(conf,
-          this.compactionDir, encodedRegionName, familyName, -1);
-        if (filesToCompact.size() < 1 ||
-          (filesToCompact.size() == 1 &&
-            !filesToCompact.get(0).isReference())) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("nothing to compact for " + this.storeName);
-          }
-          return false;
-        }
-
-        if (!fs.mkdirs(curCompactStore)) {
-          LOG.warn("Mkdir on " + curCompactStore.toString() + " failed");
-        }
-
-        // Compute the max-sequenceID seen in any of the to-be-compacted
-        // TreeMaps if it hasn't been passed in to us.
-        if (maxId == -1) {
-          for (HStoreFile hsf: filesToCompact) {
-            long seqid = hsf.loadInfo(fs);
-            if (seqid > 0) {
-              if (seqid > maxId) {
-                maxId = seqid;
-              }
-            }
-          }
-        }
-
-        // Step through them, writing to the brand-new TreeMap
-        MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs,
-          this.compression, this.bloomFilter);
-        try {
-          compactHStoreFiles(compactedOut, filesToCompact);
-        } finally {
-          compactedOut.close();
+      // Storefiles are keyed by sequence id. The oldest file comes first.
+      // We need to return out of here a List that has the newest file first.
+      List<HStoreFile> filesToCompact =
+        new ArrayList<HStoreFile>(this.storefiles.values());
+      Collections.reverse(filesToCompact);
+      if (filesToCompact.size() < 1 ||
+        (filesToCompact.size() == 1 && !filesToCompact.get(0).isReference())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("nothing to compact for " + this.storeName);
         }
+        return false;
+      }
-        // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
-        if (maxId >= 0) {
-          compactedOutputFile.writeInfo(fs, maxId);
-        } else {
-          compactedOutputFile.writeInfo(fs, -1);
-        }
+      if (!fs.exists(curCompactStore) && !fs.mkdirs(curCompactStore)) {
+        LOG.warn("Mkdir on " + curCompactStore.toString() + " failed");
+        return false;
+      }
-        // Write out a list of data files that we're replacing
-        Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
-        DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
-        try {
-          out.writeInt(filesToCompact.size());
-          for (HStoreFile hsf: filesToCompact) {
-            hsf.write(out);
-          }
-        } finally {
-          out.close();
-        }
+      // Step through them, writing to the brand-new TreeMap
+      HStoreFile compactedOutputFile = new HStoreFile(conf, this.compactionDir,
+        encodedRegionName, familyName, -1);
+      MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs,
+        this.compression, this.bloomFilter);
+      try {
+        compactHStoreFiles(compactedOut, filesToCompact);
+      } finally {
+        compactedOut.close();
+      }
-        // Indicate that we're done.
-        Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
-        (new DataOutputStream(fs.create(doneFile))).close();
+      // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
+      // Compute max-sequenceID seen in any of the to-be-compacted TreeMaps.
+      long maxId = getMaxSequenceId(filesToCompact);
+      compactedOutputFile.writeInfo(fs, maxId);
-        // Move the compaction into place.
-        completeCompaction();
-        return true;
-      } finally {
-        // Clean up the parent -- the region dir in the compactions directory.
-        if (this.fs.exists(curCompactStore.getParent())) {
-          if (!this.fs.delete(curCompactStore.getParent())) {
-            LOG.warn("Delete returned false deleting " +
-              curCompactStore.getParent().toString());
-          }
+      // Write out a list of data files that we're replacing
+      Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
+      DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
+      try {
+        out.writeInt(filesToCompact.size());
+        for (HStoreFile hsf : filesToCompact) {
+          hsf.write(out);
        }
+      } finally {
+        out.close();
      }
+
+      // Indicate that we're done.
+      Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
+      (new DataOutputStream(fs.create(doneFile))).close();
+
+      // Move the compaction into place.
+      completeCompaction(curCompactStore);
+      return true;
    }
  }

  /*
-   * Compact passed toCompactFiles into compactedOut.
-   * We create a new set of MapFile.Reader objects so we don't screw up
-   * the caching associated with the currently-loaded ones.  Our
-   * iteration-based access pattern is practically designed to ruin
-   * the cache.
-   *
-   * We work by opening a single MapFile.Reader for each file, and
-   * iterating through them in parallel.  We always increment the
-   * lowest-ranked one.  Updates to a single row/column will appear
-   * ranked by timestamp.  This allows us to throw out deleted values or
-   * obsolete versions.
-   * @param compactedOut
-   * @param toCompactFiles
-   * @throws IOException
+   * Compact passed toCompactFiles into compactedOut.
+   * We create a new set of MapFile.Reader objects so we don't screw up the
+   * caching associated with the currently-loaded ones.  Our iteration-based
+   * access pattern is practically designed to ruin the cache.
+   *
+   * We work by opening a single MapFile.Reader for each file, and iterating
+   * through them in parallel.  We always increment the lowest-ranked one.
+   * Updates to a single row/column will appear ranked by timestamp.  This allows
+   * us to throw out deleted values or obsolete versions.  @param compactedOut
+   * @param toCompactFiles  @throws IOException
   */
  private void compactHStoreFiles(final MapFile.Writer compactedOut,
    final List<HStoreFile> toCompactFiles) throws IOException {
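The rewritten compact() above is where the "less deleting" in the change log shows up: the compaction directory now sits under the region directory, leftover content from an earlier attempt is cleared out rather than the directory being deleted wholesale, and the done marker is written only after the compacted output and the replacement list are safely in place, so an interrupted compaction is simply redone. The following is a self-contained sketch of that bookkeeping under stated assumptions: java.nio.file stands in for the Hadoop FileSystem calls, and the file names and directory layout are simplified placeholders for the HStore constants.

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Sketch only: the order of operations the rewritten compact() relies on.
// File names ("compacted.out", "toreplace", "done") are placeholders for the
// real HStore constants, and the directory layout is simplified.
public class CompactionDirSketch {

  static Path getCompactionDir(Path regionDir, String familyName) {
    // The compaction dir now lives under the region directory.
    return regionDir.resolve("compaction.dir").resolve(familyName);
  }

  static void compact(Path regionDir, String familyName, List<String> filesToCompact)
      throws IOException {
    Path curCompactStore = getCompactionDir(regionDir, familyName);
    if (Files.exists(curCompactStore)) {
      // A previous compaction aborted or left output behind: clear the content
      // but keep the directory itself (the sketch assumes only plain files here).
      try (DirectoryStream<Path> leftovers = Files.newDirectoryStream(curCompactStore)) {
        for (Path p : leftovers) {
          Files.delete(p);
        }
      }
    } else {
      Files.createDirectories(curCompactStore);
    }
    // 1. Write the compacted output (stand-in for the MapFile the real code writes).
    Files.write(curCompactStore.resolve("compacted.out"), filesToCompact);
    // 2. Record which store files the output replaces.
    Files.write(curCompactStore.resolve("toreplace"), filesToCompact);
    // 3. Only now mark the compaction complete; a crash before this point means
    //    the next compaction starts over and nothing is lost or leaked.
    Files.createFile(curCompactStore.resolve("done"));
  }
}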
@@ -1107,6 +1088,7 @@
        // culprit.
        LOG.warn("Failed with " + e.toString() + ": " + hsf.toString() +
          (hsf.isReference()? " " + hsf.getReference().toString(): ""));
+       closeCompactionReaders(rdrs);
        throw e;
      }
    }
@@ -1195,13 +1177,17 @@
          }
        }
      }
    } finally {
-     for (int i = 0; i < rdrs.length; i++) {
-       if (rdrs[i] != null) {
-         try {
-           rdrs[i].close();
-         } catch (IOException e) {
-           LOG.warn("Exception closing reader", e);
-         }
+     closeCompactionReaders(rdrs);
+   }
+ }
+
+ private void closeCompactionReaders(final CompactionReader [] rdrs) {
+   for (int i = 0; i < rdrs.length; i++) {
+     if (rdrs[i] != null) {
+       try {
+         rdrs[i].close();
+       } catch (IOException e) {
+         LOG.warn("Exception closing reader", e);
        }
      }
    }
  }
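The new closeCompactionReaders() helper above is invoked both on the early failure path, right after a reader fails to open, and from the finally block of the merge loop; that is the "prevent possible file leaks" part of this change. A minimal sketch of the pattern, with java.io.Closeable standing in for CompactionReader:

import java.io.Closeable;
import java.io.IOException;

// Sketch only: close every reader that was opened, log and keep going if a
// close fails, and reuse the same helper wherever the merge can bail out.
public class CloseReadersSketch {

  static void closeQuietly(Closeable[] readers) {
    for (Closeable r : readers) {
      if (r != null) {
        try {
          r.close();
        } catch (IOException e) {
          System.err.println("Exception closing reader: " + e);
        }
      }
    }
  }

  // Shape of the calling code: whatever happens while the readers are in use,
  // they are released in finally, mirroring closeCompactionReaders(rdrs).
  static void mergeReaders(Closeable[] readers) throws IOException {
    try {
      // ... iterate the readers in parallel and write the merged output ...
    } finally {
      closeQuietly(readers);
    }
  }
}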
@@ -1326,11 +1312,11 @@
   * 8) Releasing the write-lock
   * 9) Allow new scanners to proceed.
   *
+  *
+  * @param curCompactStore Compaction to complete.
   */
- private void completeCompaction() throws IOException {
-   Path curCompactStore = HStoreFile.getHStoreDir(this.compactionDir,
-     encodedRegionName, familyName);
-
+ private void completeCompaction(final Path curCompactStore)
+ throws IOException {
    // 1. Wait for active scanners to exit
    newScannerLock.writeLock().lock(); // prevent new scanners
    try {
@@ -1346,6 +1332,7 @@
      // 2. Acquiring the HStore write-lock
      this.lock.writeLock().lock();
    }
+
    try {
      Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
      if (!fs.exists(doneFile)) {
@@ -1366,7 +1353,6 @@
          hsf.readFields(in);
          toCompactFiles.add(hsf);
        }
-
      } finally {
        in.close();
      }
@@ -1412,13 +1398,13 @@
      // 7. Loading the new TreeMap.
      Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
      this.readers.put(orderVal,
-         finalCompactedFile.getReader(this.fs, this.bloomFilter));
+       finalCompactedFile.getReader(this.fs, this.bloomFilter));
      this.storefiles.put(orderVal, finalCompactedFile);
    } catch (IOException e) {
      LOG.error("Failed replacing compacted files. Compacted file is " +
-         finalCompactedFile.toString() + ". Files replaced are " +
-         toCompactFiles.toString() +
-         " some of which may have been already removed", e);
+       finalCompactedFile.toString() + ". Files replaced are " +
+       toCompactFiles.toString() +
+       " some of which may have been already removed", e);
    } finally {
      // 8. Releasing the write-lock
@@ -1477,6 +1463,17 @@
    } finally {
      this.lock.readLock().unlock();
    }
+ }
+
+ /*
+  * @return Path to the compaction directory for this column family.
+  * Compaction dir is a subdirectory of the region.  Needs to have the
+  * same regiondir/storefamily path prefix; HStoreFile constructor presumes
+  * it (TODO: Fix).
+  */
+ private Path getCompactionDir() {
+   return HStoreFile.getHStoreDir(this.compactionDir,
+     this.encodedRegionName, this.familyName);
  }

  private MapFile.Reader [] getReaders() {
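completeCompaction(curCompactStore) now receives the directory computed by getCompactionDir() and only swaps store files if the COMPACTION_DONE marker is present, all under the store write lock. A rough sketch of that guard; the lock field and the "done" file name are stand-ins for the HStore internals.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch only: the marker check that keeps an unfinished compaction from
// replacing live store files. Not the HBase implementation.
public class CompleteCompactionSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  boolean completeCompaction(Path curCompactStore) throws IOException {
    lock.writeLock().lock(); // no reads or flushes while files are swapped
    try {
      Path doneFile = curCompactStore.resolve("done"); // placeholder for COMPACTION_DONE
      if (!Files.exists(doneFile)) {
        // The compaction never finished writing its output; leave store files alone.
        return false;
      }
      // ... read the replacement list, move the compacted file into place,
      //     delete the replaced store files, reload the readers ...
      return true;
    } finally {
      lock.writeLock().unlock();
    }
  }
}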