hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject svn commit: r1148169 - in /hbase/trunk: CHANGES.txt src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
Date Tue, 19 Jul 2011 05:11:35 GMT
Author: tedyu
Date: Tue Jul 19 05:11:34 2011
New Revision: 1148169

URL: http://svn.apache.org/viewvc?rev=1148169&view=rev
Log:
HBASE-4095  Hlog may not be rolled in a long time if checkLowReplication's
               request of LogRoll is blocked (Jieshan via Ted Yu)

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1148169&r1=1148168&r2=1148169&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Jul 19 05:11:34 2011
@@ -164,6 +164,8 @@ Release 0.91.0 - Unreleased
    HBASE-4052  Enabling a table after master switch does not allow table scan,
                throwing NotServingRegionException (ramkrishna via Ted Yu)
    HBASE-4112  Creating table may throw NullPointerException (Jinchao via Ted Yu)
+   HBASE-4095  Hlog may not be rolled in a long time if checkLowReplication's
+               request of LogRoll is blocked (Jieshan via Ted Yu)
 
   IMPROVEMENTS
    HBASE-3290  Max Compaction Size (Nicolas Spiegelberg via Stack)  

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1148169&r1=1148168&r2=1148169&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Tue Jul 19
05:11:34 2011
@@ -39,6 +39,7 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -195,6 +196,10 @@ public class HLog implements Syncable {
   // synchronized is insufficient because a cache flush spans two method calls.
   private final Lock cacheFlushLock = new ReentrantLock();
 
+  // The waiting time for log-roller trying to get the lock of cacheFlushLock.
+  // If the actual waiting time is longer than it, skip the current log roll.
+  private final long cacheFlushLockWaitTime;
+  
   // We synchronize on updateLock to prevent updates and to prevent a log roll
   // during an update
   // locked during appends
@@ -340,6 +345,8 @@ public class HLog implements Syncable {
     this.logrollsize = (long)(this.blocksize * multi);
     this.optionalFlushInterval =
       conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
+    this.cacheFlushLockWaitTime = 
+        conf.getLong("hbase.regionserver.cacheFlushLock.waittime", 5000);
     if (failIfLogDirExists && fs.exists(dir)) {
       throw new IOException("Target HLog directory already exists: " + dir);
     }
@@ -480,66 +487,86 @@ public class HLog implements Syncable {
       return null;
     }
     byte [][] regionsToFlush = null;
-    this.cacheFlushLock.lock();
     try {
-      if (closed) {
-        return regionsToFlush;
-      }
-      // Do all the preparation outside of the updateLock to block
-      // as less as possible the incoming writes
-      long currentFilenum = this.filenum;
-      this.filenum = System.currentTimeMillis();
-      Path newPath = computeFilename();
-      HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
-      int nextInitialReplication = fs.getFileStatus(newPath).getReplication();
-      // Can we get at the dfsclient outputstream?  If an instance of
-      // SFLW, it'll have done the necessary reflection to get at the
-      // protected field name.
-      FSDataOutputStream nextHdfsOut = null;
-      if (nextWriter instanceof SequenceFileLogWriter) {
-        nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
-      }
-      // Tell our listeners that a new log was created
-      if (!this.listeners.isEmpty()) {
-        for (WALObserver i : this.listeners) {
-          i.logRolled(newPath);
-        }
-      }
-
-      synchronized (updateLock) {
-        // Clean up current writer.
-        Path oldFile = cleanupCurrentWriter(currentFilenum);
-        this.writer = nextWriter;
-        this.initialReplication = nextInitialReplication;
-        this.hdfs_out = nextHdfsOut;
-
-        LOG.info((oldFile != null?
-            "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
-            this.numEntries.get() +
-            ", filesize=" +
-            this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
-          "New hlog " + FSUtils.getPath(newPath));
-        this.numEntries.set(0);
-        this.logRollRequested = false;
-      }
-      // Can we delete any of the old log files?
-      if (this.outputfiles.size() > 0) {
-        if (this.lastSeqWritten.isEmpty()) {
-          LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
-          // If so, then no new writes have come in since all regions were
-          // flushed (and removed from the lastSeqWritten map). Means can
-          // remove all but currently open log file.
-          for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
-            archiveLogFile(e.getValue(), e.getKey());
+      if (this.cacheFlushLock.tryLock(this.cacheFlushLockWaitTime, 
+        TimeUnit.MILLISECONDS)) {
+        try {
+          if (closed) {
+            return regionsToFlush;
           }
-          this.outputfiles.clear();
-        } else {
-          regionsToFlush = cleanOldLogs();
+          this.logRollRequested = true;
+          // Do all the preparation outside of the updateLock to block
+          // as less as possible the incoming writes
+          long currentFilenum = this.filenum;
+          this.filenum = System.currentTimeMillis();
+          Path newPath = computeFilename();
+          HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
+          //This method get expect but not the actual replicas of the Hlog file
+          int nextExpectReplicas = fs.getFileStatus(newPath).getReplication();
+          
+          //Get the current replicas of the Hlog file
+          int nextActualReplicas = -1;
+          try
+          {
+            nextActualReplicas = getLogReplication();
+          } catch (Exception e) {
+            LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
+                " still proceeding ahead...");
+          }
+          // Can we get at the dfsclient outputstream?  If an instance of
+          // SFLW, it'll have done the necessary reflection to get at the
+          // protected field name.
+          FSDataOutputStream nextHdfsOut = null;
+          if (nextWriter instanceof SequenceFileLogWriter) {
+            nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
+          }
+          // Tell our listeners that a new log was created
+          if (!this.listeners.isEmpty()) {
+            for (WALObserver i : this.listeners) {
+              i.logRolled(newPath);
+            }
+          }
+    
+          synchronized (updateLock) {
+            // Clean up current writer.
+            Path oldFile = cleanupCurrentWriter(currentFilenum);
+            this.writer = nextWriter;
+            this.initialReplication = nextActualReplicas == -1 ? 
+              nextExpectReplicas : nextActualReplicas;
+            this.hdfs_out = nextHdfsOut;
+    
+            LOG.info((oldFile != null?
+                "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
+                this.numEntries.get() +
+                ", filesize=" +
+                this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
+              "New hlog " + FSUtils.getPath(newPath));
+            this.numEntries.set(0);
+            this.logRollRequested = false;
+          }
+          // Can we delete any of the old log files?
+          if (this.outputfiles.size() > 0) {
+            if (this.lastSeqWritten.isEmpty()) {
+              LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
+              // If so, then no new writes have come in since all regions were
+              // flushed (and removed from the lastSeqWritten map). Means can
+              // remove all but currently open log file.
+              for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
+                archiveLogFile(e.getValue(), e.getKey());
+              }
+              this.outputfiles.clear();
+            } else {
+              regionsToFlush = cleanOldLogs();
+            }
+          }
+        } finally {
+          this.cacheFlushLock.unlock();
         }
-      }
-    } finally {
-      this.cacheFlushLock.unlock();
-    }
+      } 
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted rollWriter", e);
+      Thread.currentThread().interrupt();
+    }  
     return regionsToFlush;
   }
 
@@ -1004,7 +1031,6 @@ public class HLog implements Syncable {
             this.initialReplication + " replicas. " +
             " Requesting close of hlog.");
         requestLogRoll();
-        logRollRequested = true;
       }
     } catch (Exception e) {
       LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +



Mime
View raw message