hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1522073 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/wal/ test/java/org/apache/hadoop/hbase/regionserver/wal/
Date Wed, 11 Sep 2013 22:22:17 GMT
Author: stack
Date: Wed Sep 11 22:22:17 2013
New Revision: 1522073

URL: http://svn.apache.org/r1522073
Log:
HBASE-9460 Fix HLogPerformanceEvaluation so runs against localfs

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1522073&r1=1522072&r2=1522073&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
Wed Sep 11 22:22:17 2013
@@ -23,7 +23,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
 import java.util.UUID;
 import java.util.regex.Pattern;
 
@@ -42,10 +41,12 @@ import org.apache.hadoop.io.Writable;
 
 
 @InterfaceAudience.Private
+// TODO: Rename interface to WAL
 public interface HLog {
   Log LOG = LogFactory.getLog(HLog.class);
 
   /** File Extension used while splitting an HLog into regions (HBASE-2312) */
+  // TODO: this seems like an implementation detail that does not belong here.
   String SPLITTING_EXT = "-splitting";
   boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
   /** The hbase:meta region's HLog filename extension */
@@ -55,10 +56,11 @@ public interface HLog {
    * Configuration name of HLog Trailer's warning size. If a waltrailer's size is greater
than the
    * configured size, a warning is logged. This is used with Protobuf reader/writer.
    */
-  String WAL_TRAILER_WARN_SIZE =
-    "hbase.regionserver.waltrailer.warn.size";
-  int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024*1024; // 1MB
+  // TODO: Implementation detail.  Why in here?
+  String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
+  int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB
 
+  // TODO: Implemenation detail.  Why in here?
   Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
   String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
 
@@ -67,7 +69,7 @@ public interface HLog {
     /**
      * @param fs File system.
      * @param path Path.
-     * @param c Config.
+     * @param c Configuration.
      * @param s Input stream that may have been pre-opened by the caller; may be null.
      */
     void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException;
@@ -87,6 +89,7 @@ public interface HLog {
      * @return the WALTrailer of the current HLog. It may be null in case of legacy or corrupt
WAL
      *         files.
      */
+    // TODO: What we need a trailer on WAL for?
     WALTrailer getWALTrailer();
   }
 
@@ -109,9 +112,10 @@ public interface HLog {
   }
 
   /**
-   * Utility class that lets us keep track of the edit with it's key Only used
-   * when splitting logs
+   * Utility class that lets us keep track of the edit with it's key.
+   * Only used when splitting logs.
    */
+  // TODO: Remove this Writable.
   class Entry implements Writable {
     private WALEdit edit;
     private HLogKey key;
@@ -124,10 +128,8 @@ public interface HLog {
     /**
      * Constructor for both params
      *
-     * @param edit
-     *          log's edit
-     * @param key
-     *          log's key
+     * @param edit log's edit
+     * @param key log's key
      */
     public Entry(HLogKey key, WALEdit edit) {
       super();
@@ -199,16 +201,14 @@ public interface HLog {
   /**
    * @return Current state of the monotonically increasing file id.
    */
+  // TODO: Remove.  Implementation detail.
   long getFilenum();
 
   /**
-   * Called by HRegionServer when it opens a new region to ensure that log
-   * sequence numbers are always greater than the latest sequence number of the
-   * region being brought on-line.
-   *
-   * @param newvalue
-   *          We'll set log edit/sequence number to this value if it is greater
-   *          than the current value.
+   * Called to ensure that log sequence numbers are always greater
+   *
+   * @param newvalue We'll set log edit/sequence number to this value if it is greater
+   * than the current value.
    */
   void setSequenceNumber(final long newvalue);
 
@@ -217,6 +217,7 @@ public interface HLog {
    */
   long getSequenceNumber();
 
+  // TODO: Log rolling should not be in this interface.
   /**
    * Roll the log writer. That is, start writing log messages to a new file.
    *
@@ -274,7 +275,7 @@ public interface HLog {
 
   /**
    * Append a set of edits to the log. Log edits are keyed by (encoded)
-   * regionName, rowname, and log-sequence-id. The HLog is flushed after this
+   * regionName, row name, and log-sequence-id. The HLog is flushed after this
    * transaction is written to the log.
    * @param info
    * @param tableName
@@ -298,9 +299,10 @@ public interface HLog {
    * @return txid of this transaction
    * @throws IOException
    */
-  public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, 
+  public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
       List<UUID> clusterIds, final long now, HTableDescriptor htd) throws IOException;
-  
+
+  // TODO: Do we need all these versions of sync?
   void hsync() throws IOException;
 
   void hflush() throws IOException;
@@ -312,6 +314,7 @@ public interface HLog {
   /**
    * Obtain a log sequence number.
    */
+  // TODO: Name better to differentiate from getSequenceNumber.
   long obtainSeqNum();
 
   /**
@@ -355,6 +358,7 @@ public interface HLog {
    *
    * @return lowReplicationRollEnabled
    */
+  // TODO: This is implementation detail?
   boolean isLowReplicationRollEnabled();
 
   /** Gets the earliest sequence number in the memstore for this particular region.

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java?rev=1522073&r1=1522072&r2=1522073&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
Wed Sep 11 22:22:17 2013
@@ -193,19 +193,24 @@ public class ProtobufLogReader extends R
   @Override
   protected boolean readNext(HLog.Entry entry) throws IOException {
     while (true) {
+      // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
       long originalPosition = this.inputStream.getPos();
-      if (trailerPresent && originalPosition == this.walEditsStopOffset) return false;
+      if (trailerPresent && originalPosition > 0 && originalPosition ==
this.walEditsStopOffset) {
+        return false;
+      }
       WALKey.Builder builder = WALKey.newBuilder();
-      int size = 0;
+      long size = 0;
       try {
-        int originalAvailable = this.inputStream.available();
+        long available = -1;
         try {
           int firstByte = this.inputStream.read();
           if (firstByte == -1) {
             throw new EOFException("First byte is negative");
           }
           size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
-          if (this.inputStream.available() < size) {
+          // available may be < 0 on local fs for instance.  If so, can't depend on it.
+          available = this.inputStream.available();
+          if (available > 0 && available < size) {
             throw new EOFException("Available stream not enough for edit, " +
                 "inputStream.available()= " + this.inputStream.available() + ", " +
                 "entry size= " + size);
@@ -214,9 +219,8 @@ public class ProtobufLogReader extends R
           builder.mergeFrom(limitedInput);
         } catch (InvalidProtocolBufferException ipbe) {
           throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
+
-              originalPosition + ", currentPosition=" + this.inputStream.getPos() +
-              ", messageSize=" + size + ", originalAvailable=" + originalAvailable +
-              ", currentAvailable=" + this.inputStream.available()).initCause(ipbe);
+            originalPosition + ", currentPosition=" + this.inputStream.getPos() +
+            ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
         }
         if (!builder.isInitialized()) {
           // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
@@ -258,6 +262,10 @@ public class ProtobufLogReader extends R
         }
       } catch (EOFException eof) {
         LOG.trace("Encountered a malformed edit, seeking back to last good position in file",
eof);
+        // If originalPosition is < 0, it is rubbish and we cannot use it (probably local
fs)
+        if (originalPosition < 0) throw eof;
+        // Else restore our position to original location in hope that next time through
we will
+        // read successfully.
         seekOnFs(originalPosition);
         return false;
       }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java?rev=1522073&r1=1522072&r2=1522073&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
(original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
Wed Sep 11 22:22:17 2013
@@ -69,7 +69,7 @@ public final class HLogPerformanceEvalua
 
   /**
    * Perform HLog.append() of Put object, for the number of iterations requested.
-   * Keys and Vaues are generated randomly, the number of column familes,
+   * Keys and Vaues are generated randomly, the number of column families,
    * qualifiers and key/value size is tunable by the user.
    */
   class HLogPutBenchmark implements Runnable {
@@ -126,6 +126,7 @@ public final class HLogPerformanceEvalua
     boolean noSync = false;
     boolean verify = false;
     boolean verbose = false;
+    boolean cleanup = true;
     long roll = Long.MAX_VALUE;
     // Process command line args
     for (int i = 0; i < args.length; i++) {
@@ -151,6 +152,8 @@ public final class HLogPerformanceEvalua
           verify = true;
         } else if (cmd.equals("-verbose")) {
           verbose = true;
+        } else if (cmd.equals("-nocleanup")) {
+          cleanup = false;
         } else if (cmd.equals("-roll")) {
           roll = Long.parseLong(args[++i]);
         } else if (cmd.equals("-h")) {
@@ -205,8 +208,12 @@ public final class HLogPerformanceEvalua
         if (verify) {
           Path dir = ((FSHLog) hlog).getDir();
           long editCount = 0;
-          for (FileStatus fss: fs.listStatus(dir)) {
-            editCount += verify(fss.getPath(), verbose);
+          FileStatus [] fsss = fs.listStatus(dir);
+          if (fsss.length == 0) throw new IllegalStateException("No WAL found");
+          for (FileStatus fss: fsss) {
+            Path p = fss.getPath();
+            if (!fs.exists(p)) throw new IllegalStateException(p.toString());
+            editCount += verify(p, verbose);
           }
           long expected = numIterations * numThreads;
           if (editCount != expected) {
@@ -216,7 +223,7 @@ public final class HLogPerformanceEvalua
       } finally {
         if (region != null) closeRegion(region);
         // Remove the root dir for this test region
-        cleanRegionRootDir(fs, rootRegionDir);
+        if (cleanup) cleanRegionRootDir(fs, rootRegionDir);
       }
     } finally {
       fs.close();
@@ -243,14 +250,16 @@ public final class HLogPerformanceEvalua
    * @throws IOException
    */
   private long verify(final Path wal, final boolean verbose) throws IOException {
-    HLog.Reader reader = HLogFactory.createReader(wal.getFileSystem(getConf()), 
-        wal, getConf());
+    HLog.Reader reader = HLogFactory.createReader(wal.getFileSystem(getConf()), wal, getConf());
     long previousSeqid = -1;
     long count = 0;
     try {
       while (true) {
         Entry e = reader.next();
-        if (e == null) break;
+        if (e == null) {
+          LOG.debug("Read count=" + count + " from " + wal);
+          break;
+        }
         count++;
         long seqid = e.getKey().getLogSeqNum();
         if (verbose) LOG.info("seqid=" + seqid);
@@ -282,6 +291,7 @@ public final class HLogPerformanceEvalua
     System.err.println("  -qualifiers <N>  Number of qualifiers to write.");
     System.err.println("  -keySize <N>     Row key size in byte.");
     System.err.println("  -valueSize <N>   Row/Col value size in byte.");
+    System.err.println("  -nocleanup       Do NOT remove test data when done.");
     System.err.println("  -nosync          Append without syncing");
     System.err.println("  -verify          Verify edits written in sequence");
     System.err.println("  -verbose         Output extra info; e.g. all edit seq ids when
verifying");
@@ -342,7 +352,7 @@ public final class HLogPerformanceEvalua
     Thread[] threads = new Thread[numThreads];
     long startTime = System.currentTimeMillis();
     for (int i = 0; i < numThreads; ++i) {
-      threads[i] = new Thread(runnable);
+      threads[i] = new Thread(runnable, "t" + i);
       threads[i].start();
     }
     for (Thread t : threads) t.join();
@@ -355,7 +365,7 @@ public final class HLogPerformanceEvalua
    * Call this method to avoid the {@link #main(String[])} System.exit.
    * @param args
    * @return errCode
-   * @throws Exception 
+   * @throws Exception
    */
   static int innerMain(final String [] args) throws Exception {
     return ToolRunner.run(HBaseConfiguration.create(), new HLogPerformanceEvaluation(), args);



Mime
View raw message