hbase-commits mailing list archives

From te...@apache.org
Subject [1/2] hbase git commit: HBASE-15995 Separate replication WAL reading from shipping
Date Sat, 03 Jun 2017 16:49:07 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 b66a478e7 -> 3cf443326


http://git-wip-us.apache.org/repos/asf/hbase/blob/3cf44332/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
new file mode 100644
index 0000000..c4d552c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -0,0 +1,411 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALFactory;
+
+/**
+ * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and
+ * continually iterates through all the WAL {@link Entry} in the queue. When it's done reading
+ * from a Path, it dequeues it and starts reading from the next.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entry> {
+  private static final Log LOG = LogFactory.getLog(WALEntryStream.class);
+
+  private Reader reader;
+  private Path currentPath;
+  // cache of next entry for hasNext()
+  private Entry currentEntry;
+  // position after reading current entry
+  private long currentPosition = 0;
+  private PriorityBlockingQueue<Path> logQueue;
+  private FileSystem fs;
+  private Configuration conf;
+  private MetricsSource metrics;
+
+  /**
+   * Create an entry stream over the given queue
+   * @param logQueue the queue of WAL paths
+   * @param fs {@link FileSystem} to use to create {@link Reader} for this stream
+   * @param conf {@link Configuration} to use to create {@link Reader} for this stream
+   * @param metrics replication metrics
+   * @throws IOException
+   */
+  public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
+      MetricsSource metrics)
+      throws IOException {
+    this(logQueue, fs, conf, 0, metrics);
+  }
+
+  /**
+   * Create an entry stream over the given queue at the given start position
+   * @param logQueue the queue of WAL paths
+   * @param fs {@link FileSystem} to use to create {@link Reader} for this stream
+   * @param conf {@link Configuration} to use to create {@link Reader} for this stream
+   * @param startPosition the position in the first WAL to start reading at
+   * @param metrics replication metrics
+   * @throws IOException
+   */
+  public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
+      long startPosition, MetricsSource metrics) throws IOException {
+    this.logQueue = logQueue;
+    this.fs = fs;
+    this.conf = conf;
+    this.currentPosition = startPosition;
+    this.metrics = metrics;
+  }
+
+  /**
+   * @return true if there is another WAL {@link Entry}
+   * @throws WALEntryStreamRuntimeException if there was an Exception while reading
+   */
+  @Override
+  public boolean hasNext() {
+    if (currentEntry == null) {
+      try {
+        tryAdvanceEntry();
+      } catch (Exception e) {
+        throw new WALEntryStreamRuntimeException(e);
+      }
+    }
+    return currentEntry != null;
+  }
+
+  /**
+   * @return the next WAL entry in this stream
+   * @throws WALEntryStreamRuntimeException if there was an IOException
+   * @throws NoSuchElementException if no more entries in the stream.
+   */
+  @Override
+  public Entry next() {
+    if (!hasNext()) throw new NoSuchElementException();
+    Entry save = currentEntry;
+    currentEntry = null; // gets reloaded by hasNext()
+    return save;
+  }
+
+  /**
+   * Not supported.
+   */
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void close() throws IOException {
+    closeReader();
+  }
+
+  /**
+   * @return the iterator over WAL entries in the queue.
+   */
+  @Override
+  public Iterator<Entry> iterator() {
+    return this;
+  }
+
+  /**
+   * @return the position of the last Entry returned by next()
+   */
+  public long getPosition() {
+    return currentPosition;
+  }
+
+  /**
+   * @return the {@link Path} of the current WAL
+   */
+  public Path getCurrentPath() {
+    return currentPath;
+  }
+
+  private String getCurrentPathStat() {
+    StringBuilder sb = new StringBuilder();
+    if (currentPath != null) {
+      sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
+          .append(currentPosition).append("\n");
+    } else {
+      sb.append("no replication ongoing, waiting for new log");
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
+   * false)
+   * @throws IOException
+   */
+  public void reset() throws IOException {
+    if (reader != null && currentPath != null) {
+      resetReader();
+    }
+  }
+
+  private void setPosition(long position) {
+    currentPosition = position;
+  }
+
+  private void setCurrentPath(Path path) {
+    this.currentPath = path;
+  }
+
+  private void tryAdvanceEntry() throws IOException {
+    if (checkReader()) {
+      readNextEntryAndSetPosition();
+      if (currentEntry == null) { // no more entries in this log file - see if log was rolled
+        if (logQueue.size() > 1) { // log was rolled
+          // Before dequeueing, we should always get one more attempt at reading.
+          // This is in case more entries came in after we opened the reader,
+          // and a new log was enqueued while we were reading. See HBASE-6758
+          resetReader();
+          readNextEntryAndSetPosition();
+          if (currentEntry == null) {
+            if (checkAllBytesParsed()) { // now we're certain we're done with this log file
+              dequeueCurrentLog();
+              if (openNextLog()) {
+                readNextEntryAndSetPosition();
+              }
+            }
+          }
+        } // no other logs, we've simply hit the end of the current open log. Do nothing
+      }
+    }
+    // do nothing if we don't have a WAL Reader (e.g. if there are no logs in the queue)
+  }
+
+  // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file
+  private boolean checkAllBytesParsed() throws IOException {
+    // -1 means the wal wasn't closed cleanly.
+    final long trailerSize = currentTrailerSize();
+    FileStatus stat = null;
+    try {
+      stat = fs.getFileStatus(this.currentPath);
+    } catch (IOException exception) {
+      LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it "
+          + (trailerSize < 0 ? "was not" : "was") + " closed cleanly " + getCurrentPathStat());
+      metrics.incrUnknownFileLengthForClosedWAL();
+    }
+    if (stat != null) {
+      if (trailerSize < 0) {
+        if (currentPosition < stat.getLen()) {
+          final long skippedBytes = stat.getLen() - currentPosition;
+          LOG.info("Reached the end of WAL file '" + currentPath
+              + "'. It was not closed cleanly, so we did not parse " + skippedBytes
+              + " bytes of data.");
+          metrics.incrUncleanlyClosedWALs();
+          metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
+        }
+      } else if (currentPosition + trailerSize < stat.getLen()) {
+        LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition
+            + ", which is too far away from reported file length " + stat.getLen()
+            + ". Restarting WAL reading (see HBASE-15983 for details). " + getCurrentPathStat());
+        metrics.incrRestartedWALReading();
+        metrics.incrRepeatedFileBytes(currentPosition); // record before the position is reset
+        setPosition(0);
+        resetReader();
+        return false;
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is "
+          + (stat == null ? "N/A" : stat.getLen()));
+    }
+    metrics.incrCompletedWAL();
+    return true;
+  }
+
+  private void dequeueCurrentLog() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reached the end of log " + currentPath);
+    }
+    closeReader();
+    logQueue.remove();
+    setPosition(0);
+    metrics.decrSizeOfLogQueue();
+  }
+
+  private void readNextEntryAndSetPosition() throws IOException {
+    Entry readEntry = reader.next();
+    long readerPos = reader.getPosition();
+    if (readEntry != null) {
+      metrics.incrLogEditsRead();
+      metrics.incrLogReadInBytes(readerPos - currentPosition);
+    }
+    currentEntry = readEntry; // could be null
+    setPosition(readerPos);
+  }
+
+  private void closeReader() throws IOException {
+    if (reader != null) {
+      reader.close();
+      reader = null;
+    }
+  }
+
+  // if we don't have a reader, open a reader on the next log
+  private boolean checkReader() throws IOException {
+    if (reader == null) {
+      return openNextLog();
+    }
+    return true;
+  }
+
+  // open a reader on the next log in queue
+  private boolean openNextLog() throws IOException {
+    Path nextPath = logQueue.peek();
+    if (nextPath != null) {
+      openReader(nextPath);
+      if (reader != null) return true;
+    }
+    return false;
+  }
+
+  private Path getArchivedLog(Path path) throws IOException {
+    Path rootDir = FSUtils.getRootDir(conf);
+    Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    Path archivedLogLocation = new Path(oldLogDir, path.getName());
+    if (fs.exists(archivedLogLocation)) {
+      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
+      return archivedLogLocation;
+    } else {
+      LOG.error("Couldn't locate log: " + path);
+      return path;
+    }
+  }
+
+  private void openReader(Path path) throws IOException {
+    try {
+      // Detect if this is a new file, if so get a new reader else
+      // reset the current reader so that we see the new data
+      if (reader == null || !getCurrentPath().equals(path)) {
+        closeReader();
+        reader = WALFactory.createReader(fs, path, conf);
+        seek();
+        setCurrentPath(path);
+      } else {
+        resetReader();
+      }
+    } catch (FileNotFoundException fnfe) {
+      // If the log was archived, continue reading from there
+      Path archivedLog = getArchivedLog(path);
+      if (!path.equals(archivedLog)) {
+        openReader(archivedLog);
+      } else {
+        throw fnfe;
+      }
+    } catch (LeaseNotRecoveredException lnre) {
+      // HBASE-15019 the WAL was not closed due to some hiccup.
+      LOG.warn("Try to recover the WAL lease " + currentPath, lnre);
+      recoverLease(conf, currentPath);
+      reader = null;
+    } catch (NullPointerException npe) {
+      // Workaround for race condition in HDFS-4380
+      // which throws a NPE if we open a file before any data node has the most recent block
+      // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
+      LOG.warn("Got NPE opening reader, will retry.");
+      reader = null;
+    }
+  }
+
+  // For HBASE-15019
+  private void recoverLease(final Configuration conf, final Path path) {
+    try {
+      final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
+      FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
+      fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
+        @Override
+        public boolean progress() {
+          LOG.debug("recover WAL lease: " + path);
+          return true;
+        }
+      });
+    } catch (IOException e) {
+      LOG.warn("unable to recover lease for WAL: " + path, e);
+    }
+  }
+
+  private void resetReader() throws IOException {
+    try {
+      reader.reset();
+      seek();
+    } catch (FileNotFoundException fnfe) {
+      // If the log was archived, continue reading from there
+      Path archivedLog = getArchivedLog(currentPath);
+      if (!currentPath.equals(archivedLog)) {
+        openReader(archivedLog);
+      } else {
+        throw fnfe;
+      }
+    } catch (NullPointerException npe) {
+      throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
+    }
+  }
+
+  private void seek() throws IOException {
+    if (currentPosition != 0) {
+      reader.seek(currentPosition);
+    }
+  }
+
+  private long currentTrailerSize() {
+    long size = -1L;
+    if (reader instanceof ProtobufLogReader) {
+      final ProtobufLogReader pblr = (ProtobufLogReader) reader;
+      size = pblr.trailerSize();
+    }
+    return size;
+  }
+
+  @InterfaceAudience.Private
+  public static class WALEntryStreamRuntimeException extends RuntimeException {
+    private static final long serialVersionUID = -6298201811259982568L;
+
+    public WALEntryStreamRuntimeException(Exception e) {
+      super(e);
+    }
+  }
+
+}
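
For orientation, the class above is typically consumed like this (a minimal sketch, not part of
the commit; in the real replication source the queue is populated by a log-roll listener, and
shipEntry is a hypothetical stand-in for the shipping logic):

    PriorityBlockingQueue<Path> logQueue = new PriorityBlockingQueue<>();
    logQueue.add(walPath); // assumption: a WAL path obtained from the log-roll listener
    try (WALEntryStream stream =
        new WALEntryStream(logQueue, fs, conf, 0, new MetricsSource("1"))) {
      while (stream.hasNext()) {        // hasNext() pre-reads and caches the next entry
        WAL.Entry entry = stream.next();
        shipEntry(entry);               // hypothetical: hand the entry to the shipper
      }
      long resumeAt = stream.getPosition(); // offset to persist for resuming later
    }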

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cf44332/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
deleted file mode 100644
index 40db3eb..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-@Category({LargeTests.class})
-@RunWith(Parameterized.class)
-public class TestReplicationWALReaderManager {
-
-  private static HBaseTestingUtility TEST_UTIL;
-  private static Configuration conf;
-  private static FileSystem fs;
-  private static MiniDFSCluster cluster;
-  private static final TableName tableName = TableName.valueOf("tablename");
-  private static final byte [] family = Bytes.toBytes("column");
-  private static final byte [] qualifier = Bytes.toBytes("qualifier");
-  private static final HRegionInfo info = new HRegionInfo(tableName,
-      HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
-  private static final HTableDescriptor htd = new HTableDescriptor(tableName);
-
-  private WAL log;
-  private ReplicationWALReaderManager logManager;
-  private PathWatcher pathWatcher;
-  private int nbRows;
-  private int walEditKVs;
-  @Rule public TestName tn = new TestName();
-  private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
-
-  @Parameters
-  public static Collection<Object[]> parameters() {
-    // Try out different combinations of row count and KeyValue count
-    int[] NB_ROWS = { 1500, 60000 };
-    int[] NB_KVS = { 1, 100 };
-    // whether compression is used
-    Boolean[] BOOL_VALS = { false, true };
-    List<Object[]> parameters = new ArrayList<Object[]>();
-    for (int nbRows : NB_ROWS) {
-      for (int walEditKVs : NB_KVS) {
-        for (boolean b : BOOL_VALS) {
-          Object[] arr = new Object[3];
-          arr[0] = nbRows;
-          arr[1] = walEditKVs;
-          arr[2] = b;
-          parameters.add(arr);
-        }
-      }
-    }
-    return parameters;
-  }
-
-  public TestReplicationWALReaderManager(int nbRows, int walEditKVs, boolean enableCompression) {
-    this.nbRows = nbRows;
-    this.walEditKVs = walEditKVs;
-    TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
-      enableCompression);
-    mvcc.advanceTo(1);
-  }
-  
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL = new HBaseTestingUtility();
-    conf = TEST_UTIL.getConfiguration();
-    TEST_UTIL.startMiniDFSCluster(3);
-
-    cluster = TEST_UTIL.getDFSCluster();
-    fs = cluster.getFileSystem();
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    logManager = new ReplicationWALReaderManager(fs, conf);
-    List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
-    pathWatcher = new PathWatcher();
-    listeners.add(pathWatcher);
-    final WALFactory wals = new WALFactory(conf, listeners, tn.getMethodName());
-    log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    log.close();
-  }
-
-  @Test
-  public void test() throws Exception {
-    // Grab the path that was generated when the log rolled as part of its creation
-    Path path = pathWatcher.currentPath;
-
-    assertEquals(0, logManager.getPosition());
-
-    appendToLog();
-
-    // There's one edit in the log, read it. Reading past it needs to return nulls
-    assertNotNull(logManager.openReader(path));
-    logManager.seek();
-    WAL.Entry entry = logManager.readNextAndSetPosition();
-    assertNotNull(entry);
-    entry = logManager.readNextAndSetPosition();
-    assertNull(entry);
-    logManager.closeReader();
-    long oldPos = logManager.getPosition();
-
-    appendToLog();
-
-    // Read the newly added entry, make sure we made progress
-    assertNotNull(logManager.openReader(path));
-    logManager.seek();
-    entry = logManager.readNextAndSetPosition();
-    assertNotEquals(oldPos, logManager.getPosition());
-    assertNotNull(entry);
-    logManager.closeReader();
-    oldPos = logManager.getPosition();
-
-    log.rollWriter();
-
-    // We rolled but we still should see the end of the first log and not get data
-    assertNotNull(logManager.openReader(path));
-    logManager.seek();
-    entry = logManager.readNextAndSetPosition();
-    assertEquals(oldPos, logManager.getPosition());
-    assertNull(entry);
-    logManager.finishCurrentFile();
-
-    path = pathWatcher.currentPath;
-
-    for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); }
-    log.rollWriter();
-    logManager.openReader(path);
-    logManager.seek();
-    for (int i = 0; i < nbRows; i++) {
-      WAL.Entry e = logManager.readNextAndSetPosition();
-      if (e == null) {
-        fail("Should have enough entries");
-      }
-    }
-  }
-
-  private void appendToLog() throws IOException {
-    appendToLogPlus(1);
-  }
-
-  private void appendToLogPlus(int count) throws IOException {
-    final long txid = log.append(htd, info,
-        new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc),
-        getWALEdits(count), true);
-    log.sync(txid);
-  }
-
-  private WALEdit getWALEdits(int count) {
-    WALEdit edit = new WALEdit();
-    for (int i = 0; i < count; i++) {
-      edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
-        System.currentTimeMillis(), qualifier));
-    }
-    return edit;
-  }
-
-  class PathWatcher extends WALActionsListener.Base {
-
-    Path currentPath;
-
-    @Override
-    public void preLogRoll(Path oldPath, Path newPath) throws IOException {
-      currentPath = newPath;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cf44332/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
new file mode 100644
index 0000000..005e2a1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -0,0 +1,440 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestWALEntryStream {
+
+  private static HBaseTestingUtility TEST_UTIL;
+  private static Configuration conf;
+  private static FileSystem fs;
+  private static MiniDFSCluster cluster;
+  private static final TableName tableName = TableName.valueOf("tablename");
+  private static final byte[] family = Bytes.toBytes("column");
+  private static final byte[] qualifier = Bytes.toBytes("qualifier");
+  private static final HRegionInfo info =
+      new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
+  private static final HTableDescriptor htd = new HTableDescriptor(tableName);
+  private static NavigableMap<byte[], Integer> scopes;
+
+  private WAL log;
+  PriorityBlockingQueue<Path> walQueue;
+  private PathWatcher pathWatcher;
+
+  @Rule
+  public TestName tn = new TestName();
+  private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    conf = TEST_UTIL.getConfiguration();
+    TEST_UTIL.startMiniDFSCluster(3);
+
+    cluster = TEST_UTIL.getDFSCluster();
+    fs = cluster.getFileSystem();
+    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    for (byte[] fam : htd.getFamiliesKeys()) {
+      scopes.put(fam, 0);
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    walQueue = new PriorityBlockingQueue<>();
+    List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
+    pathWatcher = new PathWatcher();
+    listeners.add(pathWatcher);
+    final WALFactory wals = new WALFactory(conf, listeners, tn.getMethodName());
+    log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    log.close();
+  }
+
+  // Try out different combinations of row count and KeyValue count
+  @Test
+  public void testDifferentCounts() throws Exception {
+    int[] NB_ROWS = { 1500, 60000 };
+    int[] NB_KVS = { 1, 100 };
+    // whether compression is used
+    Boolean[] BOOL_VALS = { false, true };
+    // long lastPosition = 0;
+    for (int nbRows : NB_ROWS) {
+      for (int walEditKVs : NB_KVS) {
+        for (boolean isCompressionEnabled : BOOL_VALS) {
+          TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
+            isCompressionEnabled);
+          mvcc.advanceTo(1);
+
+          for (int i = 0; i < nbRows; i++) {
+            appendToLogPlus(walEditKVs);
+          }
+
+          log.rollWriter();
+
+          try (WALEntryStream entryStream =
+              new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+            int i = 0;
+            for (WAL.Entry e : entryStream) {
+              assertNotNull(e);
+              i++;
+            }
+            assertEquals(nbRows, i);
+
+            // should've read all entries
+            assertFalse(entryStream.hasNext());
+          }
+          // reset everything for next loop
+          log.close();
+          setUp();
+        }
+      }
+    }
+  }
+
+  /**
+   * Tests basic reading of log appends
+   */
+  @Test
+  public void testAppendsWithRolls() throws Exception {
+    appendToLog();
+
+    long oldPos;
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+      // There's one edit in the log, read it. Reading past it should throw an exception
+      assertTrue(entryStream.hasNext());
+      WAL.Entry entry = entryStream.next();
+      assertNotNull(entry);
+      assertFalse(entryStream.hasNext());
+      try {
+        entry = entryStream.next();
+        fail();
+      } catch (NoSuchElementException e) {
+        // expected
+      }
+      oldPos = entryStream.getPosition();
+    }
+
+    appendToLog();
+
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
+      // Read the newly added entry, make sure we made progress
+      WAL.Entry entry = entryStream.next();
+      assertNotEquals(oldPos, entryStream.getPosition());
+      assertNotNull(entry);
+      oldPos = entryStream.getPosition();
+    }
+
+    // We rolled but we still should see the end of the first log and get that item
+    appendToLog();
+    log.rollWriter();
+    appendToLog();
+
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
+      WAL.Entry entry = entryStream.next();
+      assertNotEquals(oldPos, entryStream.getPosition());
+      assertNotNull(entry);
+
+      // next item should come from the new log
+      entry = entryStream.next();
+      assertNotEquals(oldPos, entryStream.getPosition());
+      assertNotNull(entry);
+
+      // no more entries to read
+      assertFalse(entryStream.hasNext());
+      oldPos = entryStream.getPosition();
+    }
+  }
+
+  /**
+   * Tests that if after a stream is opened, more entries come in and then the log is rolled, we
+   * don't mistakenly dequeue the current log thinking we're done with it
+   */
+  @Test
+  public void testLogrollWhileStreaming() throws Exception {
+    appendToLog("1");
+    appendToLog("2");// 2
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+      assertEquals("1", getRow(entryStream.next()));
+
+      appendToLog("3"); // 3 - comes in after reader opened
+      log.rollWriter(); // log roll happening while we're reading
+      appendToLog("4"); // 4 - this append is in the rolled log
+
+      assertEquals("2", getRow(entryStream.next()));
+      assertEquals(2, walQueue.size()); // we should not have dequeued yet since there's still an
+                                        // entry in first log
+      assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4
+                                                     // and 3 would be skipped
+      assertEquals("4", getRow(entryStream.next())); // 4
+      assertEquals(1, walQueue.size()); // now we've dequeued and moved on to next log properly
+      assertFalse(entryStream.hasNext());
+    }
+  }
+
+  /**
+   * Tests that if writes come in while we have a stream open, we shouldn't miss them
+   */
+  @Test
+  public void testNewEntriesWhileStreaming() throws Exception {
+    appendToLog("1");
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+      entryStream.next(); // we've hit the end of the stream at this point
+
+      // some new entries come in while we're streaming
+      appendToLog("2");
+      appendToLog("3");
+
+      // don't see them
+      assertFalse(entryStream.hasNext());
+
+      // But we do if we reset
+      entryStream.reset();
+      assertEquals("2", getRow(entryStream.next()));
+      assertEquals("3", getRow(entryStream.next()));
+      assertFalse(entryStream.hasNext());
+    }
+  }
+
+  @Test
+  public void testResumeStreamingFromPosition() throws Exception {
+    long lastPosition = 0;
+    appendToLog("1");
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+      entryStream.next(); // we've hit the end of the stream at this point
+      appendToLog("2");
+      appendToLog("3");
+      lastPosition = entryStream.getPosition();
+    }
+    // the next stream should pick up where we left off
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+      assertEquals("2", getRow(entryStream.next()));
+      assertEquals("3", getRow(entryStream.next()));
+      assertFalse(entryStream.hasNext()); // done
+      assertEquals(1, walQueue.size());
+    }
+  }
+
+  /**
+   * Tests that if we stop before hitting the end of a stream, we can continue where we left off
+   * using the last position
+   */
+  @Test
+  public void testPosition() throws Exception {
+    long lastPosition = 0;
+    appendEntriesToLog(3);
+    // read only one element
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+      entryStream.next();
+      lastPosition = entryStream.getPosition();
+    }
+    // there should still be two more entries from where we left off
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+      assertNotNull(entryStream.next());
+      assertNotNull(entryStream.next());
+      assertFalse(entryStream.hasNext());
+    }
+  }
+
+
+  @Test
+  public void testEmptyStream() throws Exception {
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+      assertFalse(entryStream.hasNext());
+    }
+  }
+
+  @Test
+  public void testReplicationSourceWALReaderThread() throws Exception {
+    appendEntriesToLog(3);
+    // get ending position
+    long position;
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+      entryStream.next();
+      entryStream.next();
+      entryStream.next();
+      position = entryStream.getPosition();
+    }
+
+    // start up a batcher
+    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    ReplicationSourceWALReaderThread batcher = new ReplicationSourceWALReaderThread(
+        mockSourceManager, getQueueInfo(), walQueue, 0, fs, conf, getDummyFilter(),
+        new MetricsSource("1"));
+    Path walPath = walQueue.peek();
+    batcher.start();
+    WALEntryBatch entryBatch = batcher.take();
+
+    // should've batched up our entries
+    assertNotNull(entryBatch);
+    assertEquals(3, entryBatch.getWalEntries().size());
+    assertEquals(position, entryBatch.getLastWalPosition());
+    assertEquals(walPath, entryBatch.getLastWalPath());
+    assertEquals(3, entryBatch.getNbRowKeys());
+
+    appendToLog("foo");
+    entryBatch = batcher.take();
+    assertEquals(1, entryBatch.getNbEntries());
+    assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
+  }
+
+  private String getRow(WAL.Entry entry) {
+    Cell cell = entry.getEdit().getCells().get(0);
+    return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+  }
+
+  private void appendToLog(String key) throws IOException {
+    final long txid = log.append(htd, info,
+      new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc),
+      getWALEdit(key), true);
+    log.sync(txid);
+  }
+
+  private void appendEntriesToLog(int count) throws IOException {
+    for (int i = 0; i < count; i++) {
+      appendToLog();
+    }
+  }
+
+  private void appendToLog() throws IOException {
+    appendToLogPlus(1);
+  }
+
+  private void appendToLogPlus(int count) throws IOException {
+    final long txid = log.append(htd, info,
+      new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc),
+      getWALEdits(count), true);
+    log.sync(txid);
+  }
+
+  private WALEdit getWALEdits(int count) {
+    WALEdit edit = new WALEdit();
+    for (int i = 0; i < count; i++) {
+      edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
+          System.currentTimeMillis(), qualifier));
+    }
+    return edit;
+  }
+
+  private WALEdit getWALEdit(String row) {
+    WALEdit edit = new WALEdit();
+    edit.add(
+      new KeyValue(Bytes.toBytes(row), family, qualifier, System.currentTimeMillis(), qualifier));
+    return edit;
+  }
+
+  private WALEntryFilter getDummyFilter() {
+    return new WALEntryFilter() {
+
+      @Override
+      public Entry filter(Entry entry) {
+        return entry;
+      }
+    };
+  }
+
+  private ReplicationQueueInfo getQueueInfo() {
+    return new ReplicationQueueInfo("1");
+  }
+
+  class PathWatcher extends WALActionsListener.Base {
+
+    Path currentPath;
+
+    @Override
+    public void preLogRoll(Path oldPath, Path newPath) throws IOException {
+      walQueue.add(newPath);
+      currentPath = newPath;
+    }
+  }
+
+}
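
The resume-from-position tests above boil down to this pattern (again a hedged sketch, not from
the commit; loadPosition and savePosition are hypothetical stand-ins for the ZooKeeper-backed
offset storage the replication source actually uses):

    long start = loadPosition(); // hypothetical: last offset saved for this queue
    try (WALEntryStream stream =
        new WALEntryStream(walQueue, fs, conf, start, new MetricsSource("1"))) {
      while (stream.hasNext()) {
        WAL.Entry entry = stream.next();
        // ship the entry, then record how far we got;
        // getPosition() is the offset just after the entry returned by next()
        savePosition(stream.getPosition()); // hypothetical persistence call
      }
    }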

