hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject hbase git commit: HBASE-15847 VerifyReplication prefix filtering (Geoffrey Jacoby)
Date Thu, 19 May 2016 13:38:34 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 460b41c80 -> a050e1d9f


HBASE-15847 VerifyReplication prefix filtering (Geoffrey Jacoby)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a050e1d9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a050e1d9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a050e1d9

Branch: refs/heads/master
Commit: a050e1d9f84eed0b73b5f8ca30d9d93b5bf50057
Parents: 460b41c
Author: tedyu <yuzhihong@gmail.com>
Authored: Thu May 19 06:38:27 2016 -0700
Committer: tedyu <yuzhihong@gmail.com>
Committed: Thu May 19 06:38:27 2016 -0700

----------------------------------------------------------------------
 .../replication/VerifyReplication.java          |  60 +++++++++-
 .../replication/TestReplicationSmallTests.java  | 115 ++++++++++---------
 2 files changed, 117 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a050e1d9/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index a452036..655c71a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.mapreduce.replication;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +36,9 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
@@ -77,6 +81,7 @@ public class VerifyReplication extends Configured implements Tool {
   static String tableName = null;
   static String families = null;
   static String peerId = null;
+  static String rowPrefixes = null;
 
   private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
 
@@ -123,6 +128,8 @@ public class VerifyReplication extends Configured implements Tool {
             scan.addFamily(Bytes.toBytes(fam));
           }
         }
+        String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
+        setRowPrefixFilter(scan, rowPrefixes);
         scan.setTimeRange(startTime, endTime);
         int versions = conf.getInt(NAME+".versions", -1);
         LOG.info("Setting number of version inside map as: " + versions);
@@ -271,6 +278,9 @@ public class VerifyReplication extends Configured implements Tool {
     if (families != null) {
       conf.set(NAME+".families", families);
     }
+    if (rowPrefixes != null){
+      conf.set(NAME+".rowPrefixes", rowPrefixes);
+    }
 
     Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf);
     ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
@@ -299,6 +309,9 @@ public class VerifyReplication extends Configured implements Tool {
         scan.addFamily(Bytes.toBytes(fam));
       }
     }
+
+    setRowPrefixFilter(scan, rowPrefixes);
+
     TableMapReduceUtil.initTableMapperJob(tableName, scan,
         Verifier.class, null, null, job);
 
@@ -311,11 +324,38 @@ public class VerifyReplication extends Configured implements Tool {
     return job;
   }
 
+  /**
+   * Restricts {@code scan} to rows whose key starts with at least one of the given prefixes.
+   * One {@link PrefixFilter} is added per prefix, OR-ed together in a
+   * {@link FilterList.Operator#MUST_PASS_ONE} list, and the scan's start/stop rows are narrowed
+   * to a range that still contains every row matching any of the prefixes.
+   *
+   * @param scan the scan to configure
+   * @param rowPrefixes comma-separated row key prefixes; {@code null}, empty, or a list made up
+   *     only of empty entries leaves the scan untouched. Empty entries (e.g. produced by ",," or
+   *     a leading comma) are ignored.
+   */
+  private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
+    if (rowPrefixes == null || rowPrefixes.isEmpty()) {
+      return;
+    }
+    String[] tokens = rowPrefixes.split(",");
+    byte[][] prefixes = new byte[tokens.length][];
+    int count = 0;
+    for (String token : tokens) {
+      // An empty entry would become a PrefixFilter matching every row plus an empty start row,
+      // silently widening the scan to the whole table, so it is skipped.
+      if (!token.isEmpty()) {
+        prefixes[count++] = Bytes.toBytes(token);
+      }
+    }
+    if (count == 0) {
+      return;
+    }
+    prefixes = Arrays.copyOf(prefixes, count);
+    // Sort by unsigned byte order (the order row keys are scanned in); String order can differ
+    // from UTF-8 byte order for some non-ASCII prefixes.
+    Arrays.sort(prefixes, Bytes.BYTES_COMPARATOR);
+
+    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
+    // Track the prefix whose key range reaches furthest. A prefix nested inside an earlier,
+    // shorter one (e.g. "ab" after "a") ends before that shorter prefix does, so the last prefix
+    // in sorted order is not necessarily the one that must determine the stop row.
+    byte[] furthestPrefix = prefixes[0];
+    for (byte[] prefix : prefixes) {
+      Filter filter = new PrefixFilter(prefix);
+      filterList.addFilter(filter);
+      if (!Bytes.startsWith(prefix, furthestPrefix)) {
+        furthestPrefix = prefix;
+      }
+    }
+    scan.setFilter(filterList);
+    setStartAndStopRows(scan, prefixes[0], furthestPrefix);
+  }
+
+  /**
+   * Sets the scan's (inclusive) start row to {@code startPrefixRow} and its (exclusive) stop row
+   * to the smallest row key that sorts after every key beginning with {@code lastPrefixRow}.
+   *
+   * @param scan the scan to configure
+   * @param startPrefixRow used verbatim as the start row
+   * @param lastPrefixRow prefix whose whole key range must lie before the stop row
+   */
+  private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow,
+      byte[] lastPrefixRow) {
+    scan.setStartRow(startPrefixRow);
+    // Trailing 0xFF bytes cannot be incremented without wrapping to 0x00 (which would yield a
+    // stop row smaller than the prefix), so drop them and increment the byte before them.
+    int end = lastPrefixRow.length;
+    while (end > 0 && lastPrefixRow[end - 1] == (byte) 0xFF) {
+      end--;
+    }
+    byte[] stopRow;
+    if (end == 0) {
+      // Empty prefix, or one made only of 0xFF bytes: no finite key bounds it from above, so use
+      // an empty stop row (HConstants.EMPTY_END_ROW), i.e. scan to the end of the table.
+      stopRow = new byte[0];
+    } else {
+      stopRow = Arrays.copyOf(lastPrefixRow, end);
+      stopRow[end - 1]++;
+    }
+    scan.setStopRow(stopRow);
+  }
+
   private static boolean doCommandLine(final String[] args) {
     if (args.length < 2) {
       printUsage(null);
       return false;
     }
+    //in case we've been run before, restore all parameters to their initial states
+    //Otherwise, if our previous run included a parameter not in args this time,
+    //we might hold on to the old value.
+    restoreDefaults();
     try {
       for (int i = 0; i < args.length; i++) {
         String cmd = args[i];
@@ -354,6 +394,12 @@ public class VerifyReplication extends Configured implements Tool {
           continue;
         }
 
+        final String rowPrefixesKey = "--row-prefixes=";
+        if (cmd.startsWith(rowPrefixesKey)){
+          rowPrefixes = cmd.substring(rowPrefixesKey.length());
+          continue;
+        }
+
         if (i == args.length-2) {
           peerId = cmd;
         }
@@ -370,6 +416,17 @@ public class VerifyReplication extends Configured implements Tool {
     return true;
   }
 
+  /**
+   * Resets the static option fields filled in by command-line parsing to the fixed values below,
+   * so that an option given on a previous run is not silently reused when it is absent from the
+   * current arguments.
+   */
+  private static void restoreDefaults() {
+    // time range
+    startTime = 0;
+    endTime = Long.MAX_VALUE;
+    // batch size and version count
+    batch = Integer.MAX_VALUE;
+    versions = -1;
+    // target peer/table and row/column restrictions
+    peerId = null;
+    tableName = null;
+    families = null;
+    rowPrefixes = null;
+  }
+
   /*
    * @param errorMsg Error message.  Can be null.
    */
@@ -378,7 +435,7 @@ public class VerifyReplication extends Configured implements Tool {
       System.err.println("ERROR: " + errorMsg);
     }
     System.err.println("Usage: verifyrep [--starttime=X]" +
-        " [--stoptime=Y] [--families=A] <peerid> <tablename>");
+        " [--stoptime=Y] [--families=A] [--row-prefixes=B] <peerid> <tablename>");
     System.err.println();
     System.err.println("Options:");
     System.err.println(" starttime    beginning of the time range");
@@ -386,6 +443,7 @@ public class VerifyReplication extends Configured implements Tool {
     System.err.println(" endtime      end of the time range");
     System.err.println(" versions     number of cell versions to verify");
     System.err.println(" families     comma-separated list of families to copy");
+    System.err.println(" row-prefixes comma-separated list of row key prefixes to filter
on ");
     System.err.println();
     System.err.println("Args:");
     System.err.println(" peerid       Id of the peer used for verification, must match the
one given for replication");

http://git-wip-us.apache.org/repos/asf/hbase/blob/a050e1d9/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 2a20a4f..8efa67e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -31,6 +32,7 @@ import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.ClusterStatus;
@@ -72,6 +74,7 @@ import org.junit.experimental.categories.Category;
 public class TestReplicationSmallTests extends TestReplicationBase {
 
   private static final Log LOG = LogFactory.getLog(TestReplicationSmallTests.class);
+  private static final String PEER_ID = "2";
 
   /**
    * @throws java.lang.Exception
@@ -84,6 +87,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
         utility1.getHBaseCluster().getRegionServerThreads()) {
       utility1.getHBaseAdmin().rollWALWriter(r.getRegionServer().getServerName());
     }
+    int rowCount = utility1.countRows(tableName);
     utility1.deleteTableData(tableName);
     // truncating the table will send one Delete per row to the slave cluster
     // in an async fashion, which is why we cannot just call deleteTableData on
@@ -97,7 +101,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
         fail("Waited too much time for truncate");
       }
       ResultScanner scanner = htable2.getScanner(scan);
-      Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
+      Result[] res = scanner.next(rowCount);
       scanner.close();
       if (res.length != 0) {
         if (res.length < lastCount) {
@@ -254,13 +258,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
   public void testSmallBatch() throws Exception {
     LOG.info("testSmallBatch");
     // normal Batch tests
-    List<Put> puts = new ArrayList<>();
-    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      Put put = new Put(Bytes.toBytes(i));
-      put.addColumn(famName, row, row);
-      puts.add(put);
-    }
-    htable1.put(puts);
+    loadData("", row);
 
     Scan scan = new Scan();
 
@@ -269,15 +267,20 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     scanner1.close();
     assertEquals(NB_ROWS_IN_BATCH, res1.length);
 
-    for (int i = 0; i < NB_RETRIES; i++) {
+    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
+  }
+
+  private void waitForReplication(int expectedRows, int retries) throws IOException, InterruptedException
{
+    Scan scan;
+    for (int i = 0; i < retries; i++) {
       scan = new Scan();
-      if (i==NB_RETRIES-1) {
+      if (i== retries -1) {
         fail("Waited too much time for normal batch replication");
       }
       ResultScanner scanner = htable2.getScanner(scan);
-      Result[] res = scanner.next(NB_ROWS_IN_BATCH);
+      Result[] res = scanner.next(expectedRows);
       scanner.close();
-      if (res.length != NB_ROWS_IN_BATCH) {
+      if (res.length != expectedRows) {
         LOG.info("Only got " + res.length + " rows");
         Thread.sleep(SLEEP_TIME);
       } else {
@@ -286,6 +289,16 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     }
   }
 
+  /**
+   * Writes NB_ROWS_IN_BATCH rows to htable1 in a single batch put. Row i is keyed by the string
+   * {@code prefix + i} and gets one cell in family {@code famName} whose qualifier and value are
+   * both {@code row}.
+   */
+  private void loadData(String prefix, byte[] row) throws IOException {
+    List<Put> batchOfPuts = new ArrayList<>();
+    int i = 0;
+    while (i < NB_ROWS_IN_BATCH) {
+      byte[] rowKey = Bytes.toBytes(prefix + i);
+      batchOfPuts.add(new Put(rowKey).addColumn(famName, row, row));
+      i++;
+    }
+    htable1.put(batchOfPuts);
+  }
+
   /**
    * Test disable/enable replication, trying to insert, make sure nothing's
    * replicated, enable it, the insert should be replicated
@@ -296,7 +309,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
   public void testDisableEnable() throws Exception {
 
     // Test disabling replication
-    admin.disablePeer("2");
+    admin.disablePeer(PEER_ID);
 
     byte[] rowkey = Bytes.toBytes("disable enable");
     Put put = new Put(rowkey);
@@ -315,7 +328,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     }
 
     // Test enable replication
-    admin.enablePeer("2");
+    admin.enablePeer(PEER_ID);
 
     for (int i = 0; i < NB_RETRIES; i++) {
       Result res = htable2.get(get);
@@ -339,7 +352,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
   @Test(timeout=300000)
   public void testAddAndRemoveClusters() throws Exception {
     LOG.info("testAddAndRemoveClusters");
-    admin.removePeer("2");
+    admin.removePeer(PEER_ID);
     Thread.sleep(SLEEP_TIME);
     byte[] rowKey = Bytes.toBytes("Won't be replicated");
     Put put = new Put(rowKey);
@@ -361,7 +374,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     }
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(utility2.getClusterKey());
-    admin.addPeer("2", rpc, null);
+    admin.addPeer(PEER_ID, rpc, null);
     Thread.sleep(SLEEP_TIME);
     rowKey = Bytes.toBytes("do rep");
     put = new Put(rowKey);
@@ -459,18 +472,8 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     // identical since it does the check
     testSmallBatch();
 
-    String[] args = new String[] {"2", tableName.getNameAsString()};
-    Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
-    if (job == null) {
-      fail("Job wasn't created, see the log");
-    }
-    if (!job.waitForCompletion(true)) {
-      fail("Job failed, see the log");
-    }
-    assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
-        findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-    assertEquals(0, job.getCounters().
-        findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+    String[] args = new String[] {PEER_ID, tableName.getNameAsString()};
+    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
 
     Scan scan = new Scan();
     ResultScanner rs = htable2.getScanner(scan);
@@ -484,16 +487,21 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     }
     Delete delete = new Delete(put.getRow());
     htable2.delete(delete);
-    job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
+    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
+  }
+
+  /**
+   * Runs a VerifyReplication job built from {@code args} and asserts on its GOODROWS and BADROWS
+   * counters. Fails the test if the job cannot be created or does not complete successfully.
+   *
+   * @param args command-line arguments passed to VerifyReplication.createSubmittableJob
+   * @param expectedGoodRows expected value of the GOODROWS counter
+   * @param expectedBadRows expected value of the BADROWS counter
+   */
+  private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    // NOTE(review): a fresh copy of CONF_WITH_LOCALFS per run, presumably so that options
+    // createSubmittableJob stores in the configuration do not carry over to later runs.
+    Job job = VerifyReplication.createSubmittableJob(new Configuration(CONF_WITH_LOCALFS),
args);
     if (job == null) {
       fail("Job wasn't created, see the log");
     }
     if (!job.waitForCompletion(true)) {
       fail("Job failed, see the log");
     }
-    assertEquals(0, job.getCounters().
+    assertEquals(expectedGoodRows, job.getCounters().
         findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-    assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
+    assertEquals(expectedBadRows, job.getCounters().
         findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
   }
 
@@ -556,18 +564,8 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     assertEquals(1, res1.length);
     assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size());
 
-    String[] args = new String[] {"--versions=100", "2", tableName.getNameAsString()};
-    Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
-    if (job == null) {
-      fail("Job wasn't created, see the log");
-    }
-    if (!job.waitForCompletion(true)) {
-      fail("Job failed, see the log");
-    }
-    assertEquals(0, job.getCounters().
-      findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-    assertEquals(1, job.getCounters().
-      findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+    String[] args = new String[] {"--versions=100", PEER_ID, tableName.getNameAsString()};
+    runVerifyReplication(args, 0, 1);
   }
 
   @Test(timeout=300000)
@@ -618,7 +616,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
    
     try {
       // Disabling replication and modifying the particular version of the cell to validate
the feature.  
-      admin.disablePeer("2");
+      admin.disablePeer(PEER_ID);
       Put put2 = new Put(Bytes.toBytes("r1"));
       put2.addColumn(famName, qualifierName, ts +2, Bytes.toBytes("v99"));
       htable2.put(put2);
@@ -631,21 +629,11 @@ public class TestReplicationSmallTests extends TestReplicationBase {
       assertEquals(1, res1.length);
       assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
     
-      String[] args = new String[] {"--versions=100", "2", tableName.getNameAsString()};
-      Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
-      if (job == null) {
-        fail("Job wasn't created, see the log");
-      }
-      if (!job.waitForCompletion(true)) {
-        fail("Job failed, see the log");
-      }    
-      assertEquals(0, job.getCounters().
-        findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-      assertEquals(1, job.getCounters().
-        findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+      String[] args = new String[] {"--versions=100", PEER_ID, tableName.getNameAsString()};
+      runVerifyReplication(args, 0, 1);
       }
     finally {
-      admin.enablePeer("2");
+      admin.enablePeer(PEER_ID);
     }
   }
 
@@ -803,5 +791,18 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     }
   }
 
+  /**
+   * Loads four batches of rows under different key prefixes and waits until all of them are
+   * visible on the peer. It then checks that VerifyReplication, restricted to two of the prefixes,
+   * reports exactly the rows of those two batches as good and none as bad.
+   */
+  @Test(timeout=300000)
+  public void testVerifyReplicationPrefixFiltering() throws Exception {
+    final String firstPrefix = "prefixrow";
+    final String secondPrefix = "secondrow";
+    // Rows that the prefix filter should select.
+    loadData(firstPrefix, Bytes.toBytes(firstPrefix));
+    loadData(secondPrefix, Bytes.toBytes(secondPrefix));
+    // Rows sorting before and after the selected prefixes; they should not be counted.
+    loadData("aaa", row);
+    loadData("zzz", row);
+
+    final int batches = 4;
+    waitForReplication(NB_ROWS_IN_BATCH * batches, NB_RETRIES * batches);
+
+    String[] verifyArgs = new String[] {"--row-prefixes=" + firstPrefix + "," + secondPrefix,
+        PEER_ID, tableName.getNameAsString()};
+    runVerifyReplication(verifyArgs, NB_ROWS_IN_BATCH * 2, 0);
+  }
 
 }


Mime
View raw message