hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject [04/50] [abbrv] hbase git commit: HBASE-19792 TestReplicationSmallTests.testDisableEnable fails
Date Thu, 18 Jan 2018 05:29:00 GMT
HBASE-19792 TestReplicationSmallTests.testDisableEnable fails


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/48bb1901
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/48bb1901
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/48bb1901

Branch: refs/heads/HBASE-19397-branch-2
Commit: 48bb19012c33ab724ed4f1e2a5a7b98f9ded841b
Parents: 20afaca
Author: zhangduo <zhangduo@apache.org>
Authored: Wed Jan 17 14:23:05 2018 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Wed Jan 17 14:23:17 2018 +0800

----------------------------------------------------------------------
 .../replication/TestReplicationSmallTests.java  | 1079 ------------------
 .../replication/TestVerifyReplication.java      |  493 ++++++++
 .../hbase/replication/TestReplicationBase.java  |  163 ++-
 .../TestReplicationEmptyWALRecovery.java        |  117 ++
 .../replication/TestReplicationSmallTests.java  |  436 +++++++
 5 files changed, 1190 insertions(+), 1098 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/48bb1901/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
deleted file mode 100644
index 3877f71..0000000
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ /dev/null
@@ -1,1079 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.client.replication.TableCFs;
-import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
-import org.apache.hadoop.hbase.replication.regionserver.Replication;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
-import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.JVMClusterUtil;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.mapreduce.Job;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-
-@Category({ReplicationTests.class, LargeTests.class})
-public class TestReplicationSmallTests extends TestReplicationBase {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSmallTests.class);
-  private static final String PEER_ID = "2";
-
-  @Rule
-  public TestName name = new TestName();
-
-  /**
-   * @throws java.lang.Exception
-   */
-  @Before
-  public void setUp() throws Exception {
-    // Starting and stopping replication can make us miss new logs,
-    // rolling like this makes sure the most recent one gets added to the queue
-    for ( JVMClusterUtil.RegionServerThread r :
-        utility1.getHBaseCluster().getRegionServerThreads()) {
-      utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
-    }
-    int rowCount = utility1.countRows(tableName);
-    utility1.deleteTableData(tableName);
-    // truncating the table will send one Delete per row to the slave cluster
-    // in an async fashion, which is why we cannot just call deleteTableData on
-    // utility2 since late writes could make it to the slave in some way.
-    // Instead, we truncate the first table and wait for all the Deletes to
-    // make it to the slave.
-    Scan scan = new Scan();
-    int lastCount = 0;
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i==NB_RETRIES-1) {
-        fail("Waited too much time for truncate");
-      }
-      ResultScanner scanner = htable2.getScanner(scan);
-      Result[] res = scanner.next(rowCount);
-      scanner.close();
-      if (res.length != 0) {
-        if (res.length < lastCount) {
-          i--; // Don't increment timeout if we make progress
-        }
-        lastCount = res.length;
-        LOG.info("Still got " + res.length + " rows");
-        Thread.sleep(SLEEP_TIME);
-      } else {
-        break;
-      }
-    }
-  }
-
-  /**
-   * Verify that version and column delete marker types are replicated
-   * correctly.
-   * @throws Exception
-   */
-  @Test(timeout=300000)
-  public void testDeleteTypes() throws Exception {
-    LOG.info("testDeleteTypes");
-    final byte[] v1 = Bytes.toBytes("v1");
-    final byte[] v2 = Bytes.toBytes("v2");
-    final byte[] v3 = Bytes.toBytes("v3");
-    htable1 = utility1.getConnection().getTable(tableName);
-
-    long t = EnvironmentEdgeManager.currentTime();
-    // create three versions for "row"
-    Put put = new Put(row);
-    put.addColumn(famName, row, t, v1);
-    htable1.put(put);
-
-    put = new Put(row);
-    put.addColumn(famName, row, t + 1, v2);
-    htable1.put(put);
-
-    put = new Put(row);
-    put.addColumn(famName, row, t + 2, v3);
-    htable1.put(put);
-
-    Get get = new Get(row);
-    get.readAllVersions();
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i==NB_RETRIES-1) {
-        fail("Waited too much time for put replication");
-      }
-      Result res = htable2.get(get);
-      if (res.size() < 3) {
-        LOG.info("Rows not available");
-        Thread.sleep(SLEEP_TIME);
-      } else {
-        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
-        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
-        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[2]), v1);
-        break;
-      }
-    }
-    // place a version delete marker (delete last version)
-    Delete d = new Delete(row);
-    d.addColumn(famName, row, t);
-    htable1.delete(d);
-
-    get = new Get(row);
-    get.readAllVersions();
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i==NB_RETRIES-1) {
-        fail("Waited too much time for put replication");
-      }
-      Result res = htable2.get(get);
-      if (res.size() > 2) {
-        LOG.info("Version not deleted");
-        Thread.sleep(SLEEP_TIME);
-      } else {
-        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
-        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
-        break;
-      }
-    }
-
-    // place a column delete marker
-    d = new Delete(row);
-    d.addColumns(famName, row, t+2);
-    htable1.delete(d);
-
-    // now *both* of the remaining version should be deleted
-    // at the replica
-    get = new Get(row);
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i==NB_RETRIES-1) {
-        fail("Waited too much time for del replication");
-      }
-      Result res = htable2.get(get);
-      if (res.size() >= 1) {
-        LOG.info("Rows not deleted");
-        Thread.sleep(SLEEP_TIME);
-      } else {
-        break;
-      }
-    }
-  }
-
-  /**
-   * Add a row, check it's replicated, delete it, check's gone
-   * @throws Exception
-   */
-  @Test(timeout=300000)
-  public void testSimplePutDelete() throws Exception {
-    LOG.info("testSimplePutDelete");
-    Put put = new Put(row);
-    put.addColumn(famName, row, row);
-
-    htable1 = utility1.getConnection().getTable(tableName);
-    htable1.put(put);
-
-    Get get = new Get(row);
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i==NB_RETRIES-1) {
-        fail("Waited too much time for put replication");
-      }
-      Result res = htable2.get(get);
-      if (res.isEmpty()) {
-        LOG.info("Row not available");
-        Thread.sleep(SLEEP_TIME);
-      } else {
-        assertArrayEquals(res.value(), row);
-        break;
-      }
-    }
-
-    Delete del = new Delete(row);
-    htable1.delete(del);
-
-    get = new Get(row);
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i==NB_RETRIES-1) {
-        fail("Waited too much time for del replication");
-      }
-      Result res = htable2.get(get);
-      if (res.size() >= 1) {
-        LOG.info("Row not deleted");
-        Thread.sleep(SLEEP_TIME);
-      } else {
-        break;
-      }
-    }
-  }
-
-  /**
-   * Try a small batch upload using the write buffer, check it's replicated
-   * @throws Exception
-   */
-  @Test(timeout=300000)
-  public void testSmallBatch() throws Exception {
-    LOG.info("testSmallBatch");
-    // normal Batch tests
-    loadData("", row);
-
-    Scan scan = new Scan();
-
-    ResultScanner scanner1 = htable1.getScanner(scan);
-    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
-    scanner1.close();
-    assertEquals(NB_ROWS_IN_BATCH, res1.length);
-
-    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
-  }
-
-  private void waitForReplication(int expectedRows, int retries) throws IOException, InterruptedException {
-    Scan scan;
-    for (int i = 0; i < retries; i++) {
-      scan = new Scan();
-      if (i== retries -1) {
-        fail("Waited too much time for normal batch replication");
-      }
-      ResultScanner scanner = htable2.getScanner(scan);
-      Result[] res = scanner.next(expectedRows);
-      scanner.close();
-      if (res.length != expectedRows) {
-        LOG.info("Only got " + res.length + " rows");
-        Thread.sleep(SLEEP_TIME);
-      } else {
-        break;
-      }
-    }
-  }
-
-  private void loadData(String prefix, byte[] row) throws IOException {
-    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
-    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
-      put.addColumn(famName, row, row);
-      puts.add(put);
-    }
-    htable1.put(puts);
-  }
-
-  /**
-   * Test disable/enable replication, trying to insert, make sure nothing's
-   * replicated, enable it, the insert should be replicated
-   *
-   * @throws Exception
-   */
-  @Test(timeout = 300000)
-  public void testDisableEnable() throws Exception {
-
-    // Test disabling replication
-    hbaseAdmin.disableReplicationPeer(PEER_ID);
-
-    byte[] rowkey = Bytes.toBytes("disable enable");
-    Put put = new Put(rowkey);
-    put.addColumn(famName, row, row);
-    htable1.put(put);
-
-    Get get = new Get(rowkey);
-    for (int i = 0; i < NB_RETRIES; i++) {
-      Result res = htable2.get(get);
-      if (res.size() >= 1) {
-        fail("Replication wasn't disabled");
-      } else {
-        LOG.info("Row not replicated, let's wait a bit more...");
-        Thread.sleep(SLEEP_TIME);
-      }
-    }
-
-    // Test enable replication
-    hbaseAdmin.enableReplicationPeer(PEER_ID);
-
-    for (int i = 0; i < NB_RETRIES; i++) {
-      Result res = htable2.get(get);
-      if (res.isEmpty()) {
-        LOG.info("Row not available");
-        Thread.sleep(SLEEP_TIME);
-      } else {
-        assertArrayEquals(res.value(), row);
-        return;
-      }
-    }
-    fail("Waited too much time for put replication");
-  }
-
-  /**
-   * Integration test for TestReplicationAdmin, removes and re-add a peer
-   * cluster
-   *
-   * @throws Exception
-   */
-  @Test(timeout=300000)
-  public void testAddAndRemoveClusters() throws Exception {
-    LOG.info("testAddAndRemoveClusters");
-    hbaseAdmin.removeReplicationPeer(PEER_ID);
-    Thread.sleep(SLEEP_TIME);
-    byte[] rowKey = Bytes.toBytes("Won't be replicated");
-    Put put = new Put(rowKey);
-    put.addColumn(famName, row, row);
-    htable1.put(put);
-
-    Get get = new Get(rowKey);
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i == NB_RETRIES-1) {
-        break;
-      }
-      Result res = htable2.get(get);
-      if (res.size() >= 1) {
-        fail("Not supposed to be replicated");
-      } else {
-        LOG.info("Row not replicated, let's wait a bit more...");
-        Thread.sleep(SLEEP_TIME);
-      }
-    }
-    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
-    rpc.setClusterKey(utility2.getClusterKey());
-    hbaseAdmin.addReplicationPeer(PEER_ID, rpc);
-    Thread.sleep(SLEEP_TIME);
-    rowKey = Bytes.toBytes("do rep");
-    put = new Put(rowKey);
-    put.addColumn(famName, row, row);
-    LOG.info("Adding new row");
-    htable1.put(put);
-
-    get = new Get(rowKey);
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i==NB_RETRIES-1) {
-        fail("Waited too much time for put replication");
-      }
-      Result res = htable2.get(get);
-      if (res.isEmpty()) {
-        LOG.info("Row not available");
-        Thread.sleep(SLEEP_TIME*i);
-      } else {
-        assertArrayEquals(res.value(), row);
-        break;
-      }
-    }
-  }
-
-
-  /**
-   * Do a more intense version testSmallBatch, one  that will trigger
-   * wal rolling and other non-trivial code paths
-   * @throws Exception
-   */
-  @Test(timeout=300000)
-  public void testLoading() throws Exception {
-    LOG.info("Writing out rows to table1 in testLoading");
-    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BIG_BATCH);
-    for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) {
-      Put put = new Put(Bytes.toBytes(i));
-      put.addColumn(famName, row, row);
-      puts.add(put);
-    }
-    // The puts will be iterated through and flushed only when the buffer
-    // size is reached.
-    htable1.put(puts);
-
-    Scan scan = new Scan();
-
-    ResultScanner scanner = htable1.getScanner(scan);
-    Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
-    scanner.close();
-
-    assertEquals(NB_ROWS_IN_BIG_BATCH, res.length);
-
-    LOG.info("Looking in table2 for replicated rows in testLoading");
-    long start = System.currentTimeMillis();
-    // Retry more than NB_RETRIES.  As it was, retries were done in 5 seconds and we'd fail
-    // sometimes.
-    final long retries = NB_RETRIES * 10;
-    for (int i = 0; i < retries; i++) {
-      scan = new Scan();
-      scanner = htable2.getScanner(scan);
-      res = scanner.next(NB_ROWS_IN_BIG_BATCH);
-      scanner.close();
-      if (res.length != NB_ROWS_IN_BIG_BATCH) {
-        if (i == retries - 1) {
-          int lastRow = -1;
-          for (Result result : res) {
-            int currentRow = Bytes.toInt(result.getRow());
-            for (int row = lastRow+1; row < currentRow; row++) {
-              LOG.error("Row missing: " + row);
-            }
-            lastRow = currentRow;
-          }
-          LOG.error("Last row: " + lastRow);
-          fail("Waited too much time for normal batch replication, " +
-            res.length + " instead of " + NB_ROWS_IN_BIG_BATCH + "; waited=" +
-            (System.currentTimeMillis() - start) + "ms");
-        } else {
-          LOG.info("Only got " + res.length + " rows... retrying");
-          Thread.sleep(SLEEP_TIME);
-        }
-      } else {
-        break;
-      }
-    }
-  }
-
-  /**
-   * Do a small loading into a table, make sure the data is really the same,
-   * then run the VerifyReplication job to check the results. Do a second
-   * comparison where all the cells are different.
-   * @throws Exception
-   */
-  @Test(timeout=300000)
-  public void testVerifyRepJob() throws Exception {
-    // Populate the tables, at the same time it guarantees that the tables are
-    // identical since it does the check
-    testSmallBatch();
-
-    String[] args = new String[] {PEER_ID, tableName.getNameAsString()};
-    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
-
-    Scan scan = new Scan();
-    ResultScanner rs = htable2.getScanner(scan);
-    Put put = null;
-    for (Result result : rs) {
-      put = new Put(result.getRow());
-      Cell firstVal = result.rawCells()[0];
-      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
-          Bytes.toBytes("diff data"));
-      htable2.put(put);
-    }
-    Delete delete = new Delete(put.getRow());
-    htable2.delete(delete);
-    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
-  }
-
-  /**
-   * Load a row into a table, make sure the data is really the same,
-   * delete the row, make sure the delete marker is replicated,
-   * run verify replication with and without raw to check the results.
-   * @throws Exception
-   */
-  @Test(timeout=300000)
-  public void testVerifyRepJobWithRawOptions() throws Exception {
-    LOG.info(name.getMethodName());
-
-    final TableName tableName = TableName.valueOf(name.getMethodName());
-    byte[] familyname = Bytes.toBytes("fam_raw");
-    byte[] row = Bytes.toBytes("row_raw");
-
-    Table lHtable1 = null;
-    Table lHtable2 = null;
-
-    try {
-      ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname)
-          .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
-      TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(fam).build();
-      scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-      for (ColumnFamilyDescriptor f : table.getColumnFamilies()) {
-        scopes.put(f.getName(), f.getScope());
-      }
-
-      Connection connection1 = ConnectionFactory.createConnection(conf1);
-      Connection connection2 = ConnectionFactory.createConnection(conf2);
-      try (Admin admin1 = connection1.getAdmin()) {
-        admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
-      }
-      try (Admin admin2 = connection2.getAdmin()) {
-        admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
-      }
-      utility1.waitUntilAllRegionsAssigned(tableName);
-      utility2.waitUntilAllRegionsAssigned(tableName);
-
-      lHtable1 = utility1.getConnection().getTable(tableName);
-      lHtable2 = utility2.getConnection().getTable(tableName);
-
-      Put put = new Put(row);
-      put.addColumn(familyname, row, row);
-      lHtable1.put(put);
-
-      Get get = new Get(row);
-      for (int i = 0; i < NB_RETRIES; i++) {
-        if (i==NB_RETRIES-1) {
-          fail("Waited too much time for put replication");
-        }
-        Result res = lHtable2.get(get);
-        if (res.isEmpty()) {
-          LOG.info("Row not available");
-          Thread.sleep(SLEEP_TIME);
-        } else {
-          assertArrayEquals(res.value(), row);
-          break;
-        }
-      }
-
-      Delete del = new Delete(row);
-      lHtable1.delete(del);
-
-      get = new Get(row);
-      for (int i = 0; i < NB_RETRIES; i++) {
-        if (i==NB_RETRIES-1) {
-          fail("Waited too much time for del replication");
-        }
-        Result res = lHtable2.get(get);
-        if (res.size() >= 1) {
-          LOG.info("Row not deleted");
-          Thread.sleep(SLEEP_TIME);
-        } else {
-          break;
-        }
-      }
-
-      // Checking verifyReplication for the default behavior.
-      String[] argsWithoutRaw = new String[] {PEER_ID, tableName.getNameAsString()};
-      runVerifyReplication(argsWithoutRaw, 0, 0);
-
-      // Checking verifyReplication with raw
-      String[] argsWithRawAsTrue = new String[] {"--raw", PEER_ID, tableName.getNameAsString()};
-      runVerifyReplication(argsWithRawAsTrue, 1, 0);
-    } finally {
-      if (lHtable1 != null) {
-        lHtable1.close();
-      }
-      if (lHtable2 != null) {
-        lHtable2.close();
-      }
-    }
-  }
-
-  private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
-      throws IOException, InterruptedException, ClassNotFoundException {
-    Job job = new VerifyReplication().createSubmittableJob(new Configuration(conf1), args);
-    if (job == null) {
-      fail("Job wasn't created, see the log");
-    }
-    if (!job.waitForCompletion(true)) {
-      fail("Job failed, see the log");
-    }
-    assertEquals(expectedGoodRows, job.getCounters().
-        findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-    assertEquals(expectedBadRows, job.getCounters().
-        findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
-  }
-
-  @Test(timeout=300000)
-  // VerifyReplication should honor versions option
-  public void testHBase14905() throws Exception {
-    // normal Batch tests
-    byte[] qualifierName = Bytes.toBytes("f1");
-    Put put = new Put(Bytes.toBytes("r1"));
-    put.addColumn(famName, qualifierName, Bytes.toBytes("v1002"));
-    htable1.put(put);
-    put.addColumn(famName, qualifierName, Bytes.toBytes("v1001"));
-    htable1.put(put);
-    put.addColumn(famName, qualifierName, Bytes.toBytes("v1112"));
-    htable1.put(put);
-
-    Scan scan = new Scan();
-    scan.readVersions(100);
-    ResultScanner scanner1 = htable1.getScanner(scan);
-    Result[] res1 = scanner1.next(1);
-    scanner1.close();
-
-    assertEquals(1, res1.length);
-    assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
-
-    for (int i = 0; i < NB_RETRIES; i++) {
-      scan = new Scan();
-      scan.readVersions(100);
-      scanner1 = htable2.getScanner(scan);
-      res1 = scanner1.next(1);
-      scanner1.close();
-      if (res1.length != 1) {
-        LOG.info("Only got " + res1.length + " rows");
-        Thread.sleep(SLEEP_TIME);
-      } else {
-        int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
-        if (cellNumber != 3) {
-          LOG.info("Only got " + cellNumber + " cells");
-          Thread.sleep(SLEEP_TIME);
-        } else {
-          break;
-        }
-      }
-      if (i == NB_RETRIES-1) {
-        fail("Waited too much time for normal batch replication");
-      }
-    }
-
-    put.addColumn(famName, qualifierName, Bytes.toBytes("v1111"));
-    htable2.put(put);
-    put.addColumn(famName, qualifierName, Bytes.toBytes("v1112"));
-    htable2.put(put);
-
-    scan = new Scan();
-    scan.readVersions(100);
-    scanner1 = htable2.getScanner(scan);
-    res1 = scanner1.next(NB_ROWS_IN_BATCH);
-    scanner1.close();
-
-    assertEquals(1, res1.length);
-    assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size());
-
-    String[] args = new String[] {"--versions=100", PEER_ID, tableName.getNameAsString()};
-    runVerifyReplication(args, 0, 1);
-  }
-
-  @Test(timeout=300000)
-  // VerifyReplication should honor versions option
-  public void testVersionMismatchHBase14905() throws Exception {
-    // normal Batch tests
-    byte[] qualifierName = Bytes.toBytes("f1");
-    Put put = new Put(Bytes.toBytes("r1"));
-    long ts = System.currentTimeMillis();
-    put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1"));
-    htable1.put(put);
-    put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v2"));
-    htable1.put(put);
-    put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v3"));
-    htable1.put(put);
-
-    Scan scan = new Scan();
-    scan.readVersions(100);
-    ResultScanner scanner1 = htable1.getScanner(scan);
-    Result[] res1 = scanner1.next(1);
-    scanner1.close();
-
-    assertEquals(1, res1.length);
-    assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
-
-    for (int i = 0; i < NB_RETRIES; i++) {
-      scan = new Scan();
-      scan.readVersions(100);
-      scanner1 = htable2.getScanner(scan);
-      res1 = scanner1.next(1);
-      scanner1.close();
-      if (res1.length != 1) {
-        LOG.info("Only got " + res1.length + " rows");
-        Thread.sleep(SLEEP_TIME);
-      } else {
-        int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
-        if (cellNumber != 3) {
-          LOG.info("Only got " + cellNumber + " cells");
-          Thread.sleep(SLEEP_TIME);
-        } else {
-          break;
-        }
-      }
-      if (i == NB_RETRIES-1) {
-        fail("Waited too much time for normal batch replication");
-      }
-    }
-
-    try {
-      // Disabling replication and modifying the particular version of the cell to validate the feature.
-      hbaseAdmin.disableReplicationPeer(PEER_ID);
-      Put put2 = new Put(Bytes.toBytes("r1"));
-      put2.addColumn(famName, qualifierName, ts +2, Bytes.toBytes("v99"));
-      htable2.put(put2);
-
-      scan = new Scan();
-      scan.readVersions(100);
-      scanner1 = htable2.getScanner(scan);
-      res1 = scanner1.next(NB_ROWS_IN_BATCH);
-      scanner1.close();
-      assertEquals(1, res1.length);
-      assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
-
-      String[] args = new String[] {"--versions=100", PEER_ID, tableName.getNameAsString()};
-      runVerifyReplication(args, 0, 1);
-      }
-    finally {
-      hbaseAdmin.enableReplicationPeer(PEER_ID);
-    }
-  }
-
-  /**
-   * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out
-   * the compaction WALEdit
-   * @throws Exception
-   */
-  @Test(timeout=300000)
-  public void testCompactionWALEdits() throws Exception {
-    WALProtos.CompactionDescriptor compactionDescriptor =
-        WALProtos.CompactionDescriptor.getDefaultInstance();
-    RegionInfo hri = RegionInfoBuilder.newBuilder(htable1.getName())
-        .setStartKey(HConstants.EMPTY_START_ROW)
-        .setEndKey(HConstants.EMPTY_END_ROW)
-        .build();
-    WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
-    Replication.scopeWALEdits(new WALKeyImpl(), edit,
-      htable1.getConfiguration(), null);
-  }
-
-  /**
-   * Test for HBASE-8663
-   * Create two new Tables with colfamilies enabled for replication then run
-   * ReplicationAdmin.listReplicated(). Finally verify the table:colfamilies. Note:
-   * TestReplicationAdmin is a better place for this testing but it would need mocks.
-   * @throws Exception
-   */
-  @Test(timeout = 300000)
-  public void testVerifyListReplicatedTable() throws Exception {
-    LOG.info("testVerifyListReplicatedTable");
-
-    final String tName = "VerifyListReplicated_";
-    final String colFam = "cf1";
-    final int numOfTables = 3;
-
-    Admin hadmin = utility1.getAdmin();
-
-    // Create Tables
-    for (int i = 0; i < numOfTables; i++) {
-      hadmin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf(tName + i))
-          .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(colFam))
-              .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
-          .build());
-    }
-
-    // verify the result
-    List<TableCFs> replicationColFams = hbaseAdmin.listReplicatedTableCFs();
-    int[] match = new int[numOfTables]; // array of 3 with init value of zero
-
-    for (int i = 0; i < replicationColFams.size(); i++) {
-      TableCFs replicationEntry = replicationColFams.get(i);
-      String tn = replicationEntry.getTable().getNameAsString();
-      if (tn.startsWith(tName) && replicationEntry.getColumnFamilyMap().containsKey(colFam)) {
-        int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit
-        match[m]++; // should only increase once
-      }
-    }
-
-    // check the matching result
-    for (int i = 0; i < match.length; i++) {
-      assertTrue("listReplicated() does not match table " + i, (match[i] == 1));
-    }
-
-    // drop tables
-    for (int i = 0; i < numOfTables; i++) {
-      TableName tableName = TableName.valueOf(tName + i);
-      hadmin.disableTable(tableName);
-      hadmin.deleteTable(tableName);
-    }
-
-    hadmin.close();
-  }
-
-  /**
-   *  Test for HBase-15259 WALEdits under replay will also be replicated
-   * */
-  @Test
-  public void testReplicationInReplay() throws Exception {
-    final TableName tableName = htable1.getName();
-
-    HRegion region = utility1.getMiniHBaseCluster().getRegions(tableName).get(0);
-    RegionInfo hri = region.getRegionInfo();
-    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for (byte[] fam : htable1.getDescriptor().getColumnFamilyNames()) {
-      scopes.put(fam, 1);
-    }
-    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
-    int index = utility1.getMiniHBaseCluster().getServerWith(hri.getRegionName());
-    WAL wal = utility1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo());
-    final byte[] rowName = Bytes.toBytes("testReplicationInReplay");
-    final byte[] qualifier = Bytes.toBytes("q");
-    final byte[] value = Bytes.toBytes("v");
-    WALEdit edit = new WALEdit(true);
-    long now = EnvironmentEdgeManager.currentTime();
-    edit.add(new KeyValue(rowName, famName, qualifier,
-      now, value));
-    WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes);
-    wal.append(hri, walKey, edit, true);
-    wal.sync();
-
-    Get get = new Get(rowName);
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i == NB_RETRIES-1) {
-        break;
-      }
-      Result res = htable2.get(get);
-      if (res.size() >= 1) {
-        fail("Not supposed to be replicated for " + Bytes.toString(res.getRow()));
-      } else {
-        LOG.info("Row not replicated, let's wait a bit more...");
-        Thread.sleep(SLEEP_TIME);
-      }
-    }
-  }
-
-  @Test(timeout=300000)
-  public void testVerifyReplicationPrefixFiltering() throws Exception {
-    final byte[] prefixRow = Bytes.toBytes("prefixrow");
-    final byte[] prefixRow2 = Bytes.toBytes("secondrow");
-    loadData("prefixrow", prefixRow);
-    loadData("secondrow", prefixRow2);
-    loadData("aaa", row);
-    loadData("zzz", row);
-    waitForReplication(NB_ROWS_IN_BATCH * 4, NB_RETRIES * 4);
-    String[] args = new String[] {"--row-prefixes=prefixrow,secondrow", PEER_ID,
-        tableName.getNameAsString()};
-    runVerifyReplication(args, NB_ROWS_IN_BATCH *2, 0);
-  }
-
-  @Test(timeout = 300000)
-  public void testVerifyReplicationSnapshotArguments() {
-    String[] args =
-        new String[] { "--sourceSnapshotName=snapshot1", "2", tableName.getNameAsString() };
-    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
-
-    args = new String[] { "--sourceSnapshotTmpDir=tmp", "2", tableName.getNameAsString() };
-    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
-
-    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=tmp", "2",
-        tableName.getNameAsString() };
-    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
-
-    args = new String[] { "--peerSnapshotName=snapshot1", "2", tableName.getNameAsString() };
-    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
-
-    args = new String[] { "--peerSnapshotTmpDir=/tmp/", "2", tableName.getNameAsString() };
-    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
-
-    args = new String[] { "--peerSnapshotName=snapshot1", "--peerSnapshotTmpDir=/tmp/",
-        "--peerFSAddress=tempfs", "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2",
-        tableName.getNameAsString() };
-    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
-
-    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=/tmp/",
-        "--peerSnapshotName=snapshot2", "--peerSnapshotTmpDir=/tmp/", "--peerFSAddress=tempfs",
-        "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2", tableName.getNameAsString() };
-
-    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
-  }
-
-  private void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount)
-      throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir));
-    assertNotNull(subDirectories);
-    assertEquals(subDirectories.length, expectedCount);
-    for (int i = 0; i < expectedCount; i++) {
-      assertTrue(subDirectories[i].isDirectory());
-    }
-  }
-
-  @Test(timeout = 300000)
-  public void testVerifyReplicationWithSnapshotSupport() throws Exception {
-    // Populate the tables, at the same time it guarantees that the tables are
-    // identical since it does the check
-    testSmallBatch();
-
-    // Take source and target tables snapshot
-    Path rootDir = FSUtils.getRootDir(conf1);
-    FileSystem fs = rootDir.getFileSystem(conf1);
-    String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
-    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
-      new String(famName), sourceSnapshotName, rootDir, fs, true);
-
-    // Take target snapshot
-    Path peerRootDir = FSUtils.getRootDir(conf2);
-    FileSystem peerFs = peerRootDir.getFileSystem(conf2);
-    String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
-    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
-      new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
-
-    String peerFSAddress = peerFs.getUri().toString();
-    String temPath1 = utility1.getRandomDir().toString();
-    String temPath2 = "/tmp2";
-
-    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
-        "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
-        "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
-        "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
-
-    Job job = new VerifyReplication().createSubmittableJob(conf1, args);
-    if (job == null) {
-      fail("Job wasn't created, see the log");
-    }
-    if (!job.waitForCompletion(true)) {
-      fail("Job failed, see the log");
-    }
-    assertEquals(NB_ROWS_IN_BATCH,
-      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-    assertEquals(0,
-      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
-
-    checkRestoreTmpDir(conf1, temPath1, 1);
-    checkRestoreTmpDir(conf2, temPath2, 1);
-
-    Scan scan = new Scan();
-    ResultScanner rs = htable2.getScanner(scan);
-    Put put = null;
-    for (Result result : rs) {
-      put = new Put(result.getRow());
-      Cell firstVal = result.rawCells()[0];
-      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
-        Bytes.toBytes("diff data"));
-      htable2.put(put);
-    }
-    Delete delete = new Delete(put.getRow());
-    htable2.delete(delete);
-
-    sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
-    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
-      new String(famName), sourceSnapshotName, rootDir, fs, true);
-
-    peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
-    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
-      new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
-
-    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
-        "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
-        "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
-        "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
-
-    job = new VerifyReplication().createSubmittableJob(conf1, args);
-    if (job == null) {
-      fail("Job wasn't created, see the log");
-    }
-    if (!job.waitForCompletion(true)) {
-      fail("Job failed, see the log");
-    }
-    assertEquals(0,
-      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-    assertEquals(NB_ROWS_IN_BATCH,
-      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
-
-    checkRestoreTmpDir(conf1, temPath1, 2);
-    checkRestoreTmpDir(conf2, temPath2, 2);
-  }
-
-  @Test
-  public void testEmptyWALRecovery() throws Exception {
-    final int numRs = utility1.getHBaseCluster().getRegionServerThreads().size();
-
-    // for each RS, create an empty wal with same walGroupId
-    final List<Path> emptyWalPaths = new ArrayList<>();
-    long ts = System.currentTimeMillis();
-    for (int i = 0; i < numRs; i++) {
-      RegionInfo regionInfo =
-          utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
-      WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
-      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
-      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
-      Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." + ts);
-      utility1.getTestFileSystem().create(emptyWalPath).close();
-      emptyWalPaths.add(emptyWalPath);
-    }
-
-    // inject our empty wal into the replication queue, and then roll the original wal, which
-    // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
-    // determine if the file being replicated currently is still opened for write, so just inject a
-    // new wal to the replication queue does not mean the previous file is closed.
-    for (int i = 0; i < numRs; i++) {
-      HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i);
-      Replication replicationService = (Replication) hrs.getReplicationSourceService();
-      replicationService.preLogRoll(null, emptyWalPaths.get(i));
-      replicationService.postLogRoll(null, emptyWalPaths.get(i));
-      RegionInfo regionInfo =
-          utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
-      WAL wal = hrs.getWAL(regionInfo);
-      wal.rollWriter(true);
-    }
-
-    // ReplicationSource should advance past the empty wal, or else the test will fail
-    waitForLogAdvance(numRs);
-
-    // we're now writing to the new wal
-    // if everything works, the source should've stopped reading from the empty wal, and start
-    // replicating from the new wal
-    testSimplePutDelete();
-  }
-
-  /**
-   * Waits until there is only one log(the current writing one) in the replication queue
-   * @param numRs number of regionservers
-   */
-  private void waitForLogAdvance(int numRs) throws Exception {
-    Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
-      @Override
-      public boolean evaluate() throws Exception {
-        for (int i = 0; i < numRs; i++) {
-          HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i);
-          RegionInfo regionInfo =
-              utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
-          WAL wal = hrs.getWAL(regionInfo);
-          Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
-          Replication replicationService = (Replication) utility1.getHBaseCluster()
-              .getRegionServer(i).getReplicationSourceService();
-          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
-              .getSources()) {
-            ReplicationSource source = (ReplicationSource) rsi;
-            if (!currentFile.equals(source.getCurrentPath())) {
-              return false;
-            }
-          }
-        }
-        return true;
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/48bb1901/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java
new file mode 100644
index 0000000..c0b2839
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java
@@ -0,0 +1,493 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.TreeMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestVerifyReplication extends TestReplicationBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestVerifyReplication.class);
+
+  private static final String PEER_ID = "2";
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Before
+  public void setUp() throws Exception {
+    cleanUp();
+  }
+
+  private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Job job = new VerifyReplication().createSubmittableJob(new Configuration(conf1), args);
+    if (job == null) {
+      fail("Job wasn't created, see the log");
+    }
+    if (!job.waitForCompletion(true)) {
+      fail("Job failed, see the log");
+    }
+    assertEquals(expectedGoodRows,
+      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+    assertEquals(expectedBadRows,
+      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+  }
+
+  /**
+   * Do a small loading into a table, make sure the data is really the same, then run the
+   * VerifyReplication job to check the results. Do a second comparison where all the cells are
+   * different.
+   */
+  @Test
+  public void testVerifyRepJob() throws Exception {
+    // Populate the tables, at the same time it guarantees that the tables are
+    // identical since it does the check
+    runSmallBatchTest();
+
+    String[] args = new String[] { PEER_ID, tableName.getNameAsString() };
+    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
+
+    Scan scan = new Scan();
+    ResultScanner rs = htable2.getScanner(scan);
+    Put put = null;
+    for (Result result : rs) {
+      put = new Put(result.getRow());
+      Cell firstVal = result.rawCells()[0];
+      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
+        Bytes.toBytes("diff data"));
+      htable2.put(put);
+    }
+    Delete delete = new Delete(put.getRow());
+    htable2.delete(delete);
+    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
+  }
+
+  /**
+   * Load a row into a table, make sure the data is really the same, delete the row, make sure the
+   * delete marker is replicated, run verify replication with and without raw to check the results.
+   */
+  @Test
+  public void testVerifyRepJobWithRawOptions() throws Exception {
+    LOG.info(name.getMethodName());
+
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    byte[] familyname = Bytes.toBytes("fam_raw");
+    byte[] row = Bytes.toBytes("row_raw");
+
+    Table lHtable1 = null;
+    Table lHtable2 = null;
+
+    try {
+      ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname)
+          .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
+      TableDescriptor table =
+          TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(fam).build();
+      scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+      for (ColumnFamilyDescriptor f : table.getColumnFamilies()) {
+        scopes.put(f.getName(), f.getScope());
+      }
+
+      Connection connection1 = ConnectionFactory.createConnection(conf1);
+      Connection connection2 = ConnectionFactory.createConnection(conf2);
+      try (Admin admin1 = connection1.getAdmin()) {
+        admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+      }
+      try (Admin admin2 = connection2.getAdmin()) {
+        admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+      }
+      utility1.waitUntilAllRegionsAssigned(tableName);
+      utility2.waitUntilAllRegionsAssigned(tableName);
+
+      lHtable1 = utility1.getConnection().getTable(tableName);
+      lHtable2 = utility2.getConnection().getTable(tableName);
+
+      Put put = new Put(row);
+      put.addColumn(familyname, row, row);
+      lHtable1.put(put);
+
+      Get get = new Get(row);
+      for (int i = 0; i < NB_RETRIES; i++) {
+        if (i == NB_RETRIES - 1) {
+          fail("Waited too much time for put replication");
+        }
+        Result res = lHtable2.get(get);
+        if (res.isEmpty()) {
+          LOG.info("Row not available");
+          Thread.sleep(SLEEP_TIME);
+        } else {
+          assertArrayEquals(res.value(), row);
+          break;
+        }
+      }
+
+      Delete del = new Delete(row);
+      lHtable1.delete(del);
+
+      get = new Get(row);
+      for (int i = 0; i < NB_RETRIES; i++) {
+        if (i == NB_RETRIES - 1) {
+          fail("Waited too much time for del replication");
+        }
+        Result res = lHtable2.get(get);
+        if (res.size() >= 1) {
+          LOG.info("Row not deleted");
+          Thread.sleep(SLEEP_TIME);
+        } else {
+          break;
+        }
+      }
+
+      // Checking verifyReplication for the default behavior.
+      String[] argsWithoutRaw = new String[] { PEER_ID, tableName.getNameAsString() };
+      runVerifyReplication(argsWithoutRaw, 0, 0);
+
+      // Checking verifyReplication with raw
+      String[] argsWithRawAsTrue = new String[] { "--raw", PEER_ID, tableName.getNameAsString() };
+      runVerifyReplication(argsWithRawAsTrue, 1, 0);
+    } finally {
+      if (lHtable1 != null) {
+        lHtable1.close();
+      }
+      if (lHtable2 != null) {
+        lHtable2.close();
+      }
+    }
+  }
+
+  // VerifyReplication should honor versions option
+  @Test
+  public void testHBase14905() throws Exception {
+    // normal Batch tests
+    byte[] qualifierName = Bytes.toBytes("f1");
+    Put put = new Put(Bytes.toBytes("r1"));
+    put.addColumn(famName, qualifierName, Bytes.toBytes("v1002"));
+    htable1.put(put);
+    put.addColumn(famName, qualifierName, Bytes.toBytes("v1001"));
+    htable1.put(put);
+    put.addColumn(famName, qualifierName, Bytes.toBytes("v1112"));
+    htable1.put(put);
+
+    Scan scan = new Scan();
+    scan.readVersions(100);
+    ResultScanner scanner1 = htable1.getScanner(scan);
+    Result[] res1 = scanner1.next(1);
+    scanner1.close();
+
+    assertEquals(1, res1.length);
+    assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
+
+    for (int i = 0; i < NB_RETRIES; i++) {
+      scan = new Scan();
+      scan.readVersions(100);
+      scanner1 = htable2.getScanner(scan);
+      res1 = scanner1.next(1);
+      scanner1.close();
+      if (res1.length != 1) {
+        LOG.info("Only got " + res1.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
+        if (cellNumber != 3) {
+          LOG.info("Only got " + cellNumber + " cells");
+          Thread.sleep(SLEEP_TIME);
+        } else {
+          break;
+        }
+      }
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for normal batch replication");
+      }
+    }
+
+    put.addColumn(famName, qualifierName, Bytes.toBytes("v1111"));
+    htable2.put(put);
+    put.addColumn(famName, qualifierName, Bytes.toBytes("v1112"));
+    htable2.put(put);
+
+    scan = new Scan();
+    scan.readVersions(100);
+    scanner1 = htable2.getScanner(scan);
+    res1 = scanner1.next(NB_ROWS_IN_BATCH);
+    scanner1.close();
+
+    assertEquals(1, res1.length);
+    assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size());
+
+    String[] args = new String[] { "--versions=100", PEER_ID, tableName.getNameAsString() };
+    runVerifyReplication(args, 0, 1);
+  }
+
+  // VerifyReplication should honor versions option
+  @Test
+  public void testVersionMismatchHBase14905() throws Exception {
+    // normal Batch tests
+    byte[] qualifierName = Bytes.toBytes("f1");
+    Put put = new Put(Bytes.toBytes("r1"));
+    long ts = System.currentTimeMillis();
+    put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1"));
+    htable1.put(put);
+    put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v2"));
+    htable1.put(put);
+    put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v3"));
+    htable1.put(put);
+
+    Scan scan = new Scan();
+    scan.readVersions(100);
+    ResultScanner scanner1 = htable1.getScanner(scan);
+    Result[] res1 = scanner1.next(1);
+    scanner1.close();
+
+    assertEquals(1, res1.length);
+    assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
+
+    for (int i = 0; i < NB_RETRIES; i++) {
+      scan = new Scan();
+      scan.readVersions(100);
+      scanner1 = htable2.getScanner(scan);
+      res1 = scanner1.next(1);
+      scanner1.close();
+      if (res1.length != 1) {
+        LOG.info("Only got " + res1.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
+        if (cellNumber != 3) {
+          LOG.info("Only got " + cellNumber + " cells");
+          Thread.sleep(SLEEP_TIME);
+        } else {
+          break;
+        }
+      }
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for normal batch replication");
+      }
+    }
+
+    try {
+      // Disabling replication and modifying the particular version of the cell to validate the
+      // feature.
+      hbaseAdmin.disableReplicationPeer(PEER_ID);
+      Put put2 = new Put(Bytes.toBytes("r1"));
+      put2.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v99"));
+      htable2.put(put2);
+
+      scan = new Scan();
+      scan.readVersions(100);
+      scanner1 = htable2.getScanner(scan);
+      res1 = scanner1.next(NB_ROWS_IN_BATCH);
+      scanner1.close();
+      assertEquals(1, res1.length);
+      assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
+
+      String[] args = new String[] { "--versions=100", PEER_ID, tableName.getNameAsString() };
+      runVerifyReplication(args, 0, 1);
+    } finally {
+      hbaseAdmin.enableReplicationPeer(PEER_ID);
+    }
+  }
+
+  @Test
+  public void testVerifyReplicationPrefixFiltering() throws Exception {
+    final byte[] prefixRow = Bytes.toBytes("prefixrow");
+    final byte[] prefixRow2 = Bytes.toBytes("secondrow");
+    loadData("prefixrow", prefixRow);
+    loadData("secondrow", prefixRow2);
+    loadData("aaa", row);
+    loadData("zzz", row);
+    waitForReplication(NB_ROWS_IN_BATCH * 4, NB_RETRIES * 4);
+    String[] args =
+        new String[] { "--row-prefixes=prefixrow,secondrow", PEER_ID, tableName.getNameAsString() };
+    runVerifyReplication(args, NB_ROWS_IN_BATCH * 2, 0);
+  }
+
+  @Test
+  public void testVerifyReplicationSnapshotArguments() {
+    String[] args =
+        new String[] { "--sourceSnapshotName=snapshot1", "2", tableName.getNameAsString() };
+    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--sourceSnapshotTmpDir=tmp", "2", tableName.getNameAsString() };
+    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=tmp", "2",
+        tableName.getNameAsString() };
+    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--peerSnapshotName=snapshot1", "2", tableName.getNameAsString() };
+    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--peerSnapshotTmpDir=/tmp/", "2", tableName.getNameAsString() };
+    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--peerSnapshotName=snapshot1", "--peerSnapshotTmpDir=/tmp/",
+        "--peerFSAddress=tempfs", "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2",
+        tableName.getNameAsString() };
+    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=/tmp/",
+        "--peerSnapshotName=snapshot2", "--peerSnapshotTmpDir=/tmp/", "--peerFSAddress=tempfs",
+        "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2", tableName.getNameAsString() };
+
+    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+  }
+
+  private void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount)
+      throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir));
+    assertNotNull(subDirectories);
+    assertEquals(subDirectories.length, expectedCount);
+    for (int i = 0; i < expectedCount; i++) {
+      assertTrue(subDirectories[i].isDirectory());
+    }
+  }
+
+  @Test
+  public void testVerifyReplicationWithSnapshotSupport() throws Exception {
+    // Populate the tables, at the same time it guarantees that the tables are
+    // identical since it does the check
+    runSmallBatchTest();
+
+    // Take source and target tables snapshot
+    Path rootDir = FSUtils.getRootDir(conf1);
+    FileSystem fs = rootDir.getFileSystem(conf1);
+    String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
+      new String(famName), sourceSnapshotName, rootDir, fs, true);
+
+    // Take target snapshot
+    Path peerRootDir = FSUtils.getRootDir(conf2);
+    FileSystem peerFs = peerRootDir.getFileSystem(conf2);
+    String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
+      new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
+
+    String peerFSAddress = peerFs.getUri().toString();
+    String temPath1 = utility1.getRandomDir().toString();
+    String temPath2 = "/tmp2";
+
+    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
+        "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
+        "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
+        "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
+
+    Job job = new VerifyReplication().createSubmittableJob(conf1, args);
+    if (job == null) {
+      fail("Job wasn't created, see the log");
+    }
+    if (!job.waitForCompletion(true)) {
+      fail("Job failed, see the log");
+    }
+    assertEquals(NB_ROWS_IN_BATCH,
+      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+    assertEquals(0,
+      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+
+    checkRestoreTmpDir(conf1, temPath1, 1);
+    checkRestoreTmpDir(conf2, temPath2, 1);
+
+    Scan scan = new Scan();
+    ResultScanner rs = htable2.getScanner(scan);
+    Put put = null;
+    for (Result result : rs) {
+      put = new Put(result.getRow());
+      Cell firstVal = result.rawCells()[0];
+      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
+        Bytes.toBytes("diff data"));
+      htable2.put(put);
+    }
+    Delete delete = new Delete(put.getRow());
+    htable2.delete(delete);
+
+    sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
+      new String(famName), sourceSnapshotName, rootDir, fs, true);
+
+    peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
+      new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
+
+    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
+        "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
+        "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
+        "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
+
+    job = new VerifyReplication().createSubmittableJob(conf1, args);
+    if (job == null) {
+      fail("Job wasn't created, see the log");
+    }
+    if (!job.waitForCompletion(true)) {
+      fail("Job failed, see the log");
+    }
+    assertEquals(0,
+      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+    assertEquals(NB_ROWS_IN_BATCH,
+      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+
+    checkRestoreTmpDir(conf1, temPath1, 2);
+    checkRestoreTmpDir(conf2, temPath2, 2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/48bb1901/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 0592e2d..afb975d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -18,24 +18,38 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -95,9 +109,72 @@ public class TestReplicationBase {
     return Arrays.asList(false, true);
   }
 
-  /**
-   * @throws java.lang.Exception
-   */
+  protected final void cleanUp() throws IOException, InterruptedException {
+    // Starting and stopping replication can make us miss new logs,
+    // rolling like this makes sure the most recent one gets added to the queue
+    for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
+        .getRegionServerThreads()) {
+      utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
+    }
+    int rowCount = utility1.countRows(tableName);
+    utility1.deleteTableData(tableName);
+    // truncating the table will send one Delete per row to the slave cluster
+    // in an async fashion, which is why we cannot just call deleteTableData on
+    // utility2 since late writes could make it to the slave in some way.
+    // Instead, we truncate the first table and wait for all the Deletes to
+    // make it to the slave.
+    Scan scan = new Scan();
+    int lastCount = 0;
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for truncate");
+      }
+      ResultScanner scanner = htable2.getScanner(scan);
+      Result[] res = scanner.next(rowCount);
+      scanner.close();
+      if (res.length != 0) {
+        if (res.length < lastCount) {
+          i--; // Don't increment timeout if we make progress
+        }
+        lastCount = res.length;
+        LOG.info("Still got " + res.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  protected static void waitForReplication(int expectedRows, int retries)
+      throws IOException, InterruptedException {
+    Scan scan;
+    for (int i = 0; i < retries; i++) {
+      scan = new Scan();
+      if (i== retries -1) {
+        fail("Waited too much time for normal batch replication");
+      }
+      ResultScanner scanner = htable2.getScanner(scan);
+      Result[] res = scanner.next(expectedRows);
+      scanner.close();
+      if (res.length != expectedRows) {
+        LOG.info("Only got " + res.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  protected static void loadData(String prefix, byte[] row) throws IOException {
+    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
+      put.addColumn(famName, row, row);
+      puts.add(put);
+    }
+    htable1.put(puts);
+  }
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
@@ -149,20 +226,17 @@ public class TestReplicationBase {
     // as a component in deciding maximum number of parallel batches to send to the peer cluster.
     utility2.startMiniCluster(4);
 
-    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
-    rpc.setClusterKey(utility2.getClusterKey());
+    ReplicationPeerConfig rpc =
+        ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build();
     hbaseAdmin = ConnectionFactory.createConnection(conf1).getAdmin();
     hbaseAdmin.addReplicationPeer("2", rpc);
 
-    HTableDescriptor table = new HTableDescriptor(tableName);
-    HColumnDescriptor fam = new HColumnDescriptor(famName);
-    fam.setMaxVersions(100);
-    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
-    table.addFamily(fam);
-    fam = new HColumnDescriptor(noRepfamName);
-    table.addFamily(fam);
+    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
+        .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+        .addColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
     scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for(HColumnDescriptor f : table.getColumnFamilies()) {
+    for (ColumnFamilyDescriptor f : table.getColumnFamilies()) {
       scopes.put(f.getName(), f.getScope());
     }
     Connection connection1 = ConnectionFactory.createConnection(conf1);
@@ -179,9 +253,60 @@ public class TestReplicationBase {
     htable2 = connection2.getTable(tableName);
   }
 
-  /**
-   * @throws java.lang.Exception
-   */
+  protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
+    Put put = new Put(row);
+    put.addColumn(famName, row, row);
+
+    htable1 = utility1.getConnection().getTable(tableName);
+    htable1.put(put);
+
+    Get get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.isEmpty()) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(res.value(), row);
+        break;
+      }
+    }
+
+    Delete del = new Delete(row);
+    htable1.delete(del);
+
+    get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for del replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        LOG.info("Row not deleted");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  protected static void runSmallBatchTest() throws IOException, InterruptedException {
+    // normal Batch tests
+    loadData("", row);
+
+    Scan scan = new Scan();
+
+    ResultScanner scanner1 = htable1.getScanner(scan);
+    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
+    scanner1.close();
+    assertEquals(NB_ROWS_IN_BATCH, res1.length);
+
+    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
+  }
+
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     htable2.close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/48bb1901/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
new file mode 100644
index 0000000..4a43aa7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
+
+  @Before
+  public void setUp() throws IOException, InterruptedException {
+    cleanUp();
+  }
+
+  /**
+   * Waits until there is only one log(the current writing one) in the replication queue
+   * @param numRs number of regionservers
+   */
+  private void waitForLogAdvance(int numRs) throws Exception {
+    Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        for (int i = 0; i < numRs; i++) {
+          HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i);
+          RegionInfo regionInfo =
+              utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+          WAL wal = hrs.getWAL(regionInfo);
+          Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
+          Replication replicationService = (Replication) utility1.getHBaseCluster()
+              .getRegionServer(i).getReplicationSourceService();
+          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
+              .getSources()) {
+            ReplicationSource source = (ReplicationSource) rsi;
+            if (!currentFile.equals(source.getCurrentPath())) {
+              return false;
+            }
+          }
+        }
+        return true;
+      }
+    });
+  }
+
+  @Test
+  public void testEmptyWALRecovery() throws Exception {
+    final int numRs = utility1.getHBaseCluster().getRegionServerThreads().size();
+
+    // for each RS, create an empty wal with same walGroupId
+    final List<Path> emptyWalPaths = new ArrayList<>();
+    long ts = System.currentTimeMillis();
+    for (int i = 0; i < numRs; i++) {
+      RegionInfo regionInfo =
+          utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+      WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
+      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
+      Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." + ts);
+      utility1.getTestFileSystem().create(emptyWalPath).close();
+      emptyWalPaths.add(emptyWalPath);
+    }
+
+    // inject our empty wal into the replication queue, and then roll the original wal, which
+    // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
+    // determine if the file being replicated currently is still opened for write, so just inject a
+    // new wal to the replication queue does not mean the previous file is closed.
+    for (int i = 0; i < numRs; i++) {
+      HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i);
+      Replication replicationService = (Replication) hrs.getReplicationSourceService();
+      replicationService.preLogRoll(null, emptyWalPaths.get(i));
+      replicationService.postLogRoll(null, emptyWalPaths.get(i));
+      RegionInfo regionInfo =
+          utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+      WAL wal = hrs.getWAL(regionInfo);
+      wal.rollWriter(true);
+    }
+
+    // ReplicationSource should advance past the empty wal, or else the test will fail
+    waitForLogAdvance(numRs);
+
+    // we're now writing to the new wal
+    // if everything works, the source should've stopped reading from the empty wal, and start
+    // replicating from the new wal
+    runSimplePutDeleteTest();
+  }
+}


Mime
View raw message