hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject hbase git commit: HBASE-15847 VerifyReplication prefix filtering
Date Thu, 26 May 2016 22:57:00 GMT
Repository: hbase
Updated Branches:
  refs/heads/0.98 ffa899f2d -> 734c50bc6


HBASE-15847 VerifyReplication prefix filtering

Signed-off-by: Andrew Purtell <apurtell@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/734c50bc
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/734c50bc
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/734c50bc

Branch: refs/heads/0.98
Commit: 734c50bc616cc3595f45dd212a5ff0200a5c9ea5
Parents: ffa899f
Author: Geoffrey <gjacoby@salesforce.com>
Authored: Thu May 19 16:03:47 2016 -0700
Committer: Andrew Purtell <apurtell@apache.org>
Committed: Thu May 26 15:51:35 2016 -0700

----------------------------------------------------------------------
 .../replication/VerifyReplication.java          |  60 +++++++++-
 .../replication/TestReplicationSmallTests.java  | 117 ++++++++++---------
 2 files changed, 119 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/734c50bc/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 0176457..44bfdfe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.mapreduce.replication;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +36,9 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
@@ -77,6 +81,7 @@ public class VerifyReplication extends Configured implements Tool {
   static String tableName = null;
   static String families = null;
   static String peerId = null;
+  static String rowPrefixes = null;
 
   /**
    * Map-only comparator for 2 tables
@@ -117,6 +122,8 @@ public class VerifyReplication extends Configured implements Tool {
             scan.addFamily(Bytes.toBytes(fam));
           }
         }
+        String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
+        setRowPrefixFilter(scan, rowPrefixes);
         scan.setTimeRange(startTime, endTime);
         int versions = conf.getInt(NAME+".versions", -1);
         LOG.info("Setting number of version inside map as: " + versions);
@@ -254,6 +261,9 @@ public class VerifyReplication extends Configured implements Tool {
     if (families != null) {
       conf.set(NAME+".families", families);
     }
+    if (rowPrefixes != null){
+      conf.set(NAME+".rowPrefixes", rowPrefixes);
+    }
 
     Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf);
     ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
@@ -282,6 +292,9 @@ public class VerifyReplication extends Configured implements Tool {
         scan.addFamily(Bytes.toBytes(fam));
       }
     }
+
+    setRowPrefixFilter(scan, rowPrefixes);
+
     TableMapReduceUtil.initTableMapperJob(tableName, scan,
         Verifier.class, null, null, job);
 
@@ -294,11 +307,38 @@ public class VerifyReplication extends Configured implements Tool {
     return job;
   }
 
+  /**
+   * Restricts {@code scan} to rows whose key starts with at least one of the comma-separated
+   * prefixes in {@code rowPrefixes}. One {@link PrefixFilter} per prefix is OR-ed together in
+   * a {@link FilterList}, and the scan's start/stop rows are narrowed to the smallest key range
+   * that still contains every row matching any of the prefixes.
+   *
+   * @param scan the scan to configure; left unchanged if {@code rowPrefixes} is null, empty,
+   *     or yields no prefixes when split on commas (e.g. {@code ","})
+   * @param rowPrefixes comma-separated row key prefixes, e.g. {@code "a,b"}
+   */
+  private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
+    if (rowPrefixes == null || rowPrefixes.isEmpty()) {
+      return;
+    }
+    // split() drops trailing empty strings, so "," yields an empty array.
+    String[] rowPrefixArray = rowPrefixes.split(",");
+    if (rowPrefixArray.length == 0) {
+      return;
+    }
+    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
+    byte[] startPrefixRow = null;
+    byte[] lastPrefixRow = null;
+    byte[] lastStopRow = null;
+    for (String prefix : rowPrefixArray) {
+      byte[] prefixRow = Bytes.toBytes(prefix);
+      filterList.addFilter(new PrefixFilter(prefixRow));
+      // Compare as unsigned bytes (HBase row order), not as Java Strings. The scan must start
+      // at the smallest prefix and stop after the prefix whose key range ends last. That is
+      // not necessarily the greatest prefix: for "a,ab", "a" covers rows up to "b", not "ac".
+      if (startPrefixRow == null || Bytes.compareTo(prefixRow, startPrefixRow) < 0) {
+        startPrefixRow = prefixRow;
+      }
+      byte[] stopRow = stopRowForPrefix(prefixRow);
+      if (lastPrefixRow == null || compareStopRows(stopRow, lastStopRow) > 0) {
+        lastPrefixRow = prefixRow;
+        lastStopRow = stopRow;
+      }
+    }
+    scan.setFilter(filterList);
+    setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
+  }
+
+  /**
+   * Sets {@code scan}'s start row to {@code startPrefixRow} and its (exclusive) stop row to the
+   * first key that sorts after every row beginning with {@code lastPrefixRow}.
+   */
+  private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow,
+      byte[] lastPrefixRow) {
+    scan.setStartRow(startPrefixRow);
+    scan.setStopRow(stopRowForPrefix(lastPrefixRow));
+  }
+
+  /**
+   * Returns the smallest row key that sorts after every key beginning with {@code prefix}.
+   * Trailing 0xFF bytes are dropped and the last remaining byte is incremented; this avoids
+   * the overflow of 0xFF + 1 wrapping to 0x00. Returns an empty array ("no stop row", i.e.
+   * scan to the end of the table) when no such key exists, i.e. for an empty prefix or one
+   * made only of 0xFF bytes.
+   */
+  private static byte[] stopRowForPrefix(byte[] prefix) {
+    for (int i = prefix.length - 1; i >= 0; i--) {
+      if (prefix[i] != (byte) 0xFF) {
+        byte[] stopRow = Arrays.copyOf(prefix, i + 1);
+        stopRow[i]++;
+        return stopRow;
+      }
+    }
+    return new byte[0];
+  }
+
+  /**
+   * Compares two stop rows produced by {@link #stopRowForPrefix}. An empty array means
+   * "unbounded" and therefore compares greater than any non-empty stop row.
+   */
+  private static int compareStopRows(byte[] left, byte[] right) {
+    if (left.length == 0) {
+      return right.length == 0 ? 0 : 1;
+    }
+    if (right.length == 0) {
+      return -1;
+    }
+    return Bytes.compareTo(left, right);
+  }
+
   private static boolean doCommandLine(final String[] args) {
     if (args.length < 2) {
       printUsage(null);
       return false;
     }
+    //in case we've been run before, restore all parameters to their initial states
+    //Otherwise, if our previous run included a parameter not in args this time,
+    //we might hold on to the old value.
+    restoreDefaults();
     try {
       for (int i = 0; i < args.length; i++) {
         String cmd = args[i];
@@ -337,6 +377,12 @@ public class VerifyReplication extends Configured implements Tool {
           continue;
         }
 
+        final String rowPrefixesKey = "--row-prefixes=";
+        if (cmd.startsWith(rowPrefixesKey)){
+          rowPrefixes = cmd.substring(rowPrefixesKey.length());
+          continue;
+        }
+
         if (i == args.length-2) {
           peerId = cmd;
         }
@@ -353,6 +399,17 @@ public class VerifyReplication extends Configured implements Tool {
     return true;
   }
 
+  /**
+   * Puts every option that {@code doCommandLine} can set back to its default value. These are
+   * static fields, so a second run in the same JVM would otherwise keep options from an
+   * earlier run that are absent from the new arguments.
+   * NOTE(review): the numeric defaults below are assumed to match the field initializers
+   * declared earlier in the class; keep the two in sync.
+   */
+  private static void restoreDefaults() {
+    // which peer/table to verify
+    peerId = null;
+    tableName = null;
+    // optional scan restrictions
+    families = null;
+    rowPrefixes = null;
+    // time range and per-row limits
+    startTime = 0;
+    endTime = Long.MAX_VALUE;
+    versions = -1;
+    batch = Integer.MAX_VALUE;
+  }
+
   /*
    * @param errorMsg Error message.  Can be null.
    */
@@ -361,7 +418,7 @@ public class VerifyReplication extends Configured implements Tool {
       System.err.println("ERROR: " + errorMsg);
     }
     System.err.println("Usage: verifyrep [--starttime=X]" +
-        " [--stoptime=Y] [--families=A] <peerid> <tablename>");
+        " [--stoptime=Y] [--families=A] [--row-prefixes=B] <peerid> <tablename>");
     System.err.println();
     System.err.println("Options:");
     System.err.println(" starttime    beginning of the time range");
@@ -369,6 +426,7 @@ public class VerifyReplication extends Configured implements Tool {
     System.err.println(" endtime      end of the time range");
     System.err.println(" versions     number of cell versions to verify");
     System.err.println(" families     comma-separated list of families to copy");
+    System.err.println(" row-prefixes comma-separated list of row key prefixes to filter
on ");
     System.err.println();
     System.err.println("Args:");
     System.err.println(" peerid       Id of the peer used for verification, must match the
one given for replication");

http://git-wip-us.apache.org/repos/asf/hbase/blob/734c50bc/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 2bb1302..c6f5c41 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -23,11 +23,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.ClusterStatus;
@@ -61,13 +64,12 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import com.google.protobuf.ByteString;
-import com.sun.tools.javac.code.Attribute.Array;
 
 @Category(LargeTests.class)
 public class TestReplicationSmallTests extends TestReplicationBase {
 
   private static final Log LOG = LogFactory.getLog(TestReplicationSmallTests.class);
+  private static final String PEER_ID = "2";
 
   /**
    * @throws java.lang.Exception
@@ -81,6 +83,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
         utility1.getHBaseCluster().getRegionServerThreads()) {
       r.getRegionServer().getWAL().rollWriter();
     }
+    int rowCount = utility1.countRows(htable1);
     utility1.truncateTable(tableName);
     // truncating the table will send one Delete per row to the slave cluster
     // in an async fashion, which is why we cannot just call truncateTable on
@@ -94,7 +97,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
         fail("Waited too much time for truncate");
       }
       ResultScanner scanner = htable2.getScanner(scan);
-      Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
+      Result[] res = scanner.next(rowCount);
       scanner.close();
       if (res.length != 0) {
         if (res.length < lastCount) {
@@ -253,11 +256,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     Put put;
     // normal Batch tests
     htable1.setAutoFlush(false, true);
-    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      put = new Put(Bytes.toBytes(i));
-      put.add(famName, row, row);
-      htable1.put(put);
-    }
+    loadData("", row);
     htable1.flushCommits();
 
     Scan scan = new Scan();
@@ -267,15 +266,20 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     scanner1.close();
     assertEquals(NB_ROWS_IN_BATCH, res1.length);
 
-    for (int i = 0; i < NB_RETRIES; i++) {
+    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
+  }
+
+  private void waitForReplication(int expectedRows, int retries) throws IOException, InterruptedException
{
+    Scan scan;
+    for (int i = 0; i < retries; i++) {
       scan = new Scan();
-      if (i==NB_RETRIES-1) {
+      if (i== retries -1) {
         fail("Waited too much time for normal batch replication");
       }
       ResultScanner scanner = htable2.getScanner(scan);
-      Result[] res = scanner.next(NB_ROWS_IN_BATCH);
+      Result[] res = scanner.next(expectedRows);
       scanner.close();
-      if (res.length != NB_ROWS_IN_BATCH) {
+      if (res.length != expectedRows) {
         LOG.info("Only got " + res.length + " rows");
         Thread.sleep(SLEEP_TIME);
       } else {
@@ -284,6 +288,16 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     }
   }
 
+  /**
+   * Writes NB_ROWS_IN_BATCH rows to htable1 and flushes them. Each row has key
+   * {@code prefix + i} (i = 0 .. NB_ROWS_IN_BATCH - 1) and one cell famName:row whose value
+   * is {@code row}.
+   * <p>
+   * The explicit flush matters: testSmallBatch turns auto-flush off on htable1 and never turns
+   * it back on. If htable1 is reused across tests (presumably, via TestReplicationBase), later
+   * puts could otherwise stay in the client write buffer and never replicate.
+   *
+   * @param prefix row key prefix; may be empty
+   * @param row qualifier and value written into every row
+   * @throws IOException if the put or flush fails
+   */
+  private void loadData(String prefix, byte[] row) throws IOException {
+    List<Put> puts = new ArrayList<Put>(NB_ROWS_IN_BATCH);
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
+      put.add(famName, row, row);
+      puts.add(put);
+    }
+    htable1.put(puts);
+    htable1.flushCommits();
+  }
+
   /**
    * Test disable/enable replication, trying to insert, make sure nothing's
    * replicated, enable it, the insert should be replicated
@@ -294,7 +308,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
   public void testDisableEnable() throws Exception {
 
     // Test disabling replication
-    admin.disablePeer("2");
+    admin.disablePeer(PEER_ID);
 
     byte[] rowkey = Bytes.toBytes("disable enable");
     Put put = new Put(rowkey);
@@ -313,7 +327,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     }
 
     // Test enable replication
-    admin.enablePeer("2");
+    admin.enablePeer(PEER_ID);
 
     for (int i = 0; i < NB_RETRIES; i++) {
       Result res = htable2.get(get);
@@ -337,7 +351,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
   @Test(timeout=300000)
   public void testAddAndRemoveClusters() throws Exception {
     LOG.info("testAddAndRemoveClusters");
-    admin.removePeer("2");
+    admin.removePeer(PEER_ID);
     Thread.sleep(SLEEP_TIME);
     byte[] rowKey = Bytes.toBytes("Won't be replicated");
     Put put = new Put(rowKey);
@@ -454,18 +468,8 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     // identical since it does the check
     testSmallBatch();
 
-    String[] args = new String[] {"2", Bytes.toString(tableName)};
-    Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
-    if (job == null) {
-      fail("Job wasn't created, see the log");
-    }
-    if (!job.waitForCompletion(true)) {
-      fail("Job failed, see the log");
-    }
-    assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
-        findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-    assertEquals(0, job.getCounters().
-        findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+    String[] args = new String[] {PEER_ID, Bytes.toString(tableName)};
+    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
 
     Scan scan = new Scan();
     ResultScanner rs = htable2.getScanner(scan);
@@ -479,16 +483,21 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     }
     Delete delete = new Delete(put.getRow());
     htable2.delete(delete);
-    job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
+    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
+  }
+
+  /**
+   * Runs a VerifyReplication job with the given command-line arguments and asserts its
+   * GOODROWS and BADROWS counters. Fails the test if the job cannot be created or does not
+   * complete successfully.
+   * <p>
+   * Each run gets a fresh copy of CONF_WITH_LOCALFS. createSubmittableJob only sets optional
+   * keys such as NAME + ".rowPrefixes" when the option is present and never clears them, so
+   * a shared Configuration could apparently leak options from one run into the next.
+   *
+   * @param args VerifyReplication arguments: options, then peer id and table name
+   * @param expectedGoodRows expected value of the GOODROWS counter
+   * @param expectedBadRows expected value of the BADROWS counter
+   */
+  private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Job job = VerifyReplication.createSubmittableJob(new Configuration(CONF_WITH_LOCALFS),
args);
     if (job == null) {
       fail("Job wasn't created, see the log");
     }
     if (!job.waitForCompletion(true)) {
       fail("Job failed, see the log");
     }
-    assertEquals(0, job.getCounters().
+    assertEquals(expectedGoodRows, job.getCounters().
         findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-    assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
+    assertEquals(expectedBadRows, job.getCounters().
         findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
   }
 
@@ -551,18 +560,8 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     assertEquals(1, res1.length);
     assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size());
 
-    String[] args = new String[] {"--versions=100", "2", Bytes.toString(tableName)};
-    Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
-    if (job == null) {
-      fail("Job wasn't created, see the log");
-    }
-    if (!job.waitForCompletion(true)) {
-      fail("Job failed, see the log");
-    }
-    assertEquals(0, job.getCounters().
-      findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-    assertEquals(1, job.getCounters().
-      findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+    String[] args = new String[] {"--versions=100", PEER_ID, Bytes.toString(tableName)};
+    runVerifyReplication(args, 0, 1);
   }
 
   @Test(timeout=300000)
@@ -613,7 +612,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
    
     try {
       // Disabling replication and modifying the particular version of the cell to validate
the feature.  
-      admin.disablePeer("2");
+      admin.disablePeer(PEER_ID);
       Put put2 = new Put(Bytes.toBytes("r1"));
       put2.add(famName, qualifierName, ts +2, Bytes.toBytes("v99"));
       htable2.put(put2);
@@ -625,22 +624,12 @@ public class TestReplicationSmallTests extends TestReplicationBase {
       scanner1.close();
       assertEquals(1, res1.length);
       assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
-    
-      String[] args = new String[] {"--versions=100", "2", Bytes.toString(tableName)};
-      Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
-      if (job == null) {
-        fail("Job wasn't created, see the log");
-      }
-      if (!job.waitForCompletion(true)) {
-        fail("Job failed, see the log");
-      }    
-      assertEquals(0, job.getCounters().
-        findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-      assertEquals(1, job.getCounters().
-        findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+
+      String[] args = new String[] {"--versions=100", PEER_ID, Bytes.toString(tableName)};
+      runVerifyReplication(args, 0, 1);
       }
     finally {
-      admin.enablePeer("2");
+      admin.enablePeer(PEER_ID);
     }
   }
 
@@ -757,4 +746,18 @@ public class TestReplicationSmallTests extends TestReplicationBase {
       admin.close();
     }
   }
+  
+  @Test(timeout=300000)
+  public void testVerifyReplicationPrefixFiltering() throws Exception {
+    final byte[] prefixRow = Bytes.toBytes("prefixrow");
+    final byte[] prefixRow2 = Bytes.toBytes("secondrow");
+    loadData("prefixrow", prefixRow);
+    loadData("secondrow", prefixRow2);
+    loadData("aaa", row);
+    loadData("zzz", row);
+    waitForReplication(NB_ROWS_IN_BATCH * 4, NB_RETRIES * 4);
+    String[] args = new String[] {"--row-prefixes=prefixrow,secondrow", PEER_ID,
+        Bytes.toString(tableName)};
+    runVerifyReplication(args, NB_ROWS_IN_BATCH *2, 0);
+  }
 }


Mime
View raw message