hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject hbase git commit: HBASE-19503 Fix TestWALOpenAfterDNRollingStart for AsyncFSWAL
Date Thu, 14 Dec 2017 01:55:30 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 104afd74a -> ba5f9ac38


HBASE-19503 Fix TestWALOpenAfterDNRollingStart for AsyncFSWAL


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ba5f9ac3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ba5f9ac3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ba5f9ac3

Branch: refs/heads/master
Commit: ba5f9ac380a91afa64d2114238a8f5ff3c73819f
Parents: 104afd7
Author: zhangduo <zhangduo@apache.org>
Authored: Thu Dec 14 09:41:12 2017 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Thu Dec 14 09:41:12 2017 +0800

----------------------------------------------------------------------
 .../asyncfs/FanOutOneBlockAsyncDFSOutput.java   |   3 +-
 .../hadoop/hbase/regionserver/LogRoller.java    |  17 +-
 .../hbase/regionserver/wal/AbstractFSWAL.java   | 155 +++++++++++++++----
 .../hbase/regionserver/wal/AsyncFSWAL.java      |  15 +-
 .../hadoop/hbase/regionserver/wal/FSHLog.java   |  91 +----------
 .../hbase/regionserver/wal/ReaderBase.java      |   4 +-
 .../wal/TestWALOpenAfterDNRollingStart.java     | 116 +++++++++-----
 7 files changed, 222 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ba5f9ac3/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 91086d7..b874aa7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -380,7 +380,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
   @Override
   public DatanodeInfo[] getPipeline() {
-    return locations;
+    State state = this.state;
+    return state == State.STREAMING || state == State.CLOSING ? locations : new DatanodeInfo[0];
   }
 
   private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba5f9ac3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 809a457..451b886 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -27,16 +27,16 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
-import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 
@@ -122,14 +122,11 @@ public class LogRoller extends HasThread implements Closeable {
     try {
       for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
         WAL wal = entry.getKey();
-        boolean neeRollAlready = entry.getValue();
-        if(wal instanceof FSHLog && !neeRollAlready) {
-          FSHLog hlog = (FSHLog)wal;
-          if ((now - hlog.getLastTimeCheckLowReplication())
-              > this.checkLowReplicationInterval) {
-            hlog.checkLogRoll();
-          }
+        boolean needRollAlready = entry.getValue();
+        if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
+          continue;
         }
+        ((AbstractFSWAL<?>) wal).checkLogLowReplication(checkLowReplicationInterval);
       }
     } catch (Throwable e) {
       LOG.warn("Failed checking low replication", e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba5f9ac3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 246221b..9929036 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DE
 
 import com.lmax.disruptor.RingBuffer;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.management.MemoryType;
@@ -55,6 +56,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -67,13 +69,16 @@ import org.apache.hadoop.hbase.util.CollectionUtils;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.DrainBarrier;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
 import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
+import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.htrace.core.TraceScope;
@@ -222,6 +227,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
    */
   volatile W writer;
 
+  // Last time to check low replication on hlog's pipeline
+  private long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
+
   protected volatile boolean closed = false;
 
   protected final AtomicBoolean shutdown = new AtomicBoolean(false);
@@ -230,7 +238,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
    * an IllegalArgumentException if used to compare paths from different wals.
    */
   final Comparator<Path> LOG_NAME_COMPARATOR =
-      (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
+    (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
 
   private static final class WalProps {
 
@@ -256,8 +264,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
    * Map of WAL log file to properties. The map is sorted by the log file creation timestamp
    * (contained in the log file name).
    */
-  protected ConcurrentNavigableMap<Path, WalProps> walFile2Props = new ConcurrentSkipListMap<>(
-      LOG_NAME_COMPARATOR);
+  protected ConcurrentNavigableMap<Path, WalProps> walFile2Props =
+    new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);
 
   /**
    * Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures.
@@ -311,8 +319,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     // be stuck and make no progress if the buffer is filled with appends only and there is no
     // sync. If no sync, then the handlers will be outstanding just waiting on sync completion
     // before they return.
-    int preallocatedEventCount = this.conf.getInt("hbase.regionserver.wal.disruptor.event.count",
-      1024 * 16);
+    int preallocatedEventCount =
+      this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
     checkArgument(preallocatedEventCount >= 0,
       "hbase.regionserver.wal.disruptor.event.count must > 0");
     int floor = Integer.highestOneBit(preallocatedEventCount);
@@ -346,12 +354,12 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     }
 
     // If prefix is null||empty then just name it wal
-    this.walFilePrefix = prefix == null || prefix.isEmpty() ? "wal"
-        : URLEncoder.encode(prefix, "UTF8");
+    this.walFilePrefix =
+      prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
     // we only correctly differentiate suffices when numeric ones start with '.'
     if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
-      throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER
-          + "' but instead was '" + suffix + "'");
+      throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER +
+        "' but instead was '" + suffix + "'");
     }
     // Now that it exists, set the storage policy for the entire directory of wal files related to
     // this FSHLog instance
@@ -398,8 +406,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     // (it costs a little x'ing bocks)
     final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
       CommonFSUtils.getDefaultBlockSize(this.fs, this.walDir));
-    this.logrollsize = (long) (blocksize
-        * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
+    this.logrollsize =
+      (long) (blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
 
     boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null;
     if (maxLogsDefined) {
@@ -408,9 +416,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs",
       Math.max(32, calculateMaxLogFiles(conf, logrollsize)));
 
-    LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize="
-        + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix="
-        + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir);
+    LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +
+      StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" +
+      walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir);
     this.slowSyncNs = TimeUnit.MILLISECONDS
         .toNanos(conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS));
     this.walSyncTimeoutNs = TimeUnit.MILLISECONDS
@@ -588,8 +596,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     int logCount = getNumRolledLogFiles();
     if (logCount > this.maxLogs && logCount > 0) {
       Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
-      regions = this.sequenceIdAccounting
-          .findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
+      regions =
+        this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
     }
     if (regions != null) {
       StringBuilder sb = new StringBuilder();
@@ -599,8 +607,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
         }
         sb.append(Bytes.toStringBinary(regions[i]));
       }
-      LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs + "; forcing flush of "
-          + regions.length + " regions(s): " + sb.toString());
+      LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
+        "; forcing flush of " + regions.length + " regions(s): " + sb.toString());
     }
     return regions;
   }
@@ -688,8 +696,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
         this.walFile2Props.put(oldPath,
           new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
         this.totalLogSize.addAndGet(oldFileLen);
-        LOG.info("Rolled WAL " + CommonFSUtils.getPath(oldPath) + " with entries=" + oldNumEntries
-            + ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString);
+        LOG.info("Rolled WAL " + CommonFSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
+          ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString);
       } else {
         LOG.info("New WAL " + newPathString);
       }
@@ -755,8 +763,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
         newPath = replaceWriter(oldPath, newPath, nextWriter);
         tellListenersAboutPostLogRoll(oldPath, newPath);
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Create new " + implClassName + " writer with pipeline: "
-              + Arrays.toString(getPipeline()));
+          LOG.debug("Create new " + implClassName + " writer with pipeline: " +
+            Arrays.toString(getPipeline()));
         }
         // Can we delete any of the old log files?
         if (getNumRolledLogFiles() > 0) {
@@ -766,8 +774,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
       } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
         // If the underlying FileSystem can't do what we ask, treat as IO failure so
         // we'll abort.
-        throw new IOException("Underlying FileSystem can't meet stream requirements. See RS log " +
-            "for details.", exception);
+        throw new IOException(
+            "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
+            exception);
       } finally {
         closeBarrier.endOp();
       }
@@ -843,8 +852,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
           }
         }
       }
-      LOG.debug("Moved " + files.length + " WAL file(s) to " +
-          CommonFSUtils.getPath(this.walArchiveDir));
+      LOG.debug(
+        "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir));
     }
     LOG.info("Closed WAL: " + toString());
   }
@@ -979,8 +988,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
 
   @Override
   public String toString() {
-    return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num "
-        + filenum + ")";
+    return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")";
   }
 
   /**
@@ -1023,8 +1031,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
 
   protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
 
-  protected abstract W createWriterInstance(Path path) throws IOException,
-      CommonFSUtils.StreamLacksCapabilityException;
+  protected abstract W createWriterInstance(Path path)
+      throws IOException, CommonFSUtils.StreamLacksCapabilityException;
 
   /**
    * @return old wal file size
@@ -1034,6 +1042,27 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
 
   protected abstract void doShutdown() throws IOException;
 
+  protected abstract boolean doCheckLogLowReplication();
+
+  public void checkLogLowReplication(long checkInterval) {
+    long now = EnvironmentEdgeManager.currentTime();
+    if (now - lastTimeCheckLowReplication < checkInterval) {
+      return;
+    }
+    // Will return immediately if we are in the middle of a WAL log roll currently.
+    if (!rollWriterLock.tryLock()) {
+      return;
+    }
+    try {
+      lastTimeCheckLowReplication = now;
+      if (doCheckLogLowReplication()) {
+        requestLogRoll(true);
+      }
+    } finally {
+      rollWriterLock.unlock();
+    }
+  }
+
   /**
    * This method gets the pipeline for the current WAL.
    */
@@ -1045,4 +1074,68 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
    */
   @VisibleForTesting
   abstract int getLogReplication();
+
+  private static void split(final Configuration conf, final Path p) throws IOException {
+    FileSystem fs = FSUtils.getWALFileSystem(conf);
+    if (!fs.exists(p)) {
+      throw new FileNotFoundException(p.toString());
+    }
+    if (!fs.getFileStatus(p).isDirectory()) {
+      throw new IOException(p + " is not a directory");
+    }
+
+    final Path baseDir = FSUtils.getWALRootDir(conf);
+    Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    if (conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
+      AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)) {
+      archiveDir = new Path(archiveDir, p.getName());
+    }
+    WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
+  }
+
+  private static void usage() {
+    System.err.println("Usage: AbstractFSWAL <ARGS>");
+    System.err.println("Arguments:");
+    System.err.println(" --dump  Dump textual representation of passed one or more files");
+    System.err.println("         For example: " +
+      "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
+    System.err.println(" --split Split the passed directory of WAL logs");
+    System.err.println(
+      "         For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR");
+  }
+
+  /**
+   * Pass one or more log file names and it will either dump out a text version on
+   * <code>stdout</code> or split the specified log files.
+   */
+  public static void main(String[] args) throws IOException {
+    if (args.length < 2) {
+      usage();
+      System.exit(-1);
+    }
+    // either dump using the WALPrettyPrinter or split, depending on args
+    if (args[0].compareTo("--dump") == 0) {
+      WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
+    } else if (args[0].compareTo("--perf") == 0) {
+      LOG.fatal("Please use the WALPerformanceEvaluation tool instead. i.e.:");
+      LOG.fatal(
+        "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
+      System.exit(-1);
+    } else if (args[0].compareTo("--split") == 0) {
+      Configuration conf = HBaseConfiguration.create();
+      for (int i = 1; i < args.length; i++) {
+        try {
+          Path logPath = new Path(args[i]);
+          FSUtils.setFsDefault(conf, logPath);
+          split(conf, logPath);
+        } catch (IOException t) {
+          t.printStackTrace(System.err);
+          System.exit(-1);
+        }
+      }
+    } else {
+      usage();
+      System.exit(-1);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba5f9ac3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 832eefd..4ccfdf3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.Nam
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
 import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -536,7 +535,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
         if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
           // we will give up consuming so if there are some unsynced data we need to issue a sync.
           if (writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty() &&
-              syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync) {
+            syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync) {
             // no new data in the ringbuffer and we have at least one sync request
             sync(writer);
           }
@@ -564,8 +563,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   @Override
   public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
       throws IOException {
-    long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
-        waitingConsumePayloads);
+    long txid =
+      stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
     if (shouldScheduleConsumer()) {
       consumeExecutor.execute(consumer);
     }
@@ -746,4 +745,12 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   int getLogReplication() {
     return getPipeline().length;
   }
+
+  @Override
+  protected boolean doCheckLogLowReplication() {
+    // not like FSHLog, AsyncFSOutput will fail immediately if there are errors writing to DNs, so
+    // typically there is no 'low replication' state, only a 'broken' state.
+    AsyncFSOutput output = this.fsOut;
+    return output != null && output.getPipeline().length == 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba5f9ac3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 3da37d3..fd9d6c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -25,7 +25,6 @@ import com.lmax.disruptor.TimeoutException;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Arrays;
@@ -42,25 +41,18 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
-import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -156,10 +148,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
 
   private final AtomicInteger closeErrorCount = new AtomicInteger();
 
-  // Last time to check low replication on hlog's pipeline
-  private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
-
-
   /**
    * Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs
    * using our logger instead of java native logger.
@@ -627,14 +615,14 @@ public class FSHLog extends AbstractFSWAL<Writer> {
   /**
    * Schedule a log roll if needed.
    */
-  public void checkLogRoll() {
+  private void checkLogRoll() {
     // Will return immediately if we are in the middle of a WAL log roll currently.
     if (!rollWriterLock.tryLock()) {
       return;
     }
     boolean lowReplication;
     try {
-      lowReplication = checkLowReplication();
+      lowReplication = doCheckLogLowReplication();
     } finally {
       rollWriterLock.unlock();
     }
@@ -646,9 +634,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
   /**
    * @return true if number of replicas for the WAL is lower than threshold
    */
-  private boolean checkLowReplication() {
+  protected boolean doCheckLogLowReplication() {
     boolean logRollNeeded = false;
-    this.lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
     // if the number of replicas in HDFS has fallen below the configured
     // value, then roll logs.
     try {
@@ -767,24 +754,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
       .align(ClassSize.OBJECT + (5 * ClassSize.REFERENCE) + ClassSize.ATOMIC_INTEGER
           + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
 
-  private static void split(final Configuration conf, final Path p) throws IOException {
-    FileSystem fs = FSUtils.getWALFileSystem(conf);
-    if (!fs.exists(p)) {
-      throw new FileNotFoundException(p.toString());
-    }
-    if (!fs.getFileStatus(p).isDirectory()) {
-      throw new IOException(p + " is not a directory");
-    }
-
-    final Path baseDir = FSUtils.getWALRootDir(conf);
-    Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    if (conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
-      AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)) {
-      archiveDir = new Path(archiveDir, p.getName());
-    }
-    WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
-  }
-
   /**
    * This class is used coordinating two threads holding one thread at a 'safe point' while the
    * orchestrating thread does some work that requires the first thread paused: e.g. holding the WAL
@@ -1118,52 +1087,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     }
   }
 
-  private static void usage() {
-    System.err.println("Usage: FSHLog <ARGS>");
-    System.err.println("Arguments:");
-    System.err.println(" --dump  Dump textual representation of passed one or more files");
-    System.err.println("         For example: "
-        + "FSHLog --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
-    System.err.println(" --split Split the passed directory of WAL logs");
-    System.err.println(
-      "         For example: " + "FSHLog --split hdfs://example.com:9000/hbase/WALs/DIR");
-  }
-
-  /**
-   * Pass one or more log file names and it will either dump out a text version on
-   * <code>stdout</code> or split the specified log files.
-   */
-  public static void main(String[] args) throws IOException {
-    if (args.length < 2) {
-      usage();
-      System.exit(-1);
-    }
-    // either dump using the WALPrettyPrinter or split, depending on args
-    if (args[0].compareTo("--dump") == 0) {
-      WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
-    } else if (args[0].compareTo("--perf") == 0) {
-      LOG.fatal("Please use the WALPerformanceEvaluation tool instead. i.e.:");
-      LOG.fatal(
-        "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
-      System.exit(-1);
-    } else if (args[0].compareTo("--split") == 0) {
-      Configuration conf = HBaseConfiguration.create();
-      for (int i = 1; i < args.length; i++) {
-        try {
-          Path logPath = new Path(args[i]);
-          FSUtils.setFsDefault(conf, logPath);
-          split(conf, logPath);
-        } catch (IOException t) {
-          t.printStackTrace(System.err);
-          System.exit(-1);
-        }
-      }
-    } else {
-      usage();
-      System.exit(-1);
-    }
-  }
-
   /**
    * This method gets the pipeline for the current WAL.
    */
@@ -1176,12 +1099,4 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     }
     return new DatanodeInfo[0];
   }
-
-  /**
-   *
-   * @return last time on checking low replication
-   */
-  public long getLastTimeCheckLowReplication() {
-    return this.lastTimeCheckLowReplication;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba5f9ac3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
index 2a01b14..9a6bfd3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
@@ -29,13 +29,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.io.util.LRUDictionary;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
 public abstract class ReaderBase implements AbstractFSWALProvider.Reader {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba5f9ac3/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java
index cf2c5d7..a612aff 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java
@@ -18,77 +18,109 @@
  */
 package org.apache.hadoop.hbase.wal;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
-
-@Category(MediumTests.class)
+@RunWith(Parameterized.class)
+@Category({ RegionServerTests.class, LargeTests.class })
 public class TestWALOpenAfterDNRollingStart {
-  final Log LOG = LogFactory.getLog(getClass());
-  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  private static long DataNodeRestartInterval;
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  // Sleep time before restart next dn, we need to wait the current dn to finish start up
+  private static long DN_RESTART_INTERVAL = 15000;
+
+  // interval of checking low replication. The sleep time must smaller than
+  // DataNodeRestartInterval
+  // so a low replication case will be detected and the wal will be rolled
+  private static long CHECK_LOW_REPLICATION_INTERVAL = 10000;
+
+  @Parameter
+  public String walProvider;
+
+  @Parameters(name = "{index}: wal={0}")
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[] { "asyncfs" }, new Object[] { "filesystem" });
+  }
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    // Sleep time before restart next dn, we need to wait the current dn to finish start up
-    DataNodeRestartInterval = 15000;
-    // interval of checking low replication. The sleep time must smaller than DataNodeRestartInterval
-    // so a low replication case will be detected and the wal will be rolled
-    long checkLowReplicationInterval = 10000;
-    //don't let hdfs client to choose a new replica when dn down
-    TEST_UTIL.getConfiguration().setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable",
-        false);
+    // don't let hdfs client to choose a new replica when dn down
+    TEST_UTIL.getConfiguration()
+        .setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", false);
     TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval",
-        checkLowReplicationInterval);
+      CHECK_LOW_REPLICATION_INTERVAL);
     TEST_UTIL.startMiniDFSCluster(3);
-    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.startMiniZKCluster();
+  }
 
+  @Before
+  public void setUp() throws IOException, InterruptedException {
+    TEST_UTIL.getConfiguration().set("hbase.wal.provider", walProvider);
+    TEST_UTIL.startMiniHBaseCluster(1, 1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniHBaseCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
   }
 
   /**
-   * see HBASE-18132
-   * This is a test case of failing open a wal(for replication for example) after all datanode
-   * restarted (rolling upgrade, for example).
-   * Before this patch, low replication detection is only used when syncing wal.
-   * But if the wal haven't had any entry whiten, it will never know all the replica of the wal
-   * is broken(because of dn restarting). And this wal can never be open
+   * see HBASE-18132 This is a test case of failing open a wal(for replication for example) after
+   * all datanode restarted (rolling upgrade, for example). Before this patch, low replication
+   * detection is only used when syncing wal. But if the wal haven't had any entry whiten, it will
+   * never know all the replica of the wal is broken(because of dn restarting). And this wal can
+   * never be open
    * @throws Exception
    */
-  @Test(timeout = 300000)
+  @Test
   public void test() throws Exception {
     HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
-    FSHLog hlog = (FSHLog)server.getWAL(null);
-    Path currentFile = hlog.getCurrentFileName();
-    //restart every dn to simulate a dn rolling upgrade
-    for(int i = 0; i < TEST_UTIL.getDFSCluster().getDataNodes().size(); i++) {
-      //This is NOT a bug, when restart dn in miniDFSCluster, it will remove the stopped dn from
-      //the dn list and then add to the tail of this list, we need to always restart the first one
-      //to simulate rolling upgrade of every dn.
+    AbstractFSWAL<?> wal = (AbstractFSWAL<?>) server.getWAL(null);
+    Path currentFile = wal.getCurrentFileName();
+    // restart every dn to simulate a dn rolling upgrade
+    for (int i = 0, n = TEST_UTIL.getDFSCluster().getDataNodes().size(); i < n; i++) {
+      // This is NOT a bug, when restart dn in miniDFSCluster, it will remove the stopped dn from
+      // the dn list and then add to the tail of this list, we need to always restart the first one
+      // to simulate rolling upgrade of every dn.
       TEST_UTIL.getDFSCluster().restartDataNode(0);
-      //sleep enough time so log roller can detect the pipeline break and roll log
-      Thread.sleep(DataNodeRestartInterval);
+      // sleep enough time so log roller can detect the pipeline break and roll log
+      Thread.sleep(DN_RESTART_INTERVAL);
     }
 
-    if(!server.getFileSystem().exists(currentFile)) {
+    if (!server.getFileSystem().exists(currentFile)) {
       Path walRootDir = FSUtils.getWALRootDir(TEST_UTIL.getConfiguration());
       final Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
       currentFile = new Path(oldLogDir, currentFile.getName());
     }
-    //if the log is not rolled, then we can never open this wal forever.
-    WAL.Reader reader = WALFactory
-        .createReader(TEST_UTIL.getTestFileSystem(), currentFile, TEST_UTIL.getConfiguration());
-
+    // if the log is not rolled, then we can never open this wal forever.
+    try (WAL.Reader reader = WALFactory.createReader(TEST_UTIL.getTestFileSystem(), currentFile,
+      TEST_UTIL.getConfiguration())) {
+      reader.next();
+    }
   }
-
-
 }


Mime
View raw message