hbase-commits mailing list archives

From la...@apache.org
Subject svn commit: r1551465 - in /hbase/branches/0.98/hbase-server/src: main/java/org/apache/hadoop/hbase/replication/regionserver/ test/java/org/apache/hadoop/hbase/replication/
Date Tue, 17 Dec 2013 05:47:01 GMT
Author: larsh
Date: Tue Dec 17 05:47:01 2013
New Revision: 1551465

URL: http://svn.apache.org/r1551465
Log:
HBASE-9047 Tool to handle finishing replication when the cluster is offline (Demai Ni)

Added:
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
    hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
Modified:
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java

Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1551465&r1=1551464&r2=1551465&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Tue Dec 17 05:47:01 2013
@@ -498,6 +498,26 @@ public class ReplicationSource extends T
               }
             }
           }
+          // In the case of disaster/recovery, the HMaster may be shut down or crash before
+          // moving data from .logs to .oldlogs. Loop through the .logs folders and check
+          // whether a match exists
+          if (stopper instanceof ReplicationSyncUp.DummyServer) {
+            FileStatus[] rss = fs.listStatus(manager.getLogDir());
+            for (FileStatus rs : rss) {
+              Path p = rs.getPath();
+              FileStatus[] logs = fs.listStatus(p);
+              for (FileStatus log : logs) {
+                p = new Path(p, log.getPath().getName());
+                if (p.getName().equals(currentPath.getName())) {
+                  currentPath = p;
+                  LOG.info("Log " + this.currentPath + " exists under " + manager.getLogDir());
+                  // Open the log at the new location
+                  this.openReader(sleepMultiplier);
+                  return true;
+                }
+              }
+            }
+          }
+
           // TODO What happens if the log was missing from every single location?
           // Although we need to check a couple of times as the log could have
           // been moved by the master between the checks
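
For context, the new block above amounts to the following lookup, shown here as
a minimal standalone sketch assuming only a Hadoop FileSystem handle (the class
and method names are hypothetical, for illustration):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class WalLocator {
      /**
       * Search each region server subdirectory of logDir for a WAL whose file
       * name matches walName. Returns the relocated path, or null if no match.
       */
      static Path findWal(FileSystem fs, Path logDir, String walName) throws IOException {
        for (FileStatus serverDir : fs.listStatus(logDir)) {
          for (FileStatus log : fs.listStatus(serverDir.getPath())) {
            if (log.getPath().getName().equals(walName)) {
              return log.getPath();
            }
          }
        }
        return null;
      }
    }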

Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1551465&r1=1551464&r2=1551465&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Tue Dec 17 05:47:01 2013
@@ -285,6 +285,14 @@ public class ReplicationSourceManager im
     return this.sources;
   }
 
+  /**
+   * Get a list of all the old sources of this region server
+   * @return list of all old sources
+   */
+  public List<ReplicationSourceInterface> getOldSources() {
+    return this.oldsources;
+  }
+
   void preLogRoll(Path newLog) throws IOException {
 
     synchronized (this.hlogsById) {

Added: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java?rev=1551465&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java (added)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java Tue Dec 17 05:47:01 2013
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * In a scenario of replication-based disaster recovery, when the HBase
+ * master cluster crashes, this tool is used to sync up the delta from the
+ * master to the slave using the info in ZooKeeper. The tool runs on the
+ * master cluster and assumes ZooKeeper, the filesystem and the network are
+ * still available after HBase crashes.
+ *
+ * hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp
+ */
+
+public class ReplicationSyncUp extends Configured implements Tool {
+
+  static final Log LOG = LogFactory.getLog(ReplicationSyncUp.class.getName());
+
+  private static Configuration conf;
+
+  private static final long SLEEP_TIME = 10000;
+
+  // Although the tool is designed to be run on the command line,
+  // this API is provided for executing the tool through another app
+  public static void setConfigure(Configuration config) {
+    conf = config;
+  }
+
+  /**
+   * Main program
+   * @param args
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    if (conf == null) conf = HBaseConfiguration.create();
+    int ret = ToolRunner.run(conf, new ReplicationSyncUp(), args);
+    System.exit(ret);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Replication replication;
+    ReplicationSourceManager manager;
+    FileSystem fs;
+    Path oldLogDir, logDir, rootDir;
+    ZooKeeperWatcher zkw;
+
+    Abortable abortable = new Abortable() {
+      @Override
+      public void abort(String why, Throwable e) {
+      }
+
+      @Override
+      public boolean isAborted() {
+        return false;
+      }
+    };
+
+    zkw =
+        new ZooKeeperWatcher(conf, "syncupReplication" + System.currentTimeMillis(), abortable,
+            true);
+
+    rootDir = FSUtils.getRootDir(conf);
+    fs = FileSystem.get(conf);
+    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
+
+    System.out.println("Start Replication Server start");
+    replication = new Replication(new DummyServer(zkw), fs, logDir, oldLogDir);
+    manager = replication.getReplicationManager();
+    manager.init();
+
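+    // Each recovered source removes itself from getOldSources() once it has
+    // shipped all of its queued WAL entries, so an empty list means the
+    // delta has been fully replicated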
+    try {
+      int numberOfOldSource = 1; // default: wait at least once
+      while (numberOfOldSource > 0) {
+        Thread.sleep(SLEEP_TIME);
+        numberOfOldSource = manager.getOldSources().size();
+      }
+    } catch (InterruptedException e) {
+      System.err.println("Interrupted while waiting for old sources to finish: " + e);
+      return (-1);
+    }
+
+    manager.join();
+
+    return (0);
+  }
+
+  static class DummyServer implements Server {
+    String hostname;
+    ZooKeeperWatcher zkw;
+
+    DummyServer(ZooKeeperWatcher zkw) {
+      // a unique name in case the first run fails
+      hostname = System.currentTimeMillis() + ".SyncUpTool.replication.org";
+      this.zkw = zkw;
+    }
+
+    DummyServer(String hostname) {
+      this.hostname = hostname;
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return conf;
+    }
+
+    @Override
+    public ZooKeeperWatcher getZooKeeper() {
+      return zkw;
+    }
+
+    @Override
+    public CatalogTracker getCatalogTracker() {
+      return null;
+    }
+
+    @Override
+    public ServerName getServerName() {
+      return ServerName.valueOf(hostname, 1234, 1L);
+    }
+
+    @Override
+    public void abort(String why, Throwable e) {
+    }
+
+    @Override
+    public boolean isAborted() {
+      return false;
+    }
+
+    @Override
+    public void stop(String why) {
+    }
+
+    @Override
+    public boolean isStopped() {
+      return false;
+    }
+  }
+}
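
Because run() reads the static configuration that setConfigure() installs, the
tool can also be embedded in another application. A minimal sketch, assuming a
hypothetical driver class and a default HBase configuration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
    import org.apache.hadoop.util.ToolRunner;

    public class SyncUpDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // hand the tool this configuration instead of letting main() create one
        ReplicationSyncUp.setConfigure(conf);
        // returns 0 once all old replication sources have drained
        int ret = ToolRunner.run(conf, new ReplicationSyncUp(), args);
        System.exit(ret);
      }
    }

This mirrors what the test below does in its syncUp() helper.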

Added: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java?rev=1551465&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java (added)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java Tue Dec 17 05:47:01 2013
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestReplicationSyncUpTool extends TestReplicationBase {
+
+  private static final Log LOG = LogFactory.getLog(TestReplicationSyncUpTool.class);
+
+  private static final byte[] t1_su = Bytes.toBytes("t1_syncup");
+  private static final byte[] t2_su = Bytes.toBytes("t2_syncup");
+
+  private static final byte[] famName = Bytes.toBytes("cf1");
+  private static final byte[] qualName = Bytes.toBytes("q1");
+
+  private static final byte[] noRepfamName = Bytes.toBytes("norep");
+
+  private HTableDescriptor t1_syncupSource, t1_syncupTarget;
+  private HTableDescriptor t2_syncupSource, t2_syncupTarget;
+
+  private HTable ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1;
+  private int rowCount_ht1Source, rowCount_ht2Source, rowCount_ht1TargetAtPeer1,
+      rowCount_ht2TargetAtPeer1;
+
+  @Before
+  public void setUp() throws Exception {
+
+    HColumnDescriptor fam;
+
+    t1_syncupSource = new HTableDescriptor(TableName.valueOf(t1_su));
+    fam = new HColumnDescriptor(famName);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    t1_syncupSource.addFamily(fam);
+    fam = new HColumnDescriptor(noRepfamName);
+    t1_syncupSource.addFamily(fam);
+
+    t1_syncupTarget = new HTableDescriptor(TableName.valueOf(t1_su));
+    fam = new HColumnDescriptor(famName);
+    t1_syncupTarget.addFamily(fam);
+    fam = new HColumnDescriptor(noRepfamName);
+    t1_syncupTarget.addFamily(fam);
+
+    t2_syncupSource = new HTableDescriptor(TableName.valueOf(t2_su));
+    fam = new HColumnDescriptor(famName);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    t2_syncupSource.addFamily(fam);
+    fam = new HColumnDescriptor(noRepfamName);
+    t2_syncupSource.addFamily(fam);
+
+    t2_syncupTarget = new HTableDescriptor(TableName.valueOf(t2_su));
+    fam = new HColumnDescriptor(famName);
+    t2_syncupTarget.addFamily(fam);
+    fam = new HColumnDescriptor(noRepfamName);
+    t2_syncupTarget.addFamily(fam);
+
+  }
+
+  /**
+   * Set up replication, put and replicate rows to the slave, then mimic
+   * running the sync-up tool after deletes and after puts while one of the
+   * clusters is down. Also check that edits to the non-replicated column
+   * family never show up on the slave.
+   */
+  @Test(timeout = 300000)
+  public void testSyncUpTool() throws Exception {
+
+    /**
+     * Set up replication on the Master and one Slave.
+     * Tables: t1_syncup and t2_syncup
+     * Column families:
+     *    'cf1'  : replicated
+     *    'norep': not replicated
+     */
+    setupReplication();
+
+    /**
+     * At Master:
+     * t1_syncup: put 100 rows into cf1, and 1 row into norep
+     * t2_syncup: put 200 rows into cf1, and 1 row into norep
+     *
+     * verify correctly replicated to slave
+     */
+    putAndReplicateRows();
+
+    /**
+     * Verify delete works
+     *
+     * step 1: stop hbase on Slave
+     *
+     * step 2: at Master:
+     *  t1_syncup: delete 50 rows  from cf1
+     *  t2_syncup: delete 100 rows from cf1
+     *  no change on 'norep'
+     *
+     * step 3: stop hbase on master, restart hbase on Slave
+     *
+     * step 4: verify the Slave still has the rows from before the delete
+     *      t1_syncup: 100 rows from cf1
+     *      t2_syncup: 200 rows from cf1
+     *
+     * step 5: run syncup tool on Master
+     *
+     * step 6: verify that the deletes show up on the Slave
+     *      t1_syncup: 50 rows from cf1
+     *      t2_syncup: 100 rows from cf1
+     *
+     * verify correctly replicated to Slave
+     */
+    mimicSyncUpAfterDelete();
+
+    /**
+     * Verify put works
+     *
+     * step 1: stop hbase on Slave
+     *
+     * step 2: at Master:
+     *  t1_syncup: put 100 rows into cf1
+     *  t2_syncup: put 200 rows into cf1
+     *  and put another row into 'norep'
+     *  ATTN: puts to 'cf1' overwrite existing rows, so the end counts will
+     *        be 100 and 200 respectively;
+     *        the put to 'norep' adds a new row.
+     *
+     * step 3: stop hbase on master, restart hbase on Slave
+     *
+     * step 4: verify the Slave still has the rows from before the put
+     *      t1_syncup: 50 rows from cf1
+     *      t2_syncup: 100 rows from cf1
+     *
+     * step 5: run syncup tool on Master
+     *
+     * step 6: verify that the puts show up on the Slave
+     *         and 'norep' does not
+     *      t1_syncup: 100 rows from cf1
+     *      t2_syncup: 200 rows from cf1
+     *
+     * verify correctly replicated to Slave
+     */
+    mimicSyncUpAfterPut();
+
+  }
+
+  private void setupReplication() throws Exception {
+    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
+    ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
+
+    HBaseAdmin ha = new HBaseAdmin(conf1);
+    ha.createTable(t1_syncupSource);
+    ha.createTable(t2_syncupSource);
+    ha.close();
+
+    ha = new HBaseAdmin(conf2);
+    ha.createTable(t1_syncupTarget);
+    ha.createTable(t2_syncupTarget);
+    ha.close();
+
+    // Get HTable from Master
+    ht1Source = new HTable(conf1, t1_su);
+    ht1Source.setWriteBufferSize(1024);
+    ht2Source = new HTable(conf1, t2_su);
+    ht2Source.setWriteBufferSize(1024);
+
+    // Get HTable from Peer1
+    ht1TargetAtPeer1 = new HTable(conf2, t1_su);
+    ht1TargetAtPeer1.setWriteBufferSize(1024);
+    ht2TargetAtPeer1 = new HTable(conf2, t2_su);
+    ht2TargetAtPeer1.setWriteBufferSize(1024);
+
+    /**
+     * Set M-S relationship: Master = utility1, Slave1 = utility2
+     */
+    admin1.addPeer("1", utility2.getClusterKey());
+
+    admin1.close();
+    admin2.close();
+  }
+
+  private void putAndReplicateRows() throws Exception {
+    // add rows to the Master cluster
+    Put p;
+
+    // 100 + 1 row to t1_syncup
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      p = new Put(Bytes.toBytes("row" + i));
+      p.add(famName, qualName, Bytes.toBytes("val" + i));
+      ht1Source.put(p);
+    }
+    p = new Put(Bytes.toBytes("row" + 9999));
+    p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
+    ht1Source.put(p);
+
+    // 200 + 1 row to t2_syncup
+    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
+      p = new Put(Bytes.toBytes("row" + i));
+      p.add(famName, qualName, Bytes.toBytes("val" + i));
+      ht2Source.put(p);
+    }
+    p = new Put(Bytes.toBytes("row" + 9999));
+    p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
+    ht2Source.put(p);
+
+    // ensure replication completed
+    Thread.sleep(SLEEP_TIME);
+
+    rowCount_ht1Source = utility1.countRows(ht1Source);
+    rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+    assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCount_ht1Source
- 1,
+      rowCount_ht1TargetAtPeer1);
+
+    rowCount_ht2Source = utility1.countRows(ht2Source);
+    rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+    assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCount_ht2Source
- 1,
+      rowCount_ht2TargetAtPeer1);
+
+  }
+
+  private void mimicSyncUpAfterDelete() throws Exception {
+    utility2.shutdownMiniHBaseCluster();
+
+    List<Delete> list = new ArrayList<Delete>();
+    // delete half of the rows
+    for (int i = 0; i < NB_ROWS_IN_BATCH / 2; i++) {
+      String rowKey = "row" + i;
+      Delete del = new Delete(Bytes.toBytes(rowKey));
+      list.add(del);
+    }
+    ht1Source.delete(list);
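+    // HTable.delete(List) removes successfully executed Deletes from the
+    // passed list, so the same (now empty) list can be reused below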
+
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      String rowKey = "row" + i;
+      Delete del = new Delete(Bytes.toBytes(rowKey));
+      list.add(del);
+    }
+    ht2Source.delete(list);
+
+    rowCount_ht1Source = utility1.countRows(ht1Source);
+    assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam",
51,
+      rowCount_ht1Source);
+
+    rowCount_ht2Source = utility1.countRows(ht2Source);
+    assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam",
+      101, rowCount_ht2Source);
+
+    utility1.shutdownMiniHBaseCluster();
+    utility2.restartHBaseCluster(1);
+
+    Thread.sleep(SLEEP_TIME);
+
+    // before sync up
+    rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+    rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
+    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);
+
+    // After sync up
+    syncUp(utility1);
+    rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+    rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+    assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
+      rowCount_ht1TargetAtPeer1);
+    assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100,
+      rowCount_ht2TargetAtPeer1);
+
+  }
+
+  private void mimicSyncUpAfterPut() throws Exception {
+    utility1.restartHBaseCluster(1);
+    utility2.shutdownMiniHBaseCluster();
+
+    Put p;
+    // another 100 + 1 row to t1_syncup
+    // we should see 100 + 2 rows now
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      p = new Put(Bytes.toBytes("row" + i));
+      p.add(famName, qualName, Bytes.toBytes("val" + i));
+      ht1Source.put(p);
+    }
+    p = new Put(Bytes.toBytes("row" + 9998));
+    p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
+    ht1Source.put(p);
+
+    // another 200 + 1 row to t2_syncup
+    // we should see 200 + 2 rows now
+    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
+      p = new Put(Bytes.toBytes("row" + i));
+      p.add(famName, qualName, Bytes.toBytes("val" + i));
+      ht2Source.put(p);
+    }
+    p = new Put(Bytes.toBytes("row" + 9998));
+    p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
+    ht2Source.put(p);
+
+    rowCount_ht1Source = utility1.countRows(ht1Source);
+    assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source);
+    rowCount_ht2Source = utility1.countRows(ht2Source);
+    assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);
+
+    utility1.shutdownMiniHBaseCluster();
+    utility2.restartHBaseCluster(1);
+
+    Thread.sleep(SLEEP_TIME);
+
+    // before sync up
+    rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+    rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+    assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50,
+      rowCount_ht1TargetAtPeer1);
+    assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100,
+      rowCount_ht2TargetAtPeer1);
+
+    // after sync up
+    syncUp(utility1);
+    rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+    rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+    assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
+      rowCount_ht1TargetAtPeer1);
+    assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
+      rowCount_ht2TargetAtPeer1);
+
+  }
+
+  private void syncUp(HBaseTestingUtility ut) throws Exception {
+    ReplicationSyncUp.setConfigure(ut.getConfiguration());
+    String[] arguments = new String[] { null };
+    new ReplicationSyncUp().run(arguments);
+  }
+
+}


