hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r601383 - in /lucene/hadoop/trunk/src/contrib/hbase: ./ src/java/org/apache/hadoop/hbase/
Date Wed, 05 Dec 2007 16:06:26 GMT
Author: stack
Date: Wed Dec  5 08:06:25 2007
New Revision: 601383

URL: http://svn.apache.org/viewvc?rev=601383&view=rev
Log:
HADOOP-2357 Compaction cleanup; less deleting + prevent possible file leaks

Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=601383&r1=601382&r2=601383&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Wed Dec  5 08:06:25 2007
@@ -91,6 +91,7 @@
                (Edward Yoon via Stack)
    HADOOP-2299 Support inclusive scans (Bryan Duxbury via Stack)
    HADOOP-2333 Client side retries happen at the wrong level
+   HADOOP-2357 Compaction cleanup; less deleting + prevent possible file leaks
 
 
 Release 0.15.1

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java?rev=601383&r1=601382&r2=601383&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java Wed Dec  5 08:06:25 2007
@@ -151,11 +151,12 @@
     try {
       for (int i = 0; i < logfiles.length; i++) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Splitting " + logfiles[i]);
+          LOG.debug("Splitting " + i + " of " + logfiles.length + ": " +
+            logfiles[i]);
         }
         // Check for empty file.
         if (fs.getFileStatus(logfiles[i]).getLen() <= 0) {
-          LOG.warn("Skipping " + logfiles[i].toString() +
+          LOG.info("Skipping " + logfiles[i].toString() +
             " because zero length");
           continue;
         }
@@ -164,25 +165,28 @@
         try {
           HLogKey key = new HLogKey();
           HLogEdit val = new HLogEdit();
-          while (in.next(key, val)) {
+          int count = 0;
+          for (; in.next(key, val); count++) {
             Text regionName = key.getRegionName();
             SequenceFile.Writer w = logWriters.get(regionName);
             if (w == null) {
               Path logfile = new Path(HRegion.getRegionDir(rootDir,
                 HRegionInfo.encodeRegionName(regionName)),
                 HREGION_OLDLOGFILE_NAME);
-              
               if (LOG.isDebugEnabled()) {
-                LOG.debug("getting new log file writer for path " + logfile);
+                LOG.debug("Creating new log file writer for path " + logfile);
               }
               w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class,
                 HLogEdit.class);
               logWriters.put(regionName, w);
             }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Edit " + key.toString() + "=" + val.toString());
+            if (count % 100 == 0 && count > 0 && LOG.isDebugEnabled()) {
+              LOG.debug("Applied " + count + " edits");
             }
             w.append(key, val);
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Applied " + count + " total edits");
           }
         } finally {
           in.close();

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=601383&r1=601382&r2=601383&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Wed Dec  5 08:06:25 2007
@@ -245,9 +245,9 @@
           String serverName = Writables.bytesToString(results.get(COL_SERVER));
           long startCode = Writables.bytesToLong(results.get(COL_STARTCODE));
           if (LOG.isDebugEnabled()) {
-            LOG.debug(Thread.currentThread().getName() + " scanner: " +
-              Long.valueOf(scannerId) + " regioninfo: {" + info.toString() +
-              "}, server: " + serverName + ", startCode: " + startCode);
+            LOG.debug(Thread.currentThread().getName() + " regioninfo: {" +
+              info.toString() + "}, server: " + serverName + ", startCode: " +
+              startCode);
           }
 
           // Note Region has been assigned.
@@ -447,9 +447,7 @@
         storedInfo = serversToServerInfo.get(serverName);
         deadServer = deadServers.contains(serverName);
       }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Checking " + info.getRegionName() + " is assigned");
-      }
+
       /*
        * If the server is not dead and either:
        *   the stored info is not null and the start code does not match

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=601383&r1=601382&r2=601383&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Wed Dec  5 08:06:25 2007
@@ -476,7 +476,9 @@
     return this.conf;
   }
 
-  /** @return region directory Path */
+  /** @return region directory Path
+   * @see HRegion#getRegionDir(Path, String)
+   */
   public Path getRegionDir() {
     return this.regiondir;
   }
@@ -878,11 +880,6 @@
    */
   private boolean internalFlushcache(long startTime) throws IOException {
     if (startTime == -1) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not flushing cache for region " +
-            regionInfo.getRegionName() +
-            ": snapshotMemcaches() determined that there was nothing to do");
-      }
       return false;
     }
 
@@ -1633,13 +1630,17 @@
    * 
    * @param fs the file system object
    * @param baseDirectory base directory for HBase
-   * @param name region file name
+   * @param name region file name ENCODED!
    * @throws IOException
    * @return True if deleted.
+   * @see HRegionInfo#encodeRegionName(Text)
    */
   static boolean deleteRegion(FileSystem fs, Path baseDirectory, String name)
     throws IOException {
     Path p = HRegion.getRegionDir(fs.makeQualified(baseDirectory), name);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("DELETING region " + p.toString());
+    }
     return fs.delete(p);
   }
 
@@ -1647,8 +1648,9 @@
    * Computes the Path of the HRegion
    * 
    * @param dir hbase home directory
-   * @param name region file name
+   * @param name region file name ENCODED!
    * @return Path of HRegion directory
+   * @see HRegionInfo#encodeRegionName(Text)
    */
   public static Path getRegionDir(final Path dir, final String name) {
     return new Path(dir, new Path(HREGIONDIR_PREFIX + name));

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?rev=601383&r1=601382&r2=601383&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Wed Dec  5 08:06:25 2007
@@ -542,7 +542,8 @@
       HBaseConfiguration conf) throws IOException {  
     
     this.dir = dir;
-    this.compactionDir = new Path(dir, "compaction.dir");
+    this.compactionDir = new Path(HRegion.getRegionDir(dir, encodedName),
+      "compaction.dir");
     this.regionName = regionName;
     this.encodedRegionName = encodedName;
     this.family = family;
@@ -603,16 +604,7 @@
     // means it was built prior to the previous run of HStore, and so it cannot 
     // contain any updates also contained in the log.
     
-    long maxSeqID = -1;
-    for (HStoreFile hsf: hstoreFiles) {
-      long seqid = hsf.loadInfo(fs);
-      if(seqid > 0) {
-        if(seqid > maxSeqID) {
-          maxSeqID = seqid;
-        }
-      }
-    }
-    this.maxSeqId = maxSeqID;
+    this.maxSeqId = getMaxSequenceId(hstoreFiles);
     if (LOG.isDebugEnabled()) {
       LOG.debug("maximum sequence id for hstore " + storeName + " is " +
           this.maxSeqId);
@@ -641,6 +633,25 @@
     }
   }
   
+  /* 
+   * @param hstoreFiles
+   * @return Maximum sequence number found or -1.
+   * @throws IOException
+   */
+  private long getMaxSequenceId(final List<HStoreFile> hstoreFiles)
+  throws IOException {
+    long maxSeqID = -1;
+    for (HStoreFile hsf : hstoreFiles) {
+      long seqid = hsf.loadInfo(fs);
+      if (seqid > 0) {
+        if (seqid > maxSeqID) {
+          maxSeqID = seqid;
+        }
+      }
+    }
+    return maxSeqID;
+  }
+  
   long getMaxSequenceId() {
     return this.maxSeqId;
   }
@@ -670,16 +681,17 @@
     try {
       HLogKey key = new HLogKey();
       HLogEdit val = new HLogEdit();
+      long skippedEdits = 0;
       while (login.next(key, val)) {
         maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
         if (key.getLogSeqNum() <= maxSeqID) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Skipping edit <" + key.toString() + "=" +
-                val.toString() + "> key sequence: " + key.getLogSeqNum() +
-                " max sequence: " + maxSeqID);
-          }
+          skippedEdits++;
           continue;
         }
+        if (skippedEdits > 0 && LOG.isDebugEnabled()) {
+          LOG.debug("Skipped " + skippedEdits +
+            " edits because sequence id <= " + maxSeqID);
+        }
         // Check this edit is for me. Also, guard against writing
         // METACOLUMN info such as HBASE::CACHEFLUSH entries
         Text column = val.getColumn();
@@ -977,119 +989,88 @@
    * @return true if compaction completed successfully
    */
   boolean compact() throws IOException {
-    long maxId = -1;
     synchronized (compactLock) {
-      Path curCompactStore = HStoreFile.getHStoreDir(this.compactionDir,
-          encodedRegionName, familyName);
+      Path curCompactStore = getCompactionDir();
       if (LOG.isDebugEnabled()) {
-        LOG.debug("started compaction of " + storefiles.size() + " files in " +
-          curCompactStore.toString());
+        LOG.debug("started compaction of " + storefiles.size() +
+          " files using " + curCompactStore.toString());
       }
       if (this.fs.exists(curCompactStore)) {
-        LOG.warn("Cleaning up a previous incomplete compaction at " +
-          curCompactStore.toString());
-        if (!this.fs.delete(curCompactStore)) {
-          LOG.warn("Deleted returned false on " + curCompactStore.toString());
+        // Clean out its content in prep. for this new compaction.  Has either
+        // aborted previous compaction or it has content of a previous
+        // compaction.
+        Path [] toRemove = this.fs.listPaths(new Path [] {curCompactStore});
+        for (int i = 0; i < toRemove.length; i++) {
+          this.fs.delete(toRemove[i]);
         }
       }
-      try {
-        // Storefiles are keyed by sequence id.  The oldest file comes first.
-        // We need to return out of here a List that has the newest file as
-        // first.
-        List<HStoreFile> filesToCompact =
-          new ArrayList<HStoreFile>(this.storefiles.values());
-        Collections.reverse(filesToCompact);
-
-        HStoreFile compactedOutputFile = new HStoreFile(conf,
-            this.compactionDir, encodedRegionName, familyName, -1);
-        if (filesToCompact.size() < 1 ||
-            (filesToCompact.size() == 1 &&
-              !filesToCompact.get(0).isReference())) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("nothing to compact for " + this.storeName);
-          }
-          return false;
-        }
-        
-        if (!fs.mkdirs(curCompactStore)) {
-          LOG.warn("Mkdir on " + curCompactStore.toString() + " failed");
-        }
-        
-        // Compute the max-sequenceID seen in any of the to-be-compacted
-        // TreeMaps if it hasn't been passed in to us.
-        if (maxId == -1) {
-          for (HStoreFile hsf: filesToCompact) {
-            long seqid = hsf.loadInfo(fs);
-            if (seqid > 0) {
-              if (seqid > maxId) {
-                maxId = seqid;
-              }
-            }
-          }
-        }
-
-        // Step through them, writing to the brand-new TreeMap
-        MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs,
-            this.compression, this.bloomFilter);
-        try {
-          compactHStoreFiles(compactedOut, filesToCompact);
-        } finally {
-          compactedOut.close();
+      // Storefiles are keyed by sequence id. The oldest file comes first.
+      // We need to return out of here a List that has the newest file first.
+      List<HStoreFile> filesToCompact =
+        new ArrayList<HStoreFile>(this.storefiles.values());
+      Collections.reverse(filesToCompact);
+      if (filesToCompact.size() < 1 ||
+        (filesToCompact.size() == 1 && !filesToCompact.get(0).isReference())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("nothing to compact for " + this.storeName);
         }
+        return false;
+      }
 
-        // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
-        if (maxId >= 0) {
-          compactedOutputFile.writeInfo(fs, maxId);
-        } else {
-          compactedOutputFile.writeInfo(fs, -1);
-        }
+      if (!fs.exists(curCompactStore) && !fs.mkdirs(curCompactStore)) {
+        LOG.warn("Mkdir on " + curCompactStore.toString() + " failed");
+        return false;
+      }
 
-        // Write out a list of data files that we're replacing
-        Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
-        DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
-        try {
-          out.writeInt(filesToCompact.size());
-          for (HStoreFile hsf: filesToCompact) {
-            hsf.write(out);
-          }
-        } finally {
-          out.close();
-        }
+      // Step through them, writing to the brand-new TreeMap
+      HStoreFile compactedOutputFile = new HStoreFile(conf, this.compactionDir,
+        encodedRegionName, familyName, -1);
+      MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs,
+        this.compression, this.bloomFilter);
+      try {
+        compactHStoreFiles(compactedOut, filesToCompact);
+      } finally {
+        compactedOut.close();
+      }
 
-        // Indicate that we're done.
-        Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
-        (new DataOutputStream(fs.create(doneFile))).close();
+      // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
+      // Compute max-sequenceID seen in any of the to-be-compacted TreeMaps.
+      long maxId = getMaxSequenceId(filesToCompact);
+      compactedOutputFile.writeInfo(fs, maxId);
 
-        // Move the compaction into place.
-        completeCompaction();
-        return true;
-      } finally {
-        // Clean up the parent -- the region dir in the compactions directory.
-        if (this.fs.exists(curCompactStore.getParent())) {
-          if (!this.fs.delete(curCompactStore.getParent())) {
-            LOG.warn("Delete returned false deleting " +
-              curCompactStore.getParent().toString());
-          }
+      // Write out a list of data files that we're replacing
+      Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
+      DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
+      try {
+        out.writeInt(filesToCompact.size());
+        for (HStoreFile hsf : filesToCompact) {
+          hsf.write(out);
         }
+      } finally {
+        out.close();
       }
+
+      // Indicate that we're done.
+      Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
+      (new DataOutputStream(fs.create(doneFile))).close();
+
+      // Move the compaction into place.
+      completeCompaction(curCompactStore);
+      return true;
     }
   }
   
   /*
-   * Compact passed <code>toCompactFiles</code> into <code>compactedOut</code>.
-   * We create a new set of MapFile.Reader objects so we don't screw up 
-   * the caching associated with the currently-loaded ones. Our
-   * iteration-based access pattern is practically designed to ruin 
-   * the cache.
-   *
-   * We work by opening a single MapFile.Reader for each file, and 
-   * iterating through them in parallel.  We always increment the 
-   * lowest-ranked one.  Updates to a single row/column will appear 
-   * ranked by timestamp.  This allows us to throw out deleted values or
-   * obsolete versions.
-   * @param compactedOut
-   * @param toCompactFiles
-   * @throws IOException
+   * Compact passed <code>toCompactFiles</code> into <code>compactedOut</code>.
+   * We create a new set of MapFile.Reader objects so we don't screw up the
+   * caching associated with the currently-loaded ones. Our iteration-based
+   * access pattern is practically designed to ruin the cache.
+   * 
+   * We work by opening a single MapFile.Reader for each file, and iterating
+   * through them in parallel. We always increment the lowest-ranked one.
+   * Updates to a single row/column will appear ranked by timestamp. This allows
+   * us to throw out deleted values or obsolete versions. @param compactedOut
+   * @param toCompactFiles @throws IOException
    */
   private void compactHStoreFiles(final MapFile.Writer compactedOut,
       final List<HStoreFile> toCompactFiles) throws IOException {
@@ -1107,6 +1088,7 @@
         // culprit.
         LOG.warn("Failed with " + e.toString() + ": " + hsf.toString() +
           (hsf.isReference()? " " + hsf.getReference().toString(): ""));
+        closeCompactionReaders(rdrs);
         throw e;
       }
     }
@@ -1195,13 +1177,17 @@
         }
       }
     } finally {
-      for (int i = 0; i < rdrs.length; i++) {
-        if (rdrs[i] != null) {
-          try {
-            rdrs[i].close();
-          } catch (IOException e) {
-            LOG.warn("Exception closing reader", e);
-          }
+      closeCompactionReaders(rdrs);
+    }
+  }
+  
+  private void closeCompactionReaders(final CompactionReader [] rdrs) {
+    for (int i = 0; i < rdrs.length; i++) {
+      if (rdrs[i] != null) {
+        try {
+          rdrs[i].close();
+        } catch (IOException e) {
+          LOG.warn("Exception closing reader", e);
         }
       }
     }
@@ -1326,11 +1312,11 @@
    * 8) Releasing the write-lock
    * 9) Allow new scanners to proceed.
    * </pre>
+   * 
+   * @param curCompactStore Compaction to complete.
    */
-  private void completeCompaction() throws IOException {
-    Path curCompactStore = HStoreFile.getHStoreDir(this.compactionDir,
-        encodedRegionName, familyName);
-    
+  private void completeCompaction(final Path curCompactStore)
+  throws IOException {
     // 1. Wait for active scanners to exit
     newScannerLock.writeLock().lock();                  // prevent new scanners
     try {
@@ -1346,6 +1332,7 @@
         // 2. Acquiring the HStore write-lock
         this.lock.writeLock().lock();
       }
+
       try {
         Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
         if (!fs.exists(doneFile)) {
@@ -1366,7 +1353,6 @@
             hsf.readFields(in);
             toCompactFiles.add(hsf);
           }
-
         } finally {
           in.close();
         }
@@ -1412,13 +1398,13 @@
           // 7. Loading the new TreeMap.
           Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
           this.readers.put(orderVal,
-              finalCompactedFile.getReader(this.fs, this.bloomFilter));
+            finalCompactedFile.getReader(this.fs, this.bloomFilter));
           this.storefiles.put(orderVal, finalCompactedFile);
         } catch (IOException e) {
           LOG.error("Failed replacing compacted files. Compacted file is " +
-              finalCompactedFile.toString() + ".  Files replaced are " +
-              toCompactFiles.toString() +
-              " some of which may have been already removed", e);
+            finalCompactedFile.toString() + ".  Files replaced are " +
+            toCompactFiles.toString() +
+            " some of which may have been already removed", e);
         }
       } finally {
         // 8. Releasing the write-lock
@@ -1477,6 +1463,17 @@
     } finally {
       this.lock.readLock().unlock();
     }
+  }
+  
+  /*
+   * @return Path to the compaction directory for this column family.
+   * Compaction dir is a subdirectory of the region.  Needs to have the
+   * same regiondir/storefamily path prefix; HStoreFile constructor presumes
+   * it (TODO: Fix).
+   */
+  private Path getCompactionDir() {
+    return HStoreFile.getHStoreDir(this.compactionDir,
+      this.encodedRegionName, this.familyName);
   }
   
   private MapFile.Reader [] getReaders() {



Mime
View raw message