Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C860C200D60 for ; Fri, 1 Dec 2017 16:18:25 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id C6E9C160C1F; Fri, 1 Dec 2017 15:18:25 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7B9BF160C1D for ; Fri, 1 Dec 2017 16:18:23 +0100 (CET) Received: (qmail 37408 invoked by uid 500); 1 Dec 2017 15:18:19 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 37087 invoked by uid 99); 1 Dec 2017 15:18:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Dec 2017 15:18:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 90F59F60D7; Fri, 1 Dec 2017 15:18:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Fri, 01 Dec 2017 15:18:21 -0000 Message-Id: <50bf780a7d674d64ba05569ef1606c0d@git.apache.org> In-Reply-To: <6f7837ae31b8413bb854db2056b74cbc@git.apache.org> References: <6f7837ae31b8413bb854db2056b74cbc@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [06/50] hbase-site git commit: Published site at . archived-at: Fri, 01 Dec 2017 15:18:26 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/a5c4dca3/testdevapidocs/src-html/org/apache/hadoop/hbase/client/TestReplicaWithCluster.RegionServerStoppedCopro.html ---------------------------------------------------------------------- diff --git a/testdevapidocs/src-html/org/apache/hadoop/hbase/client/TestReplicaWithCluster.RegionServerStoppedCopro.html b/testdevapidocs/src-html/org/apache/hadoop/hbase/client/TestReplicaWithCluster.RegionServerStoppedCopro.html index 33ce961..6e909b1 100644 --- a/testdevapidocs/src-html/org/apache/hadoop/hbase/client/TestReplicaWithCluster.RegionServerStoppedCopro.html +++ b/testdevapidocs/src-html/org/apache/hadoop/hbase/client/TestReplicaWithCluster.RegionServerStoppedCopro.html @@ -55,755 +55,752 @@ 047 048import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; 049import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; -050import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; -051import org.apache.hadoop.hbase.coprocessor.ObserverContext; -052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; -053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -054import org.apache.hadoop.hbase.coprocessor.RegionObserver; -055import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -056import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; -057import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore; -058import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; -059import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -060import org.apache.hadoop.hbase.testclassification.ClientTests; -061import org.apache.hadoop.hbase.testclassification.MediumTests; -062import org.apache.hadoop.hbase.util.Bytes; -063import org.apache.hadoop.hbase.util.Pair; -064import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; -065import org.junit.AfterClass; -066import org.junit.Assert; -067import org.junit.BeforeClass; -068import org.junit.Test; -069import org.junit.experimental.categories.Category; -070 -071@Category({MediumTests.class, ClientTests.class}) -072public class TestReplicaWithCluster { -073 private static final Log LOG = LogFactory.getLog(TestReplicaWithCluster.class); -074 -075 private static final int NB_SERVERS = 3; -076 private static final byte[] row = TestReplicaWithCluster.class.getName().getBytes(); -077 private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); -078 -079 // second minicluster used in testing of replication -080 private static HBaseTestingUtility HTU2; -081 private static final byte[] f = HConstants.CATALOG_FAMILY; -082 -083 private final static int REFRESH_PERIOD = 1000; -084 private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200; -085 -086 /** -087 * This copro is used to synchronize the tests. -088 */ -089 public static class SlowMeCopro implements RegionCoprocessor, RegionObserver { -090 static final AtomicLong sleepTime = new AtomicLong(0); -091 static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(new CountDownLatch(0)); -092 -093 public SlowMeCopro() { -094 } -095 -096 @Override -097 public Optional<RegionObserver> getRegionObserver() { -098 return Optional.of(this); -099 } -100 -101 @Override -102 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, -103 final Get get, final List<Cell> results) throws IOException { -104 -105 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) { -106 CountDownLatch latch = cdl.get(); -107 try { -108 if (sleepTime.get() > 0) { -109 LOG.info("Sleeping for " + sleepTime.get() + " ms"); -110 Thread.sleep(sleepTime.get()); -111 } else if (latch.getCount() > 0) { -112 LOG.info("Waiting for the counterCountDownLatch"); -113 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. -114 if (latch.getCount() > 0) { -115 throw new RuntimeException("Can't wait more"); -116 } -117 } -118 } catch (InterruptedException e1) { -119 LOG.error(e1); -120 } -121 } else { -122 LOG.info("We're not the primary replicas."); -123 } -124 } -125 } -126 -127 /** -128 * This copro is used to simulate region server down exception for Get and Scan -129 */ -130 @CoreCoprocessor -131 public static class RegionServerStoppedCopro implements RegionCoprocessor, RegionObserver { -132 -133 public RegionServerStoppedCopro() { -134 } -135 -136 @Override -137 public Optional<RegionObserver> getRegionObserver() { -138 return Optional.of(this); -139 } -140 -141 @Override -142 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, -143 final Get get, final List<Cell> results) throws IOException { -144 -145 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId(); -146 -147 // Fail for the primary replica and replica 1 -148 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) { -149 LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId); -150 throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName() -151 + " not running"); -152 } else { -153 LOG.info("We're replica region " + replicaId); -154 } -155 } -156 -157 @Override -158 public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, -159 final Scan scan) throws IOException { -160 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId(); -161 // Fail for the primary replica and replica 1 -162 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) { -163 LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId); -164 throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName() -165 + " not running"); -166 } else { -167 LOG.info("We're replica region " + replicaId); -168 } -169 } -170 } -171 -172 /** -173 * This copro is used to slow down the primary meta region scan a bit -174 */ -175 public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro -176 implements RegionCoprocessor, RegionObserver { -177 static boolean slowDownPrimaryMetaScan = false; -178 static boolean throwException = false; -179 -180 @Override -181 public Optional<RegionObserver> getRegionObserver() { -182 return Optional.of(this); -183 } -184 -185 @Override -186 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, -187 final Get get, final List<Cell> results) throws IOException { -188 -189 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId(); -190 -191 // Fail for the primary replica, but not for meta -192 if (throwException) { -193 if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) { -194 LOG.info("Get, throw Region Server Stopped Exceptoin for region " + e.getEnvironment() -195 .getRegion().getRegionInfo()); -196 throw new RegionServerStoppedException("Server " + -197 ((HasRegionServerServices)e.getEnvironment()).getRegionServerServices().getServerName() -198 + " not running"); -199 } -200 } else { -201 LOG.info("Get, We're replica region " + replicaId); -202 } -203 } -204 -205 @Override -206 public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, -207 final Scan scan) throws IOException { +050import org.apache.hadoop.hbase.coprocessor.ObserverContext; +051import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +053import org.apache.hadoop.hbase.coprocessor.RegionObserver; +054import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +055import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +056import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore; +057import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; +058import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +059import org.apache.hadoop.hbase.testclassification.ClientTests; +060import org.apache.hadoop.hbase.testclassification.MediumTests; +061import org.apache.hadoop.hbase.util.Bytes; +062import org.apache.hadoop.hbase.util.Pair; +063import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +064import org.junit.AfterClass; +065import org.junit.Assert; +066import org.junit.BeforeClass; +067import org.junit.Test; +068import org.junit.experimental.categories.Category; +069 +070@Category({MediumTests.class, ClientTests.class}) +071public class TestReplicaWithCluster { +072 private static final Log LOG = LogFactory.getLog(TestReplicaWithCluster.class); +073 +074 private static final int NB_SERVERS = 3; +075 private static final byte[] row = TestReplicaWithCluster.class.getName().getBytes(); +076 private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); +077 +078 // second minicluster used in testing of replication +079 private static HBaseTestingUtility HTU2; +080 private static final byte[] f = HConstants.CATALOG_FAMILY; +081 +082 private final static int REFRESH_PERIOD = 1000; +083 private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200; +084 +085 /** +086 * This copro is used to synchronize the tests. +087 */ +088 public static class SlowMeCopro implements RegionCoprocessor, RegionObserver { +089 static final AtomicLong sleepTime = new AtomicLong(0); +090 static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(new CountDownLatch(0)); +091 +092 public SlowMeCopro() { +093 } +094 +095 @Override +096 public Optional<RegionObserver> getRegionObserver() { +097 return Optional.of(this); +098 } +099 +100 @Override +101 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, +102 final Get get, final List<Cell> results) throws IOException { +103 +104 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) { +105 CountDownLatch latch = cdl.get(); +106 try { +107 if (sleepTime.get() > 0) { +108 LOG.info("Sleeping for " + sleepTime.get() + " ms"); +109 Thread.sleep(sleepTime.get()); +110 } else if (latch.getCount() > 0) { +111 LOG.info("Waiting for the counterCountDownLatch"); +112 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. +113 if (latch.getCount() > 0) { +114 throw new RuntimeException("Can't wait more"); +115 } +116 } +117 } catch (InterruptedException e1) { +118 LOG.error(e1); +119 } +120 } else { +121 LOG.info("We're not the primary replicas."); +122 } +123 } +124 } +125 +126 /** +127 * This copro is used to simulate region server down exception for Get and Scan +128 */ +129 @CoreCoprocessor +130 public static class RegionServerStoppedCopro implements RegionCoprocessor, RegionObserver { +131 +132 public RegionServerStoppedCopro() { +133 } +134 +135 @Override +136 public Optional<RegionObserver> getRegionObserver() { +137 return Optional.of(this); +138 } +139 +140 @Override +141 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, +142 final Get get, final List<Cell> results) throws IOException { +143 +144 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId(); +145 +146 // Fail for the primary replica and replica 1 +147 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) { +148 LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId); +149 throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName() +150 + " not running"); +151 } else { +152 LOG.info("We're replica region " + replicaId); +153 } +154 } +155 +156 @Override +157 public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, +158 final Scan scan) throws IOException { +159 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId(); +160 // Fail for the primary replica and replica 1 +161 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) { +162 LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId); +163 throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName() +164 + " not running"); +165 } else { +166 LOG.info("We're replica region " + replicaId); +167 } +168 } +169 } +170 +171 /** +172 * This copro is used to slow down the primary meta region scan a bit +173 */ +174 public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro +175 implements RegionCoprocessor, RegionObserver { +176 static boolean slowDownPrimaryMetaScan = false; +177 static boolean throwException = false; +178 +179 @Override +180 public Optional<RegionObserver> getRegionObserver() { +181 return Optional.of(this); +182 } +183 +184 @Override +185 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, +186 final Get get, final List<Cell> results) throws IOException { +187 +188 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId(); +189 +190 // Fail for the primary replica, but not for meta +191 if (throwException) { +192 if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) { +193 LOG.info("Get, throw Region Server Stopped Exceptoin for region " + e.getEnvironment() +194 .getRegion().getRegionInfo()); +195 throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName() +196 + " not running"); +197 } +198 } else { +199 LOG.info("Get, We're replica region " + replicaId); +200 } +201 } +202 +203 @Override +204 public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, +205 final Scan scan) throws IOException { +206 +207 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId(); 208 -209 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId(); -210 -211 // Slow down with the primary meta region scan -212 if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) { -213 if (slowDownPrimaryMetaScan) { -214 LOG.info("Scan with primary meta region, slow down a bit"); -215 try { -216 Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50); -217 } catch (InterruptedException ie) { -218 // Ingore -219 } -220 } -221 -222 // Fail for the primary replica -223 if (throwException) { -224 LOG.info("Scan, throw Region Server Stopped Exceptoin for replica " + e.getEnvironment() -225 .getRegion().getRegionInfo()); -226 -227 throw new RegionServerStoppedException("Server " + -228 ((HasRegionServerServices)e.getEnvironment()).getRegionServerServices().getServerName() -229 + " not running"); -230 } else { -231 LOG.info("Scan, We're replica region " + replicaId); -232 } -233 } else { -234 LOG.info("Scan, We're replica region " + replicaId); -235 } -236 } -237 } -238 -239 @BeforeClass -240 public static void beforeClass() throws Exception { -241 // enable store file refreshing -242 HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, -243 REFRESH_PERIOD); -244 -245 HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f); -246 HTU.getConfiguration().setInt("replication.source.size.capacity", 10240); -247 HTU.getConfiguration().setLong("replication.source.sleepforretries", 100); -248 HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2); -249 HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10); -250 HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1); -251 HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10); -252 -253 // Wait for primary call longer so make sure that it will get exception from the primary call -254 HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000); -255 HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000); +209 // Slow down with the primary meta region scan +210 if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) { +211 if (slowDownPrimaryMetaScan) { +212 LOG.info("Scan with primary meta region, slow down a bit"); +213 try { +214 Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50); +215 } catch (InterruptedException ie) { +216 // Ingore +217 } +218 } +219 +220 // Fail for the primary replica +221 if (throwException) { +222 LOG.info("Scan, throw Region Server Stopped Exceptoin for replica " + e.getEnvironment() +223 .getRegion().getRegionInfo()); +224 +225 throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName() +226 + " not running"); +227 } else { +228 LOG.info("Scan, We're replica region " + replicaId); +229 } +230 } else { +231 LOG.info("Scan, We're replica region " + replicaId); +232 } +233 } +234 } +235 +236 @BeforeClass +237 public static void beforeClass() throws Exception { +238 // enable store file refreshing +239 HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, +240 REFRESH_PERIOD); +241 +242 HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f); +243 HTU.getConfiguration().setInt("replication.source.size.capacity", 10240); +244 HTU.getConfiguration().setLong("replication.source.sleepforretries", 100); +245 HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2); +246 HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10); +247 HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1); +248 HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10); +249 +250 // Wait for primary call longer so make sure that it will get exception from the primary call +251 HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000); +252 HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000); +253 +254 // Retry less so it can fail faster +255 HTU.getConfiguration().setInt("hbase.client.retries.number", 1); 256 -257 // Retry less so it can fail faster -258 HTU.getConfiguration().setInt("hbase.client.retries.number", 1); +257 // Enable meta replica at server side +258 HTU.getConfiguration().setInt("hbase.meta.replica.count", 2); 259 -260 // Enable meta replica at server side -261 HTU.getConfiguration().setInt("hbase.meta.replica.count", 2); +260 // Make sure master does not host system tables. +261 HTU.getConfiguration().set("hbase.balancer.tablesOnMaster", "none"); 262 -263 // Make sure master does not host system tables. -264 HTU.getConfiguration().set("hbase.balancer.tablesOnMaster", "none"); -265 -266 // Set system coprocessor so it can be applied to meta regions -267 HTU.getConfiguration().set("hbase.coprocessor.region.classes", -268 RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName()); +263 // Set system coprocessor so it can be applied to meta regions +264 HTU.getConfiguration().set("hbase.coprocessor.region.classes", +265 RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName()); +266 +267 HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT, +268 META_SCAN_TIMEOUT_IN_MILLISEC * 1000); 269 -270 HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT, -271 META_SCAN_TIMEOUT_IN_MILLISEC * 1000); -272 -273 HTU.startMiniCluster(NB_SERVERS); -274 HTU.getHBaseCluster().startMaster(); -275 } -276 -277 @AfterClass -278 public static void afterClass() throws Exception { -279 if (HTU2 != null) -280 HTU2.shutdownMiniCluster(); -281 HTU.shutdownMiniCluster(); -282 } -283 -284 @Test (timeout=30000) -285 public void testCreateDeleteTable() throws IOException { -286 // Create table then get the single region for our new table. -287 HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable"); -288 hdt.setRegionReplication(NB_SERVERS); -289 hdt.addCoprocessor(SlowMeCopro.class.getName()); -290 Table table = HTU.createTable(hdt, new byte[][]{f}, null); -291 -292 Put p = new Put(row); -293 p.addColumn(f, row, row); -294 table.put(p); -295 -296 Get g = new Get(row); -297 Result r = table.get(g); -298 Assert.assertFalse(r.isStale()); -299 -300 try { -301 // But if we ask for stale we will get it -302 SlowMeCopro.cdl.set(new CountDownLatch(1)); -303 g = new Get(row); -304 g.setConsistency(Consistency.TIMELINE); -305 r = table.get(g); -306 Assert.assertTrue(r.isStale()); -307 SlowMeCopro.cdl.get().countDown(); -308 } finally { -309 SlowMeCopro.cdl.get().countDown(); -310 SlowMeCopro.sleepTime.set(0); -311 } -312 -313 HTU.getAdmin().disableTable(hdt.getTableName()); -314 HTU.deleteTable(hdt.getTableName()); -315 } -316 -317 @Test (timeout=120000) -318 public void testChangeTable() throws Exception { -319 TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("testChangeTable")) -320 .setRegionReplication(NB_SERVERS) -321 .addCoprocessor(SlowMeCopro.class.getName()) -322 .addColumnFamily(ColumnFamilyDescriptorBuilder.of(f)) -323 .build(); -324 HTU.getAdmin().createTable(td); -325 Table table = HTU.getConnection().getTable(td.getTableName()); -326 // basic test: it should work. -327 Put p = new Put(row); -328 p.addColumn(f, row, row); -329 table.put(p); -330 -331 Get g = new Get(row); -332 Result r = table.get(g); -333 Assert.assertFalse(r.isStale()); -334 -335 // Add a CF, it should work. -336 TableDescriptor bHdt = HTU.getAdmin().getDescriptor(td.getTableName()); -337 td = TableDescriptorBuilder.newBuilder(td) -338 .addColumnFamily(ColumnFamilyDescriptorBuilder.of(row)) -339 .build(); -340 HTU.getAdmin().disableTable(td.getTableName()); -341 HTU.getAdmin().modifyTable(td); -342 HTU.getAdmin().enableTable(td.getTableName()); -343 TableDescriptor nHdt = HTU.getAdmin().getDescriptor(td.getTableName()); -344 Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()), -345 bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount()); -346 -347 p = new Put(row); -348 p.addColumn(row, row, row); -349 table.put(p); -350 -351 g = new Get(row); -352 r = table.get(g); -353 Assert.assertFalse(r.isStale()); -354 -355 try { -356 SlowMeCopro.cdl.set(new CountDownLatch(1)); -357 g = new Get(row); -358 g.setConsistency(Consistency.TIMELINE); -359 r = table.get(g); -360 Assert.assertTrue(r.isStale()); -361 } finally { -362 SlowMeCopro.cdl.get().countDown(); -363 SlowMeCopro.sleepTime.set(0); -364 } -365 -366 Admin admin = HTU.getAdmin(); -367 nHdt =admin.getDescriptor(td.getTableName()); -368 Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()), -369 bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount()); -370 -371 admin.disableTable(td.getTableName()); -372 admin.deleteTable(td.getTableName()); -373 admin.close(); -374 } -375 -376 @SuppressWarnings("deprecation") -377 @Test (timeout=300000) -378 public void testReplicaAndReplication() throws Exception { -379 HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaAndReplication"); -380 hdt.setRegionReplication(NB_SERVERS); -381 -382 HColumnDescriptor fam = new HColumnDescriptor(row); -383 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); -384 hdt.addFamily(fam); +270 HTU.startMiniCluster(NB_SERVERS); +271 HTU.getHBaseCluster().startMaster(); +272 } +273 +274 @AfterClass +275 public static void afterClass() throws Exception { +276 if (HTU2 != null) +277 HTU2.shutdownMiniCluster(); +278 HTU.shutdownMiniCluster(); +279 } +280 +281 @Test (timeout=30000) +282 public void testCreateDeleteTable() throws IOException { +283 // Create table then get the single region for our new table. +284 HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable"); +285 hdt.setRegionReplication(NB_SERVERS); +286 hdt.addCoprocessor(SlowMeCopro.class.getName()); +287 Table table = HTU.createTable(hdt, new byte[][]{f}, null); +288 +289 Put p = new Put(row); +290 p.addColumn(f, row, row); +291 table.put(p); +292 +293 Get g = new Get(row); +294 Result r = table.get(g); +295 Assert.assertFalse(r.isStale()); +296 +297 try { +298 // But if we ask for stale we will get it +299 SlowMeCopro.cdl.set(new CountDownLatch(1)); +300 g = new Get(row); +301 g.setConsistency(Consistency.TIMELINE); +302 r = table.get(g); +303 Assert.assertTrue(r.isStale()); +304 SlowMeCopro.cdl.get().countDown(); +305 } finally { +306 SlowMeCopro.cdl.get().countDown(); +307 SlowMeCopro.sleepTime.set(0); +308 } +309 +310 HTU.getAdmin().disableTable(hdt.getTableName()); +311 HTU.deleteTable(hdt.getTableName()); +312 } +313 +314 @Test (timeout=120000) +315 public void testChangeTable() throws Exception { +316 TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("testChangeTable")) +317 .setRegionReplication(NB_SERVERS) +318 .addCoprocessor(SlowMeCopro.class.getName()) +319 .addColumnFamily(ColumnFamilyDescriptorBuilder.of(f)) +320 .build(); +321 HTU.getAdmin().createTable(td); +322 Table table = HTU.getConnection().getTable(td.getTableName()); +323 // basic test: it should work. +324 Put p = new Put(row); +325 p.addColumn(f, row, row); +326 table.put(p); +327 +328 Get g = new Get(row); +329 Result r = table.get(g); +330 Assert.assertFalse(r.isStale()); +331 +332 // Add a CF, it should work. +333 TableDescriptor bHdt = HTU.getAdmin().getDescriptor(td.getTableName()); +334 td = TableDescriptorBuilder.newBuilder(td) +335 .addColumnFamily(ColumnFamilyDescriptorBuilder.of(row)) +336 .build(); +337 HTU.getAdmin().disableTable(td.getTableName()); +338 HTU.getAdmin().modifyTable(td); +339 HTU.getAdmin().enableTable(td.getTableName()); +340 TableDescriptor nHdt = HTU.getAdmin().getDescriptor(td.getTableName()); +341 Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()), +342 bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount()); +343 +344 p = new Put(row); +345 p.addColumn(row, row, row); +346 table.put(p); +347 +348 g = new Get(row); +349 r = table.get(g); +350 Assert.assertFalse(r.isStale()); +351 +352 try { +353 SlowMeCopro.cdl.set(new CountDownLatch(1)); +354 g = new Get(row); +355 g.setConsistency(Consistency.TIMELINE); +356 r = table.get(g); +357 Assert.assertTrue(r.isStale()); +358 } finally { +359 SlowMeCopro.cdl.get().countDown(); +360 SlowMeCopro.sleepTime.set(0); +361 } +362 +363 Admin admin = HTU.getAdmin(); +364 nHdt =admin.getDescriptor(td.getTableName()); +365 Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()), +366 bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount()); +367 +368 admin.disableTable(td.getTableName()); +369 admin.deleteTable(td.getTableName()); +370 admin.close(); +371 } +372 +373 @SuppressWarnings("deprecation") +374 @Test (timeout=300000) +375 public void testReplicaAndReplication() throws Exception { +376 HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaAndReplication"); +377 hdt.setRegionReplication(NB_SERVERS); +378 +379 HColumnDescriptor fam = new HColumnDescriptor(row); +380 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); +381 hdt.addFamily(fam); +382 +383 hdt.addCoprocessor(SlowMeCopro.class.getName()); +384 HTU.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); 385 -386 hdt.addCoprocessor(SlowMeCopro.class.getName()); -387 HTU.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); -388 -389 Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration()); -390 conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); -391 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); -392 MiniZooKeeperCluster miniZK = HTU.getZkCluster(); -393 -394 HTU2 = new HBaseTestingUtility(conf2); -395 HTU2.setZkCluster(miniZK); -396 HTU2.startMiniCluster(NB_SERVERS); -397 LOG.info("Setup second Zk"); -398 HTU2.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); -399 -400 ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); -401 -402 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); -403 rpc.setClusterKey(HTU2.getClusterKey()); -404 admin.addPeer("2", rpc, null); -405 admin.close(); -406 -407 Put p = new Put(row); -408 p.addColumn(row, row, row); -409 final Table table = HTU.getConnection().getTable(hdt.getTableName()); -410 table.put(p); +386 Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration()); +387 conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); +388 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); +389 MiniZooKeeperCluster miniZK = HTU.getZkCluster(); +390 +391 HTU2 = new HBaseTestingUtility(conf2); +392 HTU2.setZkCluster(miniZK); +393 HTU2.startMiniCluster(NB_SERVERS); +394 LOG.info("Setup second Zk"); +395 HTU2.getAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); +396 +397 ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); +398 +399 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); +400 rpc.setClusterKey(HTU2.getClusterKey()); +401 admin.addPeer("2", rpc, null); +402 admin.close(); +403 +404 Put p = new Put(row); +405 p.addColumn(row, row, row); +406 final Table table = HTU.getConnection().getTable(hdt.getTableName()); +407 table.put(p); +408 +409 HTU.getAdmin().flush(table.getName()); +410 LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster."); 411 -412 HTU.getAdmin().flush(table.getName()); -413 LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster."); -414 -415 Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() { -416 @Override public boolean evaluate() throws Exception { -417 try { -418 SlowMeCopro.cdl.set(new CountDownLatch(1)); -419 Get g = new Get(row); -420 g.setConsistency(Consistency.TIMELINE); -421 Result r = table.get(g); -422 Assert.assertTrue(r.isStale()); -423 return !r.isEmpty(); -424 } finally { -425 SlowMeCopro.cdl.get().countDown(); -426 SlowMeCopro.sleepTime.set(0); -427 } -428 } -429 }); -430 table.close(); -431 LOG.info("stale get on the first cluster done. Now for the second."); -432 -433 final Table table2 = HTU.getConnection().getTable(hdt.getTableName()); -434 Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() { -435 @Override public boolean evaluate() throws Exception { -436 try { -437 SlowMeCopro.cdl.set(new CountDownLatch(1)); -438 Get g = new Get(row); -439 g.setConsistency(Consistency.TIMELINE); -440 Result r = table2.get(g); -441 Assert.assertTrue(r.isStale()); -442 return !r.isEmpty(); -443 } finally { -444 SlowMeCopro.cdl.get().countDown(); -445 SlowMeCopro.sleepTime.set(0); -446 } -447 } -448 }); -449 table2.close(); +412 Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() { +413 @Override public boolean evaluate() throws Exception { +414 try { +415 SlowMeCopro.cdl.set(new CountDownLatch(1)); +416 Get g = new Get(row); +417 g.setConsistency(Consistency.TIMELINE); +418 Result r = table.get(g); +419 Assert.assertTrue(r.isStale()); +420 return !r.isEmpty(); +421 } finally { +422 SlowMeCopro.cdl.get().countDown(); +423 SlowMeCopro.sleepTime.set(0); +424 } +425 } +426 }); +427 table.close(); +428 LOG.info("stale get on the first cluster done. Now for the second."); +429 +430 final Table table2 = HTU.getConnection().getTable(hdt.getTableName()); +431 Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() { +432 @Override public boolean evaluate() throws Exception { +433 try { +434 SlowMeCopro.cdl.set(new CountDownLatch(1)); +435 Get g = new Get(row); +436 g.setConsistency(Consistency.TIMELINE); +437 Result r = table2.get(g); +438 Assert.assertTrue(r.isStale()); +439 return !r.isEmpty(); +440 } finally { +441 SlowMeCopro.cdl.get().countDown(); +442 SlowMeCopro.sleepTime.set(0); +443 } +444 } +445 }); +446 table2.close(); +447 +448 HTU.getAdmin().disableTable(hdt.getTableName()); +449 HTU.deleteTable(hdt.getTableName()); 450 -451 HTU.getAdmin().disableTable(hdt.getTableName()); -452 HTU.deleteTable(hdt.getTableName()); +451 HTU2.getAdmin().disableTable(hdt.getTableName()); +452 HTU2.deleteTable(hdt.getTableName()); 453 -454 HTU2.getAdmin().disableTable(hdt.getTableName()); -455 HTU2.deleteTable(hdt.getTableName()); -456 -457 // We shutdown HTU2 minicluster later, in afterClass(), as shutting down -458 // the minicluster has negative impact of deleting all HConnections in JVM. -459 } -460 -461 @Test (timeout=30000) -462 public void testBulkLoad() throws IOException { -463 // Create table then get the single region for our new table. -464 LOG.debug("Creating test table"); -465 HTableDescriptor hdt = HTU.createTableDescriptor("testBulkLoad"); -466 hdt.setRegionReplication(NB_SERVERS); -467 hdt.addCoprocessor(SlowMeCopro.class.getName()); -468 Table table = HTU.createTable(hdt, new byte[][]{f}, null); -469 -470 // create hfiles to load. -471 LOG.debug("Creating test data"); -472 Path dir = HTU.getDataTestDirOnTestFS("testBulkLoad"); -473 final int numRows = 10; -474 final byte[] qual = Bytes.toBytes("qual"); -475 final byte[] val = Bytes.toBytes("val"); -476 final List<Pair<byte[], String>> famPaths = new ArrayList<>(); -477 for (HColumnDescriptor col : hdt.getColumnFamilies()) { -478 Path hfile = new Path(dir, col.getNameAsString()); -479 TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(), -480 qual, val, numRows); -481 famPaths.add(new Pair<>(col.getName(), hfile.toString())); -482 } -483 -484 // bulk load HFiles -485 LOG.debug("Loading test data"); -486 final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection(); -487 table = conn.getTable(hdt.getTableName()); -488 final String bulkToken = -489 new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn); -490 ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn, -491 hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0), -492 new RpcControllerFactory(HTU.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) { -493 @Override -494 protected Void rpcCall() throws Exception { -495 LOG.debug("Going to connect to server " + getLocation() + " for row " -496 + Bytes.toStringBinary(getRow())); -497 SecureBulkLoadClient secureClient = null; -498 byte[] regionName = getLocation().getRegionInfo().getRegionName(); -499 try (Table table = conn.getTable(getTableName())) { -500 secureClient = new SecureBulkLoadClient(HTU.getConfiguration(), table); -501 secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, -502 true, null, bulkToken); -503 } -504 return null; -505 } -506 }; -507 RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration()); -508 RpcRetryingCaller<Void> caller = factory.newCaller(); -509 caller.callWithRetries(callable, 10000); -510 -511 // verify we can read them from the primary -512 LOG.debug("Verifying data load"); -513 for (int i = 0; i < numRows; i++) { -514 byte[] row = TestHRegionServerBulkLoad.rowkey(i); -515 Get g = new Get(row); -516 Result r = table.get(g); -517 Assert.assertFalse(r.isStale()); -518 } -519 -520 // verify we can read them from the replica -521 LOG.debug("Verifying replica queries"); -522 try { -523 SlowMeCopro.cdl.set(new CountDownLatch(1)); -524 for (int i = 0; i < numRows; i++) { -525 byte[] row = TestHRegionServerBulkLoad.rowkey(i); -526 Get g = new Get(row); -527 g.setConsistency(Consistency.TIMELINE); -528 Result r = table.get(g); -529 Assert.assertTrue(r.isStale()); -530 } -531 SlowMeCopro.cdl.get().countDown(); -532 } finally { -533 SlowMeCopro.cdl.get().countDown(); -534 SlowMeCopro.sleepTime.set(0); -535 } -536 -537 HTU.getAdmin().disableTable(hdt.getTableName()); -538 HTU.deleteTable(hdt.getTableName()); -539 } -540