From: tedyu@apache.org
To: commits@hbase.apache.org
Date: Sat, 03 Jun 2017 16:49:07 -0000
Subject: [1/2] hbase git commit: HBASE-15995 Separate replication WAL reading from shipping

Repository: hbase
Updated Branches:
  refs/heads/branch-1 b66a478e7 -> 3cf443326


http://git-wip-us.apache.org/repos/asf/hbase/blob/3cf44332/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
new file mode 100644
index 0000000..c4d552c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -0,0 +1,411 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALFactory;
+
+/**
+ * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
+ * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it
+ * dequeues it and starts reading from the next.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entry> {
+  private static final Log LOG = LogFactory.getLog(WALEntryStream.class);
+
+  private Reader reader;
+  private Path currentPath;
+  // cache of next entry for hasNext()
+  private Entry currentEntry;
+  // position after reading current entry
+  private long currentPosition = 0;
+  private PriorityBlockingQueue<Path> logQueue;
+  private FileSystem fs;
+  private Configuration conf;
+  private MetricsSource metrics;
+
+  /**
+   * Create an entry stream over the given queue
+   * @param logQueue the queue of WAL paths
+   * @param fs {@link FileSystem} to use to create {@link Reader} for this stream
+   * @param conf {@link Configuration} to use to create {@link Reader} for this stream
+   * @param metrics replication metrics
+   * @throws IOException
+   */
+  public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
+      MetricsSource metrics)
+      throws IOException {
+    this(logQueue, fs, conf, 0, metrics);
+  }
+
+  /**
+   * Create an entry stream over the given queue at the given start position
+   * @param logQueue the queue of WAL paths
+   * @param fs {@link FileSystem} to use to create {@link Reader} for this stream
+   * @param conf {@link Configuration} to use to create {@link Reader} for this stream
+   * @param startPosition the position in the first WAL to start reading at
+   * @param metrics replication metrics
+   * @throws IOException
+   */
+  public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
+      long startPosition, MetricsSource metrics) throws IOException {
+    this.logQueue = logQueue;
+    this.fs = fs;
+    this.conf = conf;
+    this.currentPosition = startPosition;
+    this.metrics = metrics;
+  }
+
+  /**
+   * @return true if there is another WAL {@link Entry}
+   * @throws WALEntryStreamRuntimeException if there was an Exception while reading
+   */
+  @Override
+  public boolean hasNext() {
+    if (currentEntry == null) {
+      try {
+        tryAdvanceEntry();
+      } catch (Exception e) {
+        throw new WALEntryStreamRuntimeException(e);
+      }
+    }
+    return currentEntry != null;
+  }
+
+  /**
+   * @return the next WAL entry in this stream
+   * @throws WALEntryStreamRuntimeException if there was an IOException
+   * @throws NoSuchElementException if no more entries in the stream.
+   */
+  @Override
+  public Entry next() {
+    if (!hasNext()) throw new NoSuchElementException();
+    Entry save = currentEntry;
+    currentEntry = null; // gets reloaded by hasNext()
+    return save;
+  }
+
+  /**
+   * Not supported.
+   */
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void close() throws IOException {
+    closeReader();
+  }
+
+  /**
+   * @return the iterator over WAL entries in the queue.
+   */
+  @Override
+  public Iterator<Entry> iterator() {
+    return this;
+  }
+
+  /**
+   * @return the position of the last Entry returned by next()
+   */
+  public long getPosition() {
+    return currentPosition;
+  }
+
+  /**
+   * @return the {@link Path} of the current WAL
+   */
+  public Path getCurrentPath() {
+    return currentPath;
+  }
+
+  private String getCurrentPathStat() {
+    StringBuilder sb = new StringBuilder();
+    if (currentPath != null) {
+      sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
+          .append(currentPosition).append("\n");
+    } else {
+      sb.append("no replication ongoing, waiting for new log");
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
+   * false)
+   * @throws IOException
+   */
+  public void reset() throws IOException {
+    if (reader != null && currentPath != null) {
+      resetReader();
+    }
+  }
+
+  private void setPosition(long position) {
+    currentPosition = position;
+  }
+
+  private void setCurrentPath(Path path) {
+    this.currentPath = path;
+  }
+
+  private void tryAdvanceEntry() throws IOException {
+    if (checkReader()) {
+      readNextEntryAndSetPosition();
+      if (currentEntry == null) { // no more entries in this log file - see if log was rolled
+        if (logQueue.size() > 1) { // log was rolled
+          // Before dequeueing, we should always get one more attempt at reading.
+          // This is in case more entries came in after we opened the reader,
+          // and a new log was enqueued while we were reading. See HBASE-6758
+          resetReader();
+          readNextEntryAndSetPosition();
+          if (currentEntry == null) {
+            if (checkAllBytesParsed()) { // now we're certain we're done with this log file
+              dequeueCurrentLog();
+              if (openNextLog()) {
+                readNextEntryAndSetPosition();
+              }
+            }
+          }
+        } // no other logs, we've simply hit the end of the current open log. Do nothing
+      }
+    }
+    // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue)
+  }
+
+  // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file
+  private boolean checkAllBytesParsed() throws IOException {
+    // -1 means the wal wasn't closed cleanly.
+    final long trailerSize = currentTrailerSize();
+    FileStatus stat = null;
+    try {
+      stat = fs.getFileStatus(this.currentPath);
+    } catch (IOException exception) {
+      LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it "
+          + (trailerSize < 0 ? "was not" : "was") + " closed cleanly " + getCurrentPathStat());
+      metrics.incrUnknownFileLengthForClosedWAL();
+    }
+    if (stat != null) {
+      if (trailerSize < 0) {
+        if (currentPosition < stat.getLen()) {
+          final long skippedBytes = stat.getLen() - currentPosition;
+          LOG.info("Reached the end of WAL file '" + currentPath
+              + "'. It was not closed cleanly, so we did not parse " + skippedBytes
+              + " bytes of data.");
+          metrics.incrUncleanlyClosedWALs();
+          metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
+        }
+      } else if (currentPosition + trailerSize < stat.getLen()) {
+        LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition
+            + ", which is too far away from reported file length " + stat.getLen()
+            + ". Restarting WAL reading (see HBASE-15983 for details). " + getCurrentPathStat());
+        setPosition(0);
+        resetReader();
+        metrics.incrRestartedWALReading();
+        metrics.incrRepeatedFileBytes(currentPosition);
+        return false;
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is "
+          + (stat == null ? "N/A" : stat.getLen()));
+    }
+    metrics.incrCompletedWAL();
+    return true;
+  }
+
+  private void dequeueCurrentLog() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reached the end of log " + currentPath);
+    }
+    closeReader();
+    logQueue.remove();
+    setPosition(0);
+    metrics.decrSizeOfLogQueue();
+  }
+
+  private void readNextEntryAndSetPosition() throws IOException {
+    Entry readEntry = reader.next();
+    long readerPos = reader.getPosition();
+    if (readEntry != null) {
+      metrics.incrLogEditsRead();
+      metrics.incrLogReadInBytes(readerPos - currentPosition);
+    }
+    currentEntry = readEntry; // could be null
+    setPosition(readerPos);
+  }
+
+  private void closeReader() throws IOException {
+    if (reader != null) {
+      reader.close();
+      reader = null;
+    }
+  }
+
+  // if we don't have a reader, open a reader on the next log
+  private boolean checkReader() throws IOException {
+    if (reader == null) {
+      return openNextLog();
+    }
+    return true;
+  }
+
+  // open a reader on the next log in queue
+  private boolean openNextLog() throws IOException {
+    Path nextPath = logQueue.peek();
+    if (nextPath != null) {
+      openReader(nextPath);
+      if (reader != null) return true;
+    }
+    return false;
+  }
+
+  private Path getArchivedLog(Path path) throws IOException {
+    Path rootDir = FSUtils.getRootDir(conf);
+    Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    Path archivedLogLocation = new Path(oldLogDir, path.getName());
+    if (fs.exists(archivedLogLocation)) {
+      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
+      return archivedLogLocation;
+    } else {
+      LOG.error("Couldn't locate log: " + path);
+      return path;
+    }
+  }
+
+  private void openReader(Path path) throws IOException {
+    try {
+      // Detect if this is a new file, if so get a new reader else
+      // reset the current reader so that we see the new data
+      if (reader == null || !getCurrentPath().equals(path)) {
+        closeReader();
+        reader = WALFactory.createReader(fs, path, conf);
+        seek();
+        setCurrentPath(path);
+      } else {
+        resetReader();
+      }
+    } catch (FileNotFoundException fnfe) {
+      // If the log was archived, continue reading from there
+      Path archivedLog = getArchivedLog(path);
+      if (!path.equals(archivedLog)) {
+        openReader(archivedLog);
+      } else {
+        throw fnfe;
+      }
+    } catch (LeaseNotRecoveredException lnre) {
+      // HBASE-15019 the WAL was not closed due to some hiccup.
+      LOG.warn("Try to recover the WAL lease " + currentPath, lnre);
+      recoverLease(conf, currentPath);
+      reader = null;
+    } catch (NullPointerException npe) {
+      // Workaround for race condition in HDFS-4380
+      // which throws a NPE if we open a file before any data node has the most recent block
+      // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
+      LOG.warn("Got NPE opening reader, will retry.");
+      reader = null;
+    }
+  }
+
+  // For HBASE-15019
+  private void recoverLease(final Configuration conf, final Path path) {
+    try {
+      final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
+      FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
+      fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
+        @Override
+        public boolean progress() {
+          LOG.debug("recover WAL lease: " + path);
+          return true;
+        }
+      });
+    } catch (IOException e) {
+      LOG.warn("unable to recover lease for WAL: " + path, e);
+    }
+  }
+
+  private void resetReader() throws IOException {
+    try {
+      reader.reset();
+      seek();
+    } catch (FileNotFoundException fnfe) {
+      // If the log was archived, continue reading from there
+      Path archivedLog = getArchivedLog(currentPath);
+      if (!currentPath.equals(archivedLog)) {
+        openReader(archivedLog);
+      } else {
+        throw fnfe;
+      }
+    } catch (NullPointerException npe) {
+      throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
+    }
+  }
+
+  private void seek() throws IOException {
+    if (currentPosition != 0) {
+      reader.seek(currentPosition);
+    }
+  }
+
+  private long currentTrailerSize() {
+    long size = -1L;
+    if (reader instanceof ProtobufLogReader) {
+      final ProtobufLogReader pblr = (ProtobufLogReader) reader;
+      size = pblr.trailerSize();
+    }
+    return size;
+  }
+
+  @InterfaceAudience.Private
+  public static class WALEntryStreamRuntimeException extends RuntimeException {
+    private static final long serialVersionUID = -6298201811259982568L;
+
+    public WALEntryStreamRuntimeException(Exception e) {
+      super(e);
+    }
+  }
+
+}
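For orientation, the class above is the core of this patch: WALEntryStream is a Closeable Iterator<Entry> over a queue of WAL files. A minimal usage sketch, not part of the patch itself, assuming a PriorityBlockingQueue<Path> that a WAL listener keeps populated and the same fs/conf/MetricsSource arguments the new test below uses:

    // Illustrative only: drain every entry currently visible in the queued WALs.
    PriorityBlockingQueue<Path> logQueue = new PriorityBlockingQueue<Path>(); // filled elsewhere
    try (WALEntryStream entryStream =
        new WALEntryStream(logQueue, fs, conf, new MetricsSource("1"))) {
      while (entryStream.hasNext()) {
        WAL.Entry entry = entryStream.next(); // exhausted WALs are dequeued internally
        // ... ship the entry to the peer ...
      }
      long lastPosition = entryStream.getPosition(); // offset after the last returned entry
    }

Once hasNext() returns false, reset() lets the same stream pick up entries appended to the current WAL after the reader was opened.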
http://git-wip-us.apache.org/repos/asf/hbase/blob/3cf44332/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
deleted file mode 100644
index 40db3eb..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-@Category({LargeTests.class})
-@RunWith(Parameterized.class)
-public class TestReplicationWALReaderManager {
-
-  private static HBaseTestingUtility TEST_UTIL;
-  private static Configuration conf;
-  private static FileSystem fs;
-  private static MiniDFSCluster cluster;
-  private static final TableName tableName = TableName.valueOf("tablename");
-  private static final byte [] family = Bytes.toBytes("column");
-  private static final byte [] qualifier = Bytes.toBytes("qualifier");
-  private static final HRegionInfo info = new HRegionInfo(tableName,
-      HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
-  private static final HTableDescriptor htd = new HTableDescriptor(tableName);
-
-  private WAL log;
-  private ReplicationWALReaderManager logManager;
-  private PathWatcher pathWatcher;
-  private int nbRows;
-  private int walEditKVs;
-  @Rule public TestName tn = new TestName();
-  private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
-
-  @Parameters
-  public static Collection<Object[]> parameters() {
-    // Try out different combinations of row count and KeyValue count
-    int[] NB_ROWS = { 1500, 60000 };
-    int[] NB_KVS = { 1, 100 };
-    // whether compression is used
-    Boolean[] BOOL_VALS = { false, true };
-    List<Object[]> parameters = new ArrayList<Object[]>();
-    for (int nbRows : NB_ROWS) {
-      for (int walEditKVs : NB_KVS) {
-        for (boolean b : BOOL_VALS) {
-          Object[] arr = new Object[3];
-          arr[0] = nbRows;
-          arr[1] = walEditKVs;
-          arr[2] = b;
-          parameters.add(arr);
-        }
-      }
-    }
-    return parameters;
-  }
-
-  public TestReplicationWALReaderManager(int nbRows, int walEditKVs, boolean enableCompression) {
-    this.nbRows = nbRows;
-    this.walEditKVs = walEditKVs;
-    TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
-        enableCompression);
-    mvcc.advanceTo(1);
-  }
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL = new HBaseTestingUtility();
-    conf = TEST_UTIL.getConfiguration();
-    TEST_UTIL.startMiniDFSCluster(3);
-
-    cluster = TEST_UTIL.getDFSCluster();
-    fs = cluster.getFileSystem();
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    logManager = new ReplicationWALReaderManager(fs, conf);
-    List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
-    pathWatcher = new PathWatcher();
-    listeners.add(pathWatcher);
-    final WALFactory wals = new WALFactory(conf, listeners, tn.getMethodName());
-    log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    log.close();
-  }
-
-  @Test
-  public void test() throws Exception {
-    // Grab the path that was generated when the log rolled as part of its creation
-    Path path = pathWatcher.currentPath;
-
-    assertEquals(0, logManager.getPosition());
-
-    appendToLog();
-
-    // There's one edit in the log, read it. Reading past it needs to return nulls
-    assertNotNull(logManager.openReader(path));
-    logManager.seek();
-    WAL.Entry entry = logManager.readNextAndSetPosition();
-    assertNotNull(entry);
-    entry = logManager.readNextAndSetPosition();
-    assertNull(entry);
-    logManager.closeReader();
-    long oldPos = logManager.getPosition();
-
-    appendToLog();
-
-    // Read the newly added entry, make sure we made progress
-    assertNotNull(logManager.openReader(path));
-    logManager.seek();
-    entry = logManager.readNextAndSetPosition();
-    assertNotEquals(oldPos, logManager.getPosition());
-    assertNotNull(entry);
-    logManager.closeReader();
-    oldPos = logManager.getPosition();
-
-    log.rollWriter();
-
-    // We rolled but we still should see the end of the first log and not get data
-    assertNotNull(logManager.openReader(path));
-    logManager.seek();
-    entry = logManager.readNextAndSetPosition();
-    assertEquals(oldPos, logManager.getPosition());
-    assertNull(entry);
-    logManager.finishCurrentFile();
-
-    path = pathWatcher.currentPath;
-
-    for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); }
-    log.rollWriter();
-    logManager.openReader(path);
-    logManager.seek();
-    for (int i = 0; i < nbRows; i++) {
-      WAL.Entry e = logManager.readNextAndSetPosition();
-      if (e == null) {
-        fail("Should have enough entries");
-      }
-    }
-  }
-
-  private void appendToLog() throws IOException {
-    appendToLogPlus(1);
-  }
-
-  private void appendToLogPlus(int count) throws IOException {
-    final long txid = log.append(htd, info,
-        new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc),
-        getWALEdits(count), true);
-    log.sync(txid);
-  }
-
-  private WALEdit getWALEdits(int count) {
-    WALEdit edit = new WALEdit();
-    for (int i = 0; i < count; i++) {
-      edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
-          System.currentTimeMillis(), qualifier));
-    }
-    return edit;
-  }
-
-  class PathWatcher extends WALActionsListener.Base {
-
-    Path currentPath;
-
-    @Override
-    public void preLogRoll(Path oldPath, Path newPath) throws IOException {
-      currentPath = newPath;
-    }
-  }
-}
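The deleted test above drove ReplicationWALReaderManager by hand (openReader, seek, readNextAndSetPosition); its replacement below exercises WALEntryStream through a queue that a WAL listener keeps current. A condensed sketch of that wiring, equivalent to the PathWatcher inner class at the bottom of the new test:

    // How TestWALEntryStream feeds the stream: every log roll enqueues the new WAL path.
    final PriorityBlockingQueue<Path> walQueue = new PriorityBlockingQueue<Path>();
    List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
    listeners.add(new WALActionsListener.Base() {
      @Override
      public void preLogRoll(Path oldPath, Path newPath) throws IOException {
        walQueue.add(newPath); // makes the new WAL visible to the stream
      }
    });
    WALFactory wals = new WALFactory(conf, listeners, "walQueueDemo");
    WAL log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());

Since preLogRoll also fires for the roll that creates the first WAL, the queue ends up holding the full ordered list of logs the stream still has to read.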
http://git-wip-us.apache.org/repos/asf/hbase/blob/3cf44332/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
new file mode 100644
index 0000000..005e2a1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -0,0 +1,440 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestWALEntryStream {
+
+  private static HBaseTestingUtility TEST_UTIL;
+  private static Configuration conf;
+  private static FileSystem fs;
+  private static MiniDFSCluster cluster;
+  private static final TableName tableName = TableName.valueOf("tablename");
+  private static final byte[] family = Bytes.toBytes("column");
+  private static final byte[] qualifier = Bytes.toBytes("qualifier");
+  private static final HRegionInfo info =
+      new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
+  private static final HTableDescriptor htd = new HTableDescriptor(tableName);
+  private static NavigableMap<byte[], Integer> scopes;
+
+  private WAL log;
+  PriorityBlockingQueue<Path> walQueue;
+  private PathWatcher pathWatcher;
+
+  @Rule
+  public TestName tn = new TestName();
+  private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    conf = TEST_UTIL.getConfiguration();
+    TEST_UTIL.startMiniDFSCluster(3);
+
+    cluster = TEST_UTIL.getDFSCluster();
+    fs = cluster.getFileSystem();
+    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    for (byte[] fam : htd.getFamiliesKeys()) {
+      scopes.put(fam, 0);
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    walQueue = new PriorityBlockingQueue<>();
+    List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
+    pathWatcher = new PathWatcher();
+    listeners.add(pathWatcher);
+    final WALFactory wals = new WALFactory(conf, listeners, tn.getMethodName());
+    log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    log.close();
+  }
+
+  // Try out different combinations of row count and KeyValue count
+  @Test
+  public void testDifferentCounts() throws Exception {
+    int[] NB_ROWS = { 1500, 60000 };
+    int[] NB_KVS = { 1, 100 };
+    // whether compression is used
+    Boolean[] BOOL_VALS = { false, true };
+    // long lastPosition = 0;
+    for (int nbRows : NB_ROWS) {
+      for (int walEditKVs : NB_KVS) {
+        for (boolean isCompressionEnabled : BOOL_VALS) {
+          TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
+            isCompressionEnabled);
+          mvcc.advanceTo(1);
+
+          for (int i = 0; i < nbRows; i++) {
+            appendToLogPlus(walEditKVs);
+          }
+
+          log.rollWriter();
+
+          try (WALEntryStream entryStream =
+              new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+            int i = 0;
+            for (WAL.Entry e : entryStream) {
+              assertNotNull(e);
+              i++;
+            }
+            assertEquals(nbRows, i);
+
+            // should've read all entries
+            assertFalse(entryStream.hasNext());
+          }
+          // reset everything for next loop
+          log.close();
+          setUp();
+        }
+      }
+    }
+  }
+
+  /**
+   * Tests basic reading of log appends
+   */
+  @Test
+  public void testAppendsWithRolls() throws Exception {
+    appendToLog();
+
+    long oldPos;
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+      // There's one edit in the log, read it. Reading past it needs to throw exception
+      assertTrue(entryStream.hasNext());
+      WAL.Entry entry = entryStream.next();
+      assertNotNull(entry);
+      assertFalse(entryStream.hasNext());
+      try {
+        entry = entryStream.next();
+        fail();
+      } catch (NoSuchElementException e) {
+        // expected
+      }
+      oldPos = entryStream.getPosition();
+    }
+
+    appendToLog();
+
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
+      // Read the newly added entry, make sure we made progress
+      WAL.Entry entry = entryStream.next();
+      assertNotEquals(oldPos, entryStream.getPosition());
+      assertNotNull(entry);
+      oldPos = entryStream.getPosition();
+    }
+
+    // We rolled but we still should see the end of the first log and get that item
+    appendToLog();
+    log.rollWriter();
+    appendToLog();
+
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
+      WAL.Entry entry = entryStream.next();
+      assertNotEquals(oldPos, entryStream.getPosition());
+      assertNotNull(entry);
+
+      // next item should come from the new log
+      entry = entryStream.next();
+      assertNotEquals(oldPos, entryStream.getPosition());
+      assertNotNull(entry);
+
+      // no more entries to read
+      assertFalse(entryStream.hasNext());
+      oldPos = entryStream.getPosition();
+    }
+  }
+
+  /**
+   * Tests that if after a stream is opened, more entries come in and then the log is rolled, we
+   * don't mistakenly dequeue the current log thinking we're done with it
+   */
+  @Test
+  public void testLogrollWhileStreaming() throws Exception {
+    appendToLog("1");
+    appendToLog("2");// 2
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+      assertEquals("1", getRow(entryStream.next()));
+
+      appendToLog("3"); // 3 - comes in after reader opened
+      log.rollWriter(); // log roll happening while we're reading
+      appendToLog("4"); // 4 - this append is in the rolled log
+
+      assertEquals("2", getRow(entryStream.next()));
+      assertEquals(2, walQueue.size()); // we should not have dequeued yet since there's still an
+                                        // entry in first log
+      assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4
+                                                     // and 3 would be skipped
+      assertEquals("4", getRow(entryStream.next())); // 4
+      assertEquals(1, walQueue.size()); // now we've dequeued and moved on to next log properly
+      assertFalse(entryStream.hasNext());
+    }
+  }
+
+  /**
+   * Tests that if writes come in while we have a stream open, we shouldn't miss them
+   */
+  @Test
+  public void testNewEntriesWhileStreaming() throws Exception {
+    appendToLog("1");
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+      entryStream.next(); // we've hit the end of the stream at this point
+
+      // some new entries come in while we're streaming
+      appendToLog("2");
+      appendToLog("3");
+
+      // don't see them
+      assertFalse(entryStream.hasNext());
+
+      // But we do if we reset
+      entryStream.reset();
+      assertEquals("2", getRow(entryStream.next()));
+      assertEquals("3", getRow(entryStream.next()));
+      assertFalse(entryStream.hasNext());
+    }
+  }
+
+  @Test
+  public void testResumeStreamingFromPosition() throws Exception {
+    long lastPosition = 0;
+    appendToLog("1");
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+      entryStream.next(); // we've hit the end of the stream at this point
+      appendToLog("2");
+      appendToLog("3");
+      lastPosition = entryStream.getPosition();
+    }
+    // next stream should pick up where we left off
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+      assertEquals("2", getRow(entryStream.next()));
+      assertEquals("3", getRow(entryStream.next()));
+      assertFalse(entryStream.hasNext()); // done
+      assertEquals(1, walQueue.size());
+    }
+  }
+
+  /**
+   * Tests that if we stop before hitting the end of a stream, we can continue where we left off
+   * using the last position
+   */
+  @Test
+  public void testPosition() throws Exception {
+    long lastPosition = 0;
+    appendEntriesToLog(3);
+    // read only one element
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+      entryStream.next();
+      lastPosition = entryStream.getPosition();
+    }
+    // there should still be two more entries from where we left off
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+      assertNotNull(entryStream.next());
+      assertNotNull(entryStream.next());
+      assertFalse(entryStream.hasNext());
+    }
+  }
+
+  @Test
+  public void testEmptyStream() throws Exception {
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+      assertFalse(entryStream.hasNext());
+    }
+  }
+
+  @Test
+  public void testReplicationSourceWALReaderThread() throws Exception {
+    appendEntriesToLog(3);
+    // get ending position
+    long position;
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+      entryStream.next();
+      entryStream.next();
+      entryStream.next();
+      position = entryStream.getPosition();
+    }
+
+    // start up a batcher
+    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    ReplicationSourceWALReaderThread batcher = new ReplicationSourceWALReaderThread(
+        mockSourceManager, getQueueInfo(), walQueue, 0, fs, conf, getDummyFilter(),
+        new MetricsSource("1"));
+    Path walPath = walQueue.peek();
+    batcher.start();
+    WALEntryBatch entryBatch = batcher.take();
+
+    // should've batched up our entries
+    assertNotNull(entryBatch);
+    assertEquals(3, entryBatch.getWalEntries().size());
+    assertEquals(position, entryBatch.getLastWalPosition());
+    assertEquals(walPath, entryBatch.getLastWalPath());
+    assertEquals(3, entryBatch.getNbRowKeys());
+
+    appendToLog("foo");
+    entryBatch = batcher.take();
+    assertEquals(1, entryBatch.getNbEntries());
+    assertEquals(getRow(entryBatch.getWalEntries().get(0)), "foo");
+  }
+
+  private String getRow(WAL.Entry entry) {
+    Cell cell = entry.getEdit().getCells().get(0);
+    return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+  }
+
+  private void appendToLog(String key) throws IOException {
+    final long txid = log.append(htd, info,
+      new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc),
+      getWALEdit(key), true);
+    log.sync(txid);
+  }
+
+  private void appendEntriesToLog(int count) throws IOException {
+    for (int i = 0; i < count; i++) {
+      appendToLog();
+    }
+  }
+
+  private void appendToLog() throws IOException {
+    appendToLogPlus(1);
+  }
+
+  private void appendToLogPlus(int count) throws IOException {
+    final long txid = log.append(htd, info,
+      new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc),
+      getWALEdits(count), true);
+    log.sync(txid);
+  }
+
+  private WALEdit getWALEdits(int count) {
+    WALEdit edit = new WALEdit();
+    for (int i = 0; i < count; i++) {
+      edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
+          System.currentTimeMillis(), qualifier));
+    }
+    return edit;
+  }
+
+  private WALEdit getWALEdit(String row) {
+    WALEdit edit = new WALEdit();
+    edit.add(
+      new KeyValue(Bytes.toBytes(row), family, qualifier, System.currentTimeMillis(), qualifier));
+    return edit;
+  }
+
+  private WALEntryFilter getDummyFilter() {
+    return new WALEntryFilter() {
+
+      @Override
+      public Entry filter(Entry entry) {
+        return entry;
+      }
+    };
+  }
+
+  private ReplicationQueueInfo getQueueInfo() {
+    return new ReplicationQueueInfo("1");
+  }
+
+  class PathWatcher extends WALActionsListener.Base {
+
+    Path currentPath;
+
+    @Override
+    public void preLogRoll(Path oldPath, Path newPath) throws IOException {
+      walQueue.add(newPath);
+      currentPath = newPath;
+    }
+  }
+
+}
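A closing note on the contract that testPosition and testResumeStreamingFromPosition pin down: getPosition() is the replication checkpoint. A source can persist it after shipping a batch and later open a new stream at that offset. A minimal sketch, with saveCheckpoint/loadCheckpoint as hypothetical stand-ins for whatever actually persists offsets (ZooKeeper, in the replication code):

    // Hypothetical checkpoint/resume cycle built only on the constructors added above.
    long lastPosition;
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, loadCheckpoint(), new MetricsSource("1"))) {
      while (entryStream.hasNext()) {
        WAL.Entry entry = entryStream.next();
        // ... replicate entry ...
      }
      lastPosition = entryStream.getPosition();
    }
    saveCheckpoint(lastPosition); // hypothetical: persist the offset for restart

A stream reopened at lastPosition resumes with the first entry after the one last returned, which is exactly what testResumeStreamingFromPosition asserts.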