hbase-commits mailing list archives

From: apurt...@apache.org
Subject: svn commit: r892451 - in /hadoop/hbase/trunk: ./ conf/ src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/ src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/ src/java/org/apache/had...
Date: Sat, 19 Dec 2009 08:10:46 GMT
Author: apurtell
Date: Sat Dec 19 08:10:45 2009
New Revision: 892451

URL: http://svn.apache.org/viewvc?rev=892451&view=rev
Log:
HBASE-2059 Break out WAL reader and writer impl from HLog

Added:
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/conf/hbase-default.xml
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
    hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=892451&r1=892450&r2=892451&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Sat Dec 19 08:10:45 2009
@@ -242,6 +242,7 @@
    HBASE-2049  Cleanup HLog binary log output (Dave Latham via Stack)
    HBASE-2052  Make hbase more 'live' when comes to noticing table creation,
                splits, etc., for 0.20.3
+   HBASE-2059  Break out WAL reader and writer impl from HLog
 
   NEW FEATURES
    HBASE-1901  "General" partitioner for "hbase-48" bulk (behind the api, write

Modified: hadoop/hbase/trunk/conf/hbase-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/conf/hbase-default.xml?rev=892451&r1=892450&r2=892451&view=diff
==============================================================================
--- hadoop/hbase/trunk/conf/hbase-default.xml (original)
+++ hadoop/hbase/trunk/conf/hbase-default.xml Sat Dec 19 08:10:45 2009
@@ -191,6 +191,16 @@
     <description>Period at which we will roll the commit log.</description>
   </property>
   <property>
+    <name>hbase.regionserver.hlog.reader.impl</name>
+    <value>org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader</value>
+    <description>The HLog file reader implementation.</description>
+  </property>
+  <property>
+    <name>hbase.regionserver.hlog.writer.impl</name>
+    <value>org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogWriter</value>
+    <description>The HLog file writer implementation.</description>
+  </property>
+  <property>
     <name>hbase.regionserver.thread.splitcompactcheckfrequency</name>
     <value>20000</value>
     <description>How often a region server runs the split/compaction check.

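The two new properties make the WAL file format pluggable. As a rough sketch (not part of this commit), a deployment could swap in alternative implementations programmatically; the org.example class names below are hypothetical stand-ins for classes implementing the HLog.Reader and HLog.Writer interfaces introduced in HLog.java:

    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class WALPluginConfig {
      public static HBaseConfiguration pluggedConf() {
        HBaseConfiguration conf = new HBaseConfiguration();
        // Hypothetical implementations; each needs a no-arg constructor
        // because HLog instantiates them via Class.newInstance().
        conf.set("hbase.regionserver.hlog.reader.impl", "org.example.MyWALReader");
        conf.set("hbase.regionserver.hlog.writer.impl", "org.example.MyWALWriter");
        return conf;
      }
    }
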
Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java?rev=892451&r1=892450&r2=892451&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java Sat Dec 19 08:10:45 2009
@@ -33,7 +33,6 @@
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.LogRollListener;
-import org.apache.hadoop.io.SequenceFile;
 
 /**
  * Add support for transactional operations to the regionserver's
@@ -48,11 +47,6 @@
   }
 
   @Override
-  protected SequenceFile.Writer createWriter(Path path) throws IOException {
-    return super.createWriter(path, THLogKey.class, KeyValue.class);
-  }
-
-  @Override
   protected HLogKey makeKey(byte[] regionName, byte[] tableName, long seqNum,
       long now) {
     return new THLogKey(regionName, tableName, seqNum, now);

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java?rev=892451&r1=892450&r2=892451&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java Sat Dec 19 08:10:45 2009
@@ -42,7 +42,6 @@
 import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
 import org.apache.hadoop.hbase.client.transactional.TransactionLogger;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.util.Progressable;
 
 /**
@@ -107,12 +106,8 @@
     Set<Long> commitedTransactions = new HashSet<Long>();
     Set<Long> abortedTransactions = new HashSet<Long>();
 
-    SequenceFile.Reader logReader = HLog.getReader(fileSystem,
-        reconstructionLog, conf);
-    
-      try {
-      THLogKey key = new THLogKey();
-      KeyValue val = new KeyValue();
+    HLog.Reader reader = HLog.getReader(fileSystem, reconstructionLog, conf);
+    try {
       long skippedEdits = 0;
       long totalEdits = 0;
       long startCount = 0;
@@ -123,7 +118,10 @@
       int reportInterval = conf.getInt("hbase.hstore.report.interval.edits",
           2000);
 
-      while (logReader.next(key, val)) {
+      HLog.Entry entry;
+      while ((entry = reader.next()) != null) {
+        THLogKey key = (THLogKey)entry.getKey();
+        KeyValue val = entry.getEdit();
         if (LOG.isTraceEnabled()) {
           LOG.trace("Processing edit: key: " + key.toString() + " val: "
               + val.toString());
@@ -200,7 +198,7 @@
             + " aborts, and " + commitCount + " commits.");
       }
     } finally {
-      logReader.close();
+      reader.close();
     }
 
     if (pendingTransactionsById.size() > 0) {

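The recovery loop above shows the new read pattern: instead of handing key/value instances to SequenceFile.Reader.next(key, val), callers now pull HLog.Entry objects from the abstracted reader until it returns null. A minimal sketch of that pattern, assuming walPath points at an existing log file:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.regionserver.wal.HLog;

    public class WALDump {
      public static void dump(Path walPath) throws Exception {
        HBaseConfiguration conf = new HBaseConfiguration();
        FileSystem fs = FileSystem.get(conf);
        // Reflectively instantiates hbase.regionserver.hlog.reader.impl.
        HLog.Reader reader = HLog.getReader(fs, walPath, conf);
        try {
          HLog.Entry entry;
          // next() allocates a fresh Entry per call; next(reuse) recycles one.
          while ((entry = reader.next()) != null) {
            System.out.println(entry.getKey() + " " + entry.getEdit());
          }
        } finally {
          reader.close();
        }
      }
    }
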
Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java?rev=892451&r1=892450&r2=892451&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java Sat Dec 19 08:10:45 2009
@@ -108,6 +108,8 @@
 
   @Override
   protected HLog instantiateHLog(Path logdir) throws IOException {
+    conf.set("hbase.regionserver.hlog.keyclass",
+        THLogKey.class.getCanonicalName());
     HLog newlog = new THLog(super.getFileSystem(), logdir, conf, super.getLogRoller());
     return newlog;
   }

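Setting hbase.regionserver.hlog.keyclass here is what lets the generic reader reconstruct THLogKey instances during recovery: SequenceFileLogReader calls HLog.newKey(conf) itself rather than being handed a key by the caller, so without this the cast to THLogKey in THLogRecoveryManager would fail. A small sketch of that indirection:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.regionserver.wal.HLog;
    import org.apache.hadoop.hbase.regionserver.wal.HLogKey;

    public class KeyClassDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HBaseConfiguration();
        // Without this setting, HLog.newKey(conf) returns a plain HLogKey.
        conf.set("hbase.regionserver.hlog.keyclass",
            "org.apache.hadoop.hbase.regionserver.transactional.THLogKey");
        HLogKey key = HLog.newKey(conf);  // reflectively builds a THLogKey
        System.out.println(key.getClass().getName());
      }
    }
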
Modified: hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java?rev=892451&r1=892450&r2=892451&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java Sat Dec 19 08:10:45 2009
@@ -58,6 +58,8 @@
     // Set the hbase.rootdir to be the home directory in mini dfs.
     this.conf.set(HConstants.HBASE_DIR, this.cluster.getFileSystem()
         .getHomeDirectory().toString());
+    this.conf.set("hbase.regionserver.hlog.keyclass",
+        THLogKey.class.getCanonicalName());
     super.setUp();
     this.dir = new Path("/hbase", getName());
     if (fs.exists(dir)) {

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=892451&r1=892450&r2=892451&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Dec 19 08:10:45 2009
@@ -59,7 +59,6 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
@@ -317,17 +316,17 @@
     // general memory usage accounting.
     long maxSeqIdInLog = -1;
     long firstSeqIdInLog = -1;
-    SequenceFile.Reader logReader = HLog.getReader(this.fs, reconstructionLog,
-      this.conf);
+    HLog.Reader logReader = HLog.getReader(this.fs, reconstructionLog, conf);
     try {
-      HLogKey key = HLog.newKey(conf);
-      KeyValue val = new KeyValue();
       long skippedEdits = 0;
       long editsCount = 0;
       // How many edits to apply before we send a progress report.
       int reportInterval =
         this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
-      while (logReader.next(key, val)) {
+      HLog.Entry entry;
+      while ((entry = logReader.next()) != null) {
+        HLogKey key = entry.getKey();
+        KeyValue val = entry.getEdit();
         if (firstSeqIdInLog == -1) {
           firstSeqIdInLog = key.getLogSeqNum();
         }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=892451&r1=892450&r2=892451&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Sat Dec 19 08:10:45 2009
@@ -23,7 +23,6 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -46,8 +45,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -64,12 +61,6 @@
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.SequenceFile.Metadata;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.Progressable;
 
 /**
  * HLog stores all the edits to the HStore.  Its the hbase write-ahead-log
@@ -123,7 +114,30 @@
   private final long blocksize;
   private final int flushlogentries;
   private final AtomicInteger unflushedEntries = new AtomicInteger(0);
-  private final short replicationLevel;
+
+  public interface Reader {
+
+    void init(FileSystem fs, Path path, Configuration c) throws IOException;
+
+    void close() throws IOException;
+
+    Entry next() throws IOException;
+
+    Entry next(Entry reuse) throws IOException;
+
+  }
+
+  public interface Writer {
+
+    void init(FileSystem fs, Path path, Configuration c) throws IOException;
+
+    void close() throws IOException;
+
+    void sync() throws IOException;
+
+    void append(Entry entry) throws IOException;
+
+  }
 
   // used to indirectly tell syncFs to force the sync
   private boolean forceSync = false;
@@ -131,10 +145,7 @@
   /*
    * Current log file.
    */
-  SequenceFile.Writer writer;
-  // This is the above writer's output stream. Its private but we use reflection
-  // to expose it so we can call sync on it.
-  FSDataOutputStream writer_out;
+  Writer writer;
 
   /*
    * Map of all log files but the current one. 
@@ -218,8 +229,6 @@
       conf.getInt("hbase.regionserver.flushlogentries", 1);
     this.blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
       this.fs.getDefaultBlockSize());
-    this.replicationLevel = (short) conf.getInt("hbase.regionserver.hlog.replication",
-        this.fs.getDefaultReplication());
     // Roll at 95% of block size.
     float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
     this.logrollsize = (long)(this.blocksize * multi);
@@ -250,16 +259,6 @@
   }
 
   /**
-   * Get the compression type for the hlog files
-   * @param c Configuration to use.
-   * @return the kind of compression to use
-   */
-  static CompressionType getCompressionType(final Configuration c) {
-    // Compression makes no sense for commit log.  Always return NONE.
-    return CompressionType.NONE;
-  }
-
-  /**
    * Called by HRegionServer when it opens a new region to ensure that log
    * sequence numbers are always greater than the latest sequence number of the
    * region being brought on-line.
@@ -318,7 +317,7 @@
         Path oldFile = cleanupCurrentWriter(this.filenum);
         this.filenum = System.currentTimeMillis();
         Path newPath = computeFilename(this.filenum);
-        this.writer = createWriter(newPath);
+        this.writer = createWriter(fs, newPath, new HBaseConfiguration(conf));
         LOG.info((oldFile != null?
             "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
             this.numEntries.get() +
@@ -349,113 +348,54 @@
     return regionToFlush;
   }
 
-  protected SequenceFile.Writer createWriter(Path path) throws IOException {
-    return createWriter(path, HLogKey.class, KeyValue.class);
-  }
-  
   /**
-   * Hack just to set the correct file length up in SequenceFile.Reader.
-   * See HADOOP-6307.  The below is all about setting the right length on the
-   * file we are reading.  fs.getFileStatus(file).getLen() is passed down to
-   * a private SequenceFile.Reader constructor.  This won't work.  Need to do
-   * the available on the stream.  The below is ugly.  It makes getPos, the
-   * first time its called, return length of the file -- i.e. tell a lie -- just
-   * so this line up in SF.Reader's constructor ends up with right answer:
-   * 
-   *         this.end = in.getPos() + length;
+   * Get a reader for the WAL.
+   * @param fs
+   * @param path
+   * @param keyClass
+   * @param valueClass
+   * @return A WAL reader.  Close when done with it.
+   * @throws IOException
    */
-  private static class WALReader extends SequenceFile.Reader {
-    
-    WALReader(final FileSystem fs, final Path p, final Configuration c)
-    throws IOException {
-      super(fs, p, c);
-      
-    }
-
-    @Override
-    protected FSDataInputStream openFile(FileSystem fs, Path file,
-      int bufferSize, long length)
-    throws IOException {
-      return new WALReaderFSDataInputStream(super.openFile(fs, file, bufferSize,
-        length), length);
-    }
-
-    /**
-     * Override just so can intercept first call to getPos.
-     */
-    static class WALReaderFSDataInputStream extends FSDataInputStream {
-      private boolean firstGetPosInvocation = true;
-      private long length;
-
-      WALReaderFSDataInputStream(final FSDataInputStream is, final long l)
-      throws IOException {
-        super(is);
-        this.length = l;
-      }
-
-      @Override
-      public long getPos() throws IOException {
-        if (this.firstGetPosInvocation) {
-          this.firstGetPosInvocation = false;
-          // Tell a lie.  We're doing this just so that this line up in
-          // SequenceFile.Reader constructor comes out with the correct length
-          // on the file:
-          //         this.end = in.getPos() + length;
-          // 
-          long available = this.in.available();
-          // Length gets added up in the SF.Reader constructor so subtract the
-          // difference.  If available < this.length, then return this.length.
-          // I ain't sure what else to do.
-          return available >= this.length? available - this.length: this.length;
-        }
-        return super.getPos();
-      }
+  @SuppressWarnings("unchecked")
+  public static Reader getReader(final FileSystem fs,
+    final Path path, HBaseConfiguration conf)
+  throws IOException {
+    try {
+      Class c = Class.forName(conf.get("hbase.regionserver.hlog.reader.impl",
+        SequenceFileLogReader.class.getCanonicalName()));
+      HLog.Reader reader = (HLog.Reader) c.newInstance();
+      reader.init(fs, path, conf);
+      return reader;
+    } catch (Exception e) {
+      IOException ie = new IOException("cannot get log reader");
+      ie.initCause(e);
+      throw ie;
     }
   }
 
   /**
-   * Get a Reader for WAL.
-   * Reader is a subclass of SequenceFile.Reader.  The subclass has amendments
-   * to make it so we see edits up to the last sync (HDFS-265).  Of note, we
-   * can only see up to the sync that happened before this file was opened.
-   * Will require us doing up our own WAL Reader if we want to keep up with
-   * a syncing Writer.
-   * @param p
-   * @return A WAL Reader.  Close when done with it.
+   * Get a writer for the WAL.
+   * @param path
+   * @param keyClass
+   * @param valueClass
+   * @return A WAL writer.  Close when done with it.
    * @throws IOException
    */
-  public static SequenceFile.Reader getReader(final FileSystem fs,
-    final Path p, final Configuration c)
-  throws IOException {
-    return new WALReader(fs, p, c);
-  }
-
-  protected SequenceFile.Writer createWriter(Path path,
-    Class<? extends HLogKey> keyClass, Class<? extends KeyValue> valueClass)
-  throws IOException {
-    SequenceFile.Writer writer =
-      SequenceFile.createWriter(this.fs, this.conf, path, keyClass,
-      valueClass, fs.getConf().getInt("io.file.buffer.size", 4096),
-      this.replicationLevel, this.blocksize,
-      SequenceFile.CompressionType.NONE, new DefaultCodec(), null,
-      new Metadata());
-    // Get at the private FSDataOutputStream inside in SequenceFile so we can
-    // call sync on it.  Make it accessible.  Stash it aside for call up in
-    // the sync method above.
-    final Field fields[] = writer.getClass().getDeclaredFields();
-    final String fieldName = "out";
-    for (int i = 0; i < fields.length; ++i) {
-      if (fieldName.equals(fields[i].getName())) {
-        try {
-          fields[i].setAccessible(true);
-          this.writer_out = (FSDataOutputStream)fields[i].get(writer);
-          break;
-        } catch (IllegalAccessException ex) {
-          throw new IOException("Accessing " + fieldName, ex);
-        }
-      }
+  @SuppressWarnings("unchecked")
+  public static Writer createWriter(final FileSystem fs,
+      final Path path, HBaseConfiguration conf) throws IOException {
+    try {
+      Class c = Class.forName(conf.get("hbase.regionserver.hlog.writer.impl",
+        SequenceFileLogWriter.class.getCanonicalName()));
+      HLog.Writer writer = (HLog.Writer) c.newInstance();
+      writer.init(fs, path, conf);
+      return writer;
+    } catch (Exception e) {
+      IOException ie = new IOException("cannot get log writer");
+      ie.initCause(e);
+      throw ie;
     }
-    return writer;
   }
   
   /*
@@ -820,9 +760,6 @@
           this.unflushedEntries.get() >= this.flushlogentries) {
         try {
           this.writer.sync();
-          if (this.writer_out != null) {
-            this.writer_out.sync();
-          }
           this.forceSync = false;
           this.unflushedEntries.set(0);
         } catch (IOException e) {
@@ -857,7 +794,7 @@
         LOG.debug("edit=" + this.numEntries.get() + ", write=" +
           logKey.toString());
       }
-      this.writer.append(logKey, logEdit);
+      this.writer.append(new HLog.Entry(logKey, logEdit));
       long took = System.currentTimeMillis() - now;
       if (took > 1000) {
         LOG.warn(Thread.currentThread().getName() + " took " + took +
@@ -936,8 +873,9 @@
         return;
       }
       synchronized (updateLock) {
-        this.writer.append(makeKey(regionName, tableName, logSeqId, System.currentTimeMillis()),
-            completeCacheFlushLogEdit());
+        this.writer.append(new HLog.Entry(
+          makeKey(regionName, tableName, logSeqId, System.currentTimeMillis()),
+          completeCacheFlushLogEdit()));
         this.numEntries.incrementAndGet();
         Long seq = this.lastSeqWritten.get(regionName);
         if (seq != null && logSeqId >= seq.longValue()) {
@@ -1018,20 +956,20 @@
   // Private immutable datastructure to hold Writer and its Path.
   private final static class WriterAndPath {
     final Path p;
-    final SequenceFile.Writer w;
-    WriterAndPath(final Path p, final SequenceFile.Writer w) {
+    final Writer w;
+    WriterAndPath(final Path p, final Writer w) {
       this.p = p;
       this.w = w;
     }
   }
   
   @SuppressWarnings("unchecked")
-  static Class<? extends HLogKey> getKeyClass(HBaseConfiguration conf) {
+  public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
      return (Class<? extends HLogKey>) 
        conf.getClass("hbase.regionserver.hlog.keyclass", HLogKey.class);
   }
   
-  public static HLogKey newKey(HBaseConfiguration conf) throws IOException {
+  public static HLogKey newKey(Configuration conf) throws IOException {
     Class<? extends HLogKey> keyClass = getKeyClass(conf);
     try {
       return keyClass.newInstance();
@@ -1072,8 +1010,8 @@
       int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) / 
           concurrentLogReads)).intValue();
       for (int step = 0; step < maxSteps; step++) {
-        final Map<byte[], LinkedList<HLogEntry>> logEntries = 
-          new TreeMap<byte[], LinkedList<HLogEntry>>(Bytes.BYTES_COMPARATOR);
+        final Map<byte[], LinkedList<HLog.Entry>> logEntries = 
+          new TreeMap<byte[], LinkedList<HLog.Entry>>(Bytes.BYTES_COMPARATOR);
         // Stop at logfiles.length when it's the last step
         int endIndex = step == maxSteps - 1? logfiles.length: 
           step * concurrentLogReads + concurrentLogReads;
@@ -1086,28 +1024,22 @@
             LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
               ": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
           }
-          SequenceFile.Reader in = null;
+          Reader in = null;
           int count = 0;
           try {
             in = HLog.getReader(fs, logfiles[i].getPath(), conf);
             try {
-              HLogKey key = newKey(conf);
-              KeyValue val = new KeyValue();
-              while (in.next(key, val)) {
-                byte [] regionName = key.getRegionName();
-                LinkedList<HLogEntry> queue = logEntries.get(regionName);
+              HLog.Entry entry;
+              while ((entry = in.next()) != null) {
+                byte [] regionName = entry.getKey().getRegionName();
+                LinkedList<HLog.Entry> queue = logEntries.get(regionName);
                 if (queue == null) {
-                  queue = new LinkedList<HLogEntry>();
+                  queue = new LinkedList<HLog.Entry>();
                   LOG.debug("Adding queue for " + Bytes.toStringBinary(regionName));
                   logEntries.put(regionName, queue);
                 }
-                HLogEntry hle = new HLogEntry(val, key);
-                queue.push(hle);
+                queue.push(entry);
                 count++;
-                // Make the key and value new each time; otherwise same instance
-                // is used over and over.
-                key = newKey(conf);
-                val = new KeyValue();
               }
               LOG.debug("Pushed=" + count + " entries from " +
                 logfiles[i].getPath());
@@ -1148,17 +1080,17 @@
           Thread thread = new Thread(Bytes.toStringBinary(key)) {
             @Override
             public void run() {
-              LinkedList<HLogEntry> entries = logEntries.get(key);
+              LinkedList<HLog.Entry> entries = logEntries.get(key);
               LOG.debug("Thread got " + entries.size() + " to process");
               long threadTime = System.currentTimeMillis();
               try {
                 int count = 0;
                 // Items were added to the linkedlist oldest first. Pull them
                 // out in that order.
-                for (ListIterator<HLogEntry> i =
+                for (ListIterator<HLog.Entry> i =
                   entries.listIterator(entries.size());
                     i.hasPrevious();) {
-                  HLogEntry logEntry = i.previous();
+                  HLog.Entry logEntry = i.previous();
                   WriterAndPath wap = logWriters.get(key);
                   if (wap == null) {
                     Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
@@ -1166,7 +1098,7 @@
                         HRegionInfo.encodeRegionName(key)),
                         HREGION_OLDLOGFILE_NAME);
                     Path oldlogfile = null;
-                    SequenceFile.Reader old = null;
+                    Reader old = null;
                     if (fs.exists(logfile)) {
                       FileStatus stat = fs.getFileStatus(logfile);
                       if (stat.getLen() <= 0) {
@@ -1178,12 +1110,10 @@
                           "exists. Copying existing file to new file");
                         oldlogfile = new Path(logfile.toString() + ".old");
                         fs.rename(logfile, oldlogfile);
-                        old = new SequenceFile.Reader(fs, oldlogfile, conf);
+                        old = getReader(fs, oldlogfile, conf);
                       }
                     }
-                    SequenceFile.Writer w =
-                      SequenceFile.createWriter(fs, conf, logfile,
-                        getKeyClass(conf), KeyValue.class, getCompressionType(conf));
+                    Writer w = createWriter(fs, logfile, conf);
                     wap = new WriterAndPath(logfile, w);
                     logWriters.put(key, wap);
                     if (LOG.isDebugEnabled()) {
@@ -1193,20 +1123,19 @@
 
                     if (old != null) {
                       // Copy from existing log file
-                      HLogKey oldkey = newKey(conf);
-                      KeyValue oldval = new KeyValue();
-                      for (; old.next(oldkey, oldval); count++) {
+                      HLog.Entry entry;
+                      for (; (entry = old.next()) != null; count++) {
                         if (LOG.isDebugEnabled() && count > 0
                             && count % 10000 == 0) {
                           LOG.debug("Copied " + count + " edits");
                         }
-                        w.append(oldkey, oldval);
+                        w.append(entry);
                       }
                       old.close();
                       fs.delete(oldlogfile, true);
                     }
                   }
-                  wap.w.append(logEntry.getKey(), logEntry.getEdit());
+                  wap.w.append(logEntry);
                   count++;
                 }
                 if (LOG.isDebugEnabled()) {
@@ -1249,18 +1178,24 @@
    * Utility class that lets us keep track of the edit with it's key
    * Only used when splitting logs
    */
-  public static class HLogEntry {
+  public static class Entry {
     private KeyValue edit;
     private HLogKey key;
+
+    public Entry() {
+      edit = new KeyValue();
+      key = new HLogKey();
+    }
+
     /**
      * Constructor for both params
      * @param edit log's edit
      * @param key log's key
      */
-    public HLogEntry(KeyValue edit, HLogKey key) {
+    public Entry(HLogKey key, KeyValue edit) {
       super();
-      this.edit = edit;
       this.key = key;
+      this.edit = edit;
     }
     /**
      * Gets the edit
@@ -1360,12 +1295,11 @@
         if (!fs.isFile(logPath)) {
           throw new IOException(args[i] + " is not a file");
         }
-        Reader log = new SequenceFile.Reader(fs, logPath, conf);
+        Reader log = getReader(fs, logPath, conf);
         try {
-          HLogKey key = new HLogKey();
-          KeyValue val = new KeyValue();
-          while (log.next(key, val)) {
-            System.out.println(key.toString() + " " + val.toString());
+          HLog.Entry entry;
+          while ((entry = log.next()) != null) {
+            System.out.println(entry.toString());
           }
         } finally {
           log.close();
@@ -1382,16 +1316,5 @@
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
       ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
-  
-  static class HLogWriter extends SequenceFile.Writer {
-    public HLogWriter(FileSystem arg0, Configuration arg1, Path arg2,
-        Class<?> arg3, Class<?> arg4, int arg5, short arg6, long arg7,
-        Progressable arg8, Metadata arg9) throws IOException {
-      super(arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9);
-    }
-    
-    void flush() {
-      
-    }
-  }
+
 }

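The extracted HLog.Writer interface reduces the log sink to four methods: init, append, sync, and close. To illustrate the contract (this class is hypothetical, not part of the commit), here is a writer that merely counts appends; a real implementation must persist entries durably, as SequenceFileLogWriter below does:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.regionserver.wal.HLog;

    public class CountingNullWriter implements HLog.Writer {
      private long appended = 0;

      public void init(FileSystem fs, Path path, Configuration c) throws IOException {
        // A real writer would open an output stream on 'path' here.
      }

      public void append(HLog.Entry entry) throws IOException {
        appended++;  // discard the edit; only count it
      }

      public void sync() throws IOException {
        // Nothing buffered, so nothing to flush.
      }

      public void close() throws IOException {
        // Nothing to release.
      }
    }

Plugged in via hbase.regionserver.hlog.writer.impl it would satisfy the interface, though at the cost of all durability.
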
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java?rev=892451&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java Sat Dec 19 08:10:45 2009
@@ -0,0 +1,110 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.io.SequenceFile;
+
+public class SequenceFileLogReader implements HLog.Reader {
+  
+  /**
+   * Hack just to set the correct file length up in SequenceFile.Reader.
+   * See HADOOP-6307.  The below is all about setting the right length on the
+   * file we are reading.  fs.getFileStatus(file).getLen() is passed down to
+   * a private SequenceFile.Reader constructor.  This won't work.  Need to do
+   * the available on the stream.  The below is ugly.  It makes getPos, the
+   * first time its called, return length of the file -- i.e. tell a lie -- just
+   * so this line up in SF.Reader's constructor ends up with right answer:
+   * 
+   *         this.end = in.getPos() + length;
+   *
+   */
+  private static class WALReader extends SequenceFile.Reader {
+
+    WALReader(final FileSystem fs, final Path p, final Configuration c)
+    throws IOException {
+      super(fs, p, c);
+      
+    }
+
+    @Override
+    protected FSDataInputStream openFile(FileSystem fs, Path file,
+      int bufferSize, long length)
+    throws IOException {
+      return new WALReaderFSDataInputStream(super.openFile(fs, file, 
+        bufferSize, length), length);
+    }
+
+    /**
+     * Override just so can intercept first call to getPos.
+     */
+    static class WALReaderFSDataInputStream extends FSDataInputStream {
+      private boolean firstGetPosInvocation = true;
+      private long length;
+
+      WALReaderFSDataInputStream(final FSDataInputStream is, final long l)
+      throws IOException {
+        super(is);
+        this.length = l;
+      }
+
+      @Override
+      public long getPos() throws IOException {
+        if (this.firstGetPosInvocation) {
+          this.firstGetPosInvocation = false;
+          // Tell a lie.  We're doing this just so that this line up in
+          // SequenceFile.Reader constructor comes out with the correct length
+          // on the file:
+          //         this.end = in.getPos() + length;
+          long available = this.in.available();
+          // Length gets added up in the SF.Reader constructor so subtract the
+          // difference.  If available < this.length, then return this.length.
+          return available >= this.length? available - this.length: this.length;
+        }
+        return super.getPos();
+      }
+    }
+  }
+
+  Configuration conf;
+  WALReader reader;
+  
+  public SequenceFileLogReader() { }
+
+  @Override
+  public void init(FileSystem fs, Path path, Configuration conf)
+      throws IOException {
+    this.conf = conf;
+    reader = new WALReader(fs, path, conf);
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+
+  @Override
+  public HLog.Entry next() throws IOException {
+    return next(null);
+  }
+
+  @Override
+  public HLog.Entry next(HLog.Entry reuse) throws IOException {
+    if (reuse == null) {
+      HLogKey key = HLog.newKey(conf);
+      KeyValue val = new KeyValue();
+      if (reader.next(key, val)) {
+        return new HLog.Entry(key, val);
+      }
+    } else if (reader.next(reuse.getKey(), reuse.getEdit())) {
+      return reuse;
+    }
+    return null;
+  }
+
+}

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java?rev=892451&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java Sat Dec 19 08:10:45 2009
@@ -0,0 +1,74 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Metadata;
+import org.apache.hadoop.io.compress.DefaultCodec;
+
+public class SequenceFileLogWriter implements HLog.Writer {
+
+  SequenceFile.Writer writer;
+  FSDataOutputStream writer_out;
+
+  public SequenceFileLogWriter() { }
+
+  @Override
+  public void init(FileSystem fs, Path path, Configuration conf)
+      throws IOException {
+    writer = SequenceFile.createWriter(fs, conf, path, 
+      HLog.getKeyClass(conf), KeyValue.class, 
+      fs.getConf().getInt("io.file.buffer.size", 4096),
+      (short) conf.getInt("hbase.regionserver.hlog.replication",
+        fs.getDefaultReplication()),
+      conf.getLong("hbase.regionserver.hlog.blocksize",
+        fs.getDefaultBlockSize()),
+      SequenceFile.CompressionType.NONE,
+      new DefaultCodec(),
+      null,
+      new Metadata());
+
+    // Get at the private FSDataOutputStream inside in SequenceFile so we can
+    // call sync on it.  Make it accessible.  Stash it aside for call up in
+    // the sync method.
+    final Field fields[] = writer.getClass().getDeclaredFields();
+    final String fieldName = "out";
+    for (int i = 0; i < fields.length; ++i) {
+      if (fieldName.equals(fields[i].getName())) {
+        try {
+          fields[i].setAccessible(true);
+          this.writer_out = (FSDataOutputStream)fields[i].get(writer);
+          break;
+        } catch (IllegalAccessException ex) {
+          throw new IOException("Accessing " + fieldName, ex);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void append(HLog.Entry entry) throws IOException {
+    this.writer.append(entry.getKey(), entry.getEdit());
+  }
+
+  @Override
+  public void close() throws IOException {
+    this.writer.close();
+  }
+
+  @Override
+  public void sync() throws IOException {
+    this.writer.sync();
+    if (this.writer_out != null) {
+      this.writer_out.sync();
+    }
+  }
+
+}

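End to end, the factored-out classes are reached only through HLog's reflective factories. A sketch of writing and syncing a single edit, assuming the HLogKey(regionName, tableName, seqNum, now) constructor mirrors the THLogKey one used above, and using a hypothetical /tmp path:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.regionserver.wal.HLog;
    import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
    import org.apache.hadoop.hbase.util.Bytes;

    public class WALWriteDemo {
      public static void main(String[] args) throws Exception {
        HBaseConfiguration conf = new HBaseConfiguration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/tmp/demo-wal");  // hypothetical location
        // Reflectively instantiates hbase.regionserver.hlog.writer.impl.
        HLog.Writer writer = HLog.createWriter(fs, path, conf);
        try {
          HLogKey key = new HLogKey(Bytes.toBytes("regionname"),
              Bytes.toBytes("tablename"), 1L, System.currentTimeMillis());
          KeyValue edit = new KeyValue(Bytes.toBytes("row"),
              Bytes.toBytes("family"), Bytes.toBytes("qualifier"));
          writer.append(new HLog.Entry(key, edit));
          writer.sync();  // SequenceFileLogWriter also syncs the raw stream
        } finally {
          writer.close();
        }
      }
    }
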
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=892451&r1=892450&r2=892451&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Sat Dec 19 08:10:45 2009
@@ -33,9 +33,6 @@
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-
 
 /** JUnit test case for HLog */
 public class TestHLog extends HBaseTestCase implements HConstants {
@@ -139,10 +136,10 @@
     wal.sync();
     // Open a Reader.
     Path walPath = wal.computeFilename(wal.getFilenum());
-    SequenceFile.Reader reader = HLog.getReader(this.fs, walPath, this.conf);
+    HLog.Reader reader = HLog.getReader(fs, walPath, conf);
     int count = 0;
-    HLogKey key = new HLogKey();
-    while(reader.next(key)) count++;
+    HLog.Entry entry = new HLog.Entry();
+    while ((entry = reader.next(entry)) != null) count++;
     assertEquals(total, count);
     reader.close();
     // Add test that checks to see that an open of a Reader works on a file
@@ -152,16 +149,16 @@
       kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes));
       wal.append(bytes, bytes, kvs, System.currentTimeMillis());
     }
-    reader = HLog.getReader(this.fs, walPath, this.conf);
+    reader = HLog.getReader(fs, walPath, conf);
     count = 0;
-    while(reader.next(key)) count++;
+    while((entry = reader.next(entry)) != null) count++;
     assertTrue(count >= total);
     reader.close();
     // If I sync, should see double the edits.
     wal.sync();
-    reader = HLog.getReader(this.fs, walPath, this.conf);
+    reader = HLog.getReader(fs, walPath, conf);
     count = 0;
-    while(reader.next(key)) count++;
+    while((entry = reader.next(entry)) != null) count++;
     assertEquals(total * 2, count);
     // Now do a test that ensures stuff works when we go over block boundary,
     // especially that we return good length on file.
@@ -173,16 +170,16 @@
     }
     // Now I should have written out lots of blocks.  Sync then read.
     wal.sync();
-    reader = HLog.getReader(this.fs, walPath, this.conf);
+    reader = HLog.getReader(fs, walPath, conf);
     count = 0;
-    while(reader.next(key)) count++;
+    while((entry = reader.next(entry)) != null) count++;
     assertEquals(total * 3, count);
     reader.close();
     // Close it and ensure that closed, Reader gets right length also.
     wal.close();
-    reader = HLog.getReader(this.fs, walPath, this.conf);
+    reader = HLog.getReader(fs, walPath, conf);
     count = 0;
-    while(reader.next(key)) count++;
+    while((entry = reader.next(entry)) != null) count++;
     assertEquals(total * 3, count);
     reader.close();
   }
@@ -191,14 +188,15 @@
   throws IOException {
     assertEquals(howmany, splits.size());
     for (int i = 0; i < splits.size(); i++) {
-      SequenceFile.Reader r = HLog.getReader(this.fs, splits.get(i), this.conf);
+      HLog.Reader reader = HLog.getReader(this.fs, splits.get(i), conf);
       try {
-        HLogKey key = new HLogKey();
-        KeyValue kv = new KeyValue();
         int count = 0;
         String previousRegion = null;
         long seqno = -1;
-        while(r.next(key, kv)) {
+        HLog.Entry entry = new HLog.Entry();
+        while((entry = reader.next(entry)) != null) {
+          HLogKey key = entry.getKey();
+          KeyValue kv = entry.getEdit();
           String region = Bytes.toString(key.getRegionName());
           // Assert that all edits are for same region.
           if (previousRegion != null) {
@@ -212,7 +210,7 @@
         }
         assertEquals(howmany * howmany, count);
       } finally {
-        r.close();
+        reader.close();
       }
     }
   }
@@ -226,7 +224,7 @@
     final byte [] regionName = Bytes.toBytes("regionname");
     final byte [] tableName = Bytes.toBytes("tablename");
     final byte [] row = Bytes.toBytes("row");
-    Reader reader = null;
+    HLog.Reader reader = null;
     HLog log = new HLog(fs, dir, this.conf, null);
     try {
       // Write columns named 1, 2, 3, etc. and then values of single byte
@@ -246,17 +244,20 @@
       log = null;
       // Now open a reader on the log and assert append worked.
       reader = HLog.getReader(fs, filename, conf);
-      HLogKey key = new HLogKey();
-      KeyValue val = new KeyValue();
+      HLog.Entry entry = new HLog.Entry();
       for (int i = 0; i < COL_COUNT; i++) {
-        reader.next(key, val);
+        reader.next(entry);
+        HLogKey key = entry.getKey();
+        KeyValue val = entry.getEdit();
         assertTrue(Bytes.equals(regionName, key.getRegionName()));
         assertTrue(Bytes.equals(tableName, key.getTablename()));
         assertTrue(Bytes.equals(row, val.getRow()));
         assertEquals((byte)(i + '0'), val.getValue()[0]);
         System.out.println(key + " " + val);
       }
-      while (reader.next(key, val)) {
+      while ((entry = reader.next(null)) != null) {
+        HLogKey key = entry.getKey();
+        KeyValue val = entry.getEdit();
         // Assert only one more row... the meta flushed row.
         assertTrue(Bytes.equals(regionName, key.getRegionName()));
         assertTrue(Bytes.equals(tableName, key.getTablename()));


