hbase-commits mailing list archives

From: te...@apache.org
Subject: svn commit: r1339673 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
Date: Thu, 17 May 2012 15:58:10 GMT
Author: tedyu
Date: Thu May 17 15:58:09 2012
New Revision: 1339673

URL: http://svn.apache.org/viewvc?rev=1339673&view=rev
Log:
HBASE-5826 Revert, Todd has review comments pending.

Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1339673&r1=1339672&r2=1339673&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Thu May 17 15:58:09 2012
@@ -48,7 +48,6 @@ import java.util.concurrent.locks.Reentr
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import com.google.common.base.Charsets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -140,10 +139,8 @@ public class HLog implements Syncable {
   private final long optionalFlushInterval;
   private final long blocksize;
   private final String prefix;
-
-  /** tracking information about what has been synced */
-  private SyncInfo syncInfo = new SyncInfo();
-
+  private final AtomicLong unflushedEntries = new AtomicLong(0);
+  private volatile long syncedTillHere = 0;
   private long lastDeferredTxid;
   private final Path oldLogDir;
   private volatile boolean logRollRunning;
@@ -233,6 +230,7 @@ public class HLog implements Syncable {
   // during an update
   // locked during appends
   private final Object updateLock = new Object();
+  private final Object flushLock = new Object();
 
   private final boolean enabled;
 
@@ -300,7 +298,6 @@ public class HLog implements Syncable {
   private static Metric writeSize = new Metric();
   // For measuring latency of syncs
   private static Metric syncTime = new Metric();
-  private static AtomicLong syncBatchSize = new AtomicLong();
   //For measuring slow HLog appends
   private static AtomicLong slowHLogAppendCount = new AtomicLong();
   private static Metric slowHLogAppendTime = new Metric();
@@ -317,10 +314,6 @@ public class HLog implements Syncable {
     return syncTime.get();
   }
 
-  public static long getSyncBatchSize() {
-    return syncBatchSize.getAndSet(0);
-  }
-  
   public static long getSlowAppendCount() {
     return slowHLogAppendCount.get();
   }
@@ -840,11 +833,17 @@ public class HLog implements Syncable {
       try {
         // Wait till all current transactions are written to the hlog.
         // No new transactions can occur because we have the updatelock.
-        sync();
+        if (this.unflushedEntries.get() != this.syncedTillHere) {
+          LOG.debug("cleanupCurrentWriter " +
+                   " waiting for transactions to get synced " +
+                   " total " + this.unflushedEntries.get() +
+                   " synced till here " + syncedTillHere);
+          sync();
+        }
         this.writer.close();
         this.writer = null;
         closeErrorCount.set(0);
-      } catch (Exception e) {
+      } catch (IOException e) {
         LOG.error("Failed close of HLog writer", e);
         int errors = closeErrorCount.incrementAndGet();
         if (errors <= closeErrorsTolerated && !hasDeferredEntries()) {
@@ -1007,7 +1006,7 @@ public class HLog implements Syncable {
    * @param logEdit
    * @param logKey
    * @param doSync shall we sync after writing the transaction
-   * @return The seqnum of this transaction
+   * @return The txid of this transaction
    * @throws IOException
    */
   public long append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit,
@@ -1016,9 +1015,9 @@ public class HLog implements Syncable {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
-    long seqNum;
+    long txid = 0;
     synchronized (updateLock) {
-      seqNum = obtainSeqNum();
+      long seqNum = obtainSeqNum();
       logKey.setLogSeqNum(seqNum);
       // The 'lastSeqWritten' map holds the sequence number of the oldest
       // write for each region (i.e. the first edit added to the particular
@@ -1028,9 +1027,10 @@ public class HLog implements Syncable {
       this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
         Long.valueOf(seqNum));
       doWrite(regionInfo, logKey, logEdit, htd);
+      txid = this.unflushedEntries.incrementAndGet();
       this.numEntries.incrementAndGet();
       if (htd.isDeferredLogFlush()) {
-        lastDeferredTxid = seqNum;
+        lastDeferredTxid = txid;
       }
     }
 
@@ -1040,9 +1040,9 @@ public class HLog implements Syncable {
         (regionInfo.isMetaRegion() ||
         !htd.isDeferredLogFlush())) {
       // sync txn to file system
-      this.sync(seqNum);
+      this.sync(txid);
     }
-    return seqNum;
+    return txid;
   }
 
   /**
@@ -1090,13 +1090,13 @@ public class HLog implements Syncable {
   private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
       final long now, HTableDescriptor htd, boolean doSync)
     throws IOException {
-      if (edits.isEmpty()) return this.logSeqNum.get();
+      if (edits.isEmpty()) return this.unflushedEntries.get();;
       if (this.closed) {
         throw new IOException("Cannot append; log is closed");
       }
-      long seqNum;
+      long txid = 0;
       synchronized (this.updateLock) {
-        seqNum = obtainSeqNum();
+        long seqNum = obtainSeqNum();
         // The 'lastSeqWritten' map holds the sequence number of the oldest
         // write for each region (i.e. the first edit added to the particular
         // memstore). . When the cache is flushed, the entry for the
@@ -1109,8 +1109,9 @@ public class HLog implements Syncable {
         HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
         doWrite(info, logKey, edits, htd);
         this.numEntries.incrementAndGet();
+        txid = this.unflushedEntries.incrementAndGet();
         if (htd.isDeferredLogFlush()) {
-          lastDeferredTxid = seqNum;
+          lastDeferredTxid = txid;
         }
       }
       // Sync if catalog region, and if not then check if that table supports
@@ -1119,9 +1120,9 @@ public class HLog implements Syncable {
           (info.isMetaRegion() ||
           !htd.isDeferredLogFlush())) {
         // sync txn to file system
-        this.sync(seqNum);
+        this.sync(txid);
       }
-      return seqNum;
+      return txid;
     }
 
   /**
@@ -1179,9 +1180,6 @@ public class HLog implements Syncable {
     // goal is to increase the batchsize for writing-to-hdfs as well as
     // sync-to-hdfs, so that we can get better system throughput.
     private List<Entry> pendingWrites = new LinkedList<Entry>();
-    long lastSeqAppended = -1;
-    long lastSeqFlushed = -1;
-    private Object flushLock = new Object();
 
     LogSyncer(long optionalFlushInterval) {
       this.optionalFlushInterval = optionalFlushInterval;
@@ -1195,7 +1193,7 @@ public class HLog implements Syncable {
         while(!this.isInterrupted() && !closeLogSyncer) {
 
           try {
-            if (syncInfo.getLastSyncedTxId() >= logSeqNum.get()) {
+            if (unflushedEntries.get() <= syncedTillHere) {
               Thread.sleep(this.optionalFlushInterval);
             }
             sync();
@@ -1215,45 +1213,24 @@ public class HLog implements Syncable {
     // our own queue rather than writing it to the HDFS output stream because
     // HDFSOutputStream.writeChunk is not lightweight at all.
     synchronized void append(Entry e) throws IOException {
-      long seq = e.getKey().getLogSeqNum();
-      assert seq > lastSeqAppended;
-      lastSeqAppended = seq;
       pendingWrites.add(e);
     }
 
     // Returns all currently pending writes. New writes
     // will accumulate in a new list.
-    long flushWritesTo(Writer writer) throws IOException {
-      synchronized (flushLock) {
-        List<Entry> pending;
-
-        synchronized (this) {
-          pending = pendingWrites;
-          pendingWrites = new LinkedList<Entry>();
-        }
+    synchronized List<Entry> getPendingWrites() {
+      List<Entry> save = this.pendingWrites;
+      this.pendingWrites = new LinkedList<Entry>();
+      return save;
+    }
 
-        boolean success = false;
-        try {
-          int numFlushed = 0;
-          for (Entry e : pending) {
-            writer.append(e);
-            long seq = e.getKey().getLogSeqNum();
-            assert seq > lastSeqFlushed;
-            lastSeqFlushed = seq;
-            numFlushed++;
-          }
-          syncBatchSize.addAndGet(numFlushed);
-          success = true;
-        } finally {
-          if (!success) {
-            // push back our batch into the pending list
-            synchronized (this) {
-              pending.addAll(pendingWrites);
-              pendingWrites = pending;
-            }
-          }
-        }
-        return lastSeqFlushed;
+    // writes out pending entries to the HLog
+    void hlogFlush(Writer writer, List<Entry> pending) throws IOException {
+      if (pending == null) return;
+
+      // write out all accumulated Entries to hdfs.
+      for (Entry e : pending) {
+        writer.append(e);
       }
     }
     
@@ -1262,31 +1239,9 @@ public class HLog implements Syncable {
     }
   }
 
-  private static class SyncInfo {
-    private long syncedTillHere = 0;
-
-    synchronized long getLastSyncedTxId() {
-      return syncedTillHere;
-    }
-
-    synchronized void notifySynced(long txid) {
-      if (txid > syncedTillHere) {
-        syncedTillHere = txid;
-      }
-      notifyAll();
-    }
-
-    synchronized void waitForSync(long txid) throws InterruptedException {
-      while (syncedTillHere < txid) {
-        wait();
-      }
-    }
-  }
-
-
   // sync all known transactions
   private void syncer() throws IOException {
-    syncer(logSeqNum.get()); // sync all pending
+    syncer(this.unflushedEntries.get()); // sync all pending items
   }
 
   // sync all transactions upto the specified txid
@@ -1298,26 +1253,47 @@ public class HLog implements Syncable {
     }
     // if the transaction that we are interested in is already 
     // synced, then return immediately.
-    if (syncInfo.getLastSyncedTxId() >= txid) {
+    if (txid <= this.syncedTillHere) {
       return;
     }
     try {
+      long doneUpto;
       long now = System.currentTimeMillis();
-      long flushedSeqId;
+      // First flush all the pending writes to HDFS. Then 
+      // issue the sync to HDFS. If sync is successful, then update
+      // syncedTillHere to indicate that transactions till this
+      // number has been successfully synced.
+      synchronized (flushLock) {
+        if (txid <= this.syncedTillHere) {
+          return;
+        }
+        doneUpto = this.unflushedEntries.get();
+        List<Entry> pending = logSyncerThread.getPendingWrites();
+        try {
+          logSyncerThread.hlogFlush(tempWriter, pending);
+        } catch(IOException io) {
+          synchronized (this.updateLock) {
+            // HBASE-4387, HBASE-5623, retry with updateLock held
+            tempWriter = this.writer;
+            logSyncerThread.hlogFlush(tempWriter, pending);
+          }
+        }
+      }
+      // another thread might have sync'ed avoid double-sync'ing
+      if (txid <= this.syncedTillHere) {
+        return;
+      }
       try {
-        flushedSeqId = logSyncerThread.flushWritesTo(tempWriter);
         tempWriter.sync();
       } catch(IOException io) {
         synchronized (this.updateLock) {
           // HBASE-4387, HBASE-5623, retry with updateLock held
           tempWriter = this.writer;
-          flushedSeqId = logSyncerThread.flushWritesTo(tempWriter);
           tempWriter.sync();
         }
       }
-      syncInfo.notifySynced(flushedSeqId);
-      // We try to not acquire the updateLock just to update statistics.
-      // Make these statistics as AtomicLong.
+      this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
+
       syncTime.inc(System.currentTimeMillis() - now);
       if (!this.logRollRunning) {
         checkLowReplication();
@@ -1572,15 +1548,14 @@ public class HLog implements Syncable {
       if (this.closed) {
         return;
       }
-      long seqNumOfCompletionEdit;
+      long txid = 0;
       synchronized (updateLock) {
-        seqNumOfCompletionEdit = obtainSeqNum();
         long now = System.currentTimeMillis();
-
-        WALEdit edit = completeCacheFlushLogEdit(logSeqId);
-        HLogKey key = makeKey(encodedRegionName, tableName, seqNumOfCompletionEdit,
+        WALEdit edit = completeCacheFlushLogEdit();
+        HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
             System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
         logSyncerThread.append(new Entry(key, edit));
+        txid = this.unflushedEntries.incrementAndGet();
         writeTime.inc(System.currentTimeMillis() - now);
         long len = 0;
         for (KeyValue kv : edit.getKeyValues()) {
@@ -1590,7 +1565,7 @@ public class HLog implements Syncable {
         this.numEntries.incrementAndGet();
       }
       // sync txn to file system
-      this.sync(seqNumOfCompletionEdit);
+      this.sync(txid);
 
     } finally {
       // updateLock not needed for removing snapshot's entry
@@ -1601,17 +1576,9 @@ public class HLog implements Syncable {
     }
   }
 
-  private WALEdit completeCacheFlushLogEdit(long seqIdOfFlush) {
-    // The data is not actually used here - we just need to write
-    // something to the log to make sure we're still the owner of the
-    // pipeline.
-    byte[] data = Bytes.add(
-      COMPLETE_CACHE_FLUSH,
-      ":".getBytes(Charsets.UTF_8),
-      Bytes.toBytes(String.valueOf(seqIdOfFlush)));
-
+  private WALEdit completeCacheFlushLogEdit() {
     KeyValue kv = new KeyValue(METAROW, METAFAMILY, null,
-      System.currentTimeMillis(), data);
+      System.currentTimeMillis(), COMPLETE_CACHE_FLUSH);
     WALEdit e = new WALEdit();
     e.add(kv);
     return e;
@@ -1888,7 +1855,7 @@ public class HLog implements Syncable {
 
   /** Provide access to currently deferred sequence num for tests */
   boolean hasDeferredEntries() {
-    return lastDeferredTxid > syncInfo.getLastSyncedTxId();
+    return lastDeferredTxid > syncedTillHere;
   }
 
   /**

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java?rev=1339673&r1=1339672&r2=1339673&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java Thu May 17 15:58:09 2012
@@ -245,7 +245,7 @@ public class SequenceFileLogWriter imple
 
   @Override
   public void sync() throws IOException {
-    if (this.writer != null) this.writer.syncFs();
+    this.writer.syncFs();
   }
 
   @Override

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1339673&r1=1339672&r2=1339673&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Thu May 17 15:58:09 2012
@@ -548,9 +548,8 @@ public class TestHLog  {
         KeyValue kv = val.getKeyValues().get(0);
         assertTrue(Bytes.equals(HLog.METAROW, kv.getRow()));
         assertTrue(Bytes.equals(HLog.METAFAMILY, kv.getFamily()));
-        assertTrue(Bytes.startsWith(
-          val.getKeyValues().get(0).getValue(),
-          HLog.COMPLETE_CACHE_FLUSH));
+        assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
+          val.getKeyValues().get(0).getValue()));
         System.out.println(key + " " + val);
       }
     } finally {
@@ -617,9 +616,8 @@ public class TestHLog  {
         assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
         assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));
         assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily()));
-        assertTrue(Bytes.startsWith(
-          val.getValue(),
-          HLog.COMPLETE_CACHE_FLUSH));
+        assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
+          val.getValue()));
         System.out.println(entry.getKey() + " " + val);
       }
     } finally {


