Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 350BD17A74 for ; Tue, 31 Mar 2015 01:40:22 +0000 (UTC) Received: (qmail 20295 invoked by uid 500); 31 Mar 2015 01:40:21 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 20177 invoked by uid 500); 31 Mar 2015 01:40:21 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 19612 invoked by uid 99); 31 Mar 2015 01:40:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 31 Mar 2015 01:40:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 750B3E2EE7; Tue, 31 Mar 2015 01:40:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: apurtell@apache.org To: commits@hbase.apache.org Date: Tue, 31 Mar 2015 01:40:25 -0000 Message-Id: <1d5e5a82f7ab4696898b3f3faaea5398@git.apache.org> In-Reply-To: <8aef9f7247ab4f26bb76663eae8e2f38@git.apache.org> References: <8aef9f7247ab4f26bb76663eae8e2f38@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [06/16] hbase git commit: HBASE-12972 Region, a supportable public/evolving subset of HRegion http://git-wip-us.apache.org/repos/asf/hbase/blob/f1f4b661/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 34c417d..a618a32 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -253,8 +253,7 @@ public class HRegionServer extends HasThread implements * Map of regions currently being served by this region server. Key is the * encoded region name. All access should be synchronized. */ - protected final Map onlineRegions = - new ConcurrentHashMap(); + protected final Map onlineRegions = new ConcurrentHashMap(); /** * Map of encoded region names to the DataNode locations they should be hosted on @@ -272,8 +271,8 @@ public class HRegionServer extends HasThread implements * Set of regions currently being in recovering state which means it can accept writes(edits from * previous failed region server) but not reads. A recovering region is also an online region. */ - protected final Map recoveringRegions = Collections - .synchronizedMap(new HashMap()); + protected final Map recoveringRegions = Collections + .synchronizedMap(new HashMap()); // Leases protected Leases leases; @@ -1073,7 +1072,7 @@ public class HRegionServer extends HasThread implements private boolean areAllUserRegionsOffline() { if (getNumberOfOnlineRegions() > 2) return false; boolean allUserRegionsOffline = true; - for (Map.Entry e: this.onlineRegions.entrySet()) { + for (Map.Entry e: this.onlineRegions.entrySet()) { if (!e.getValue().getRegionInfo().isMetaTable()) { allUserRegionsOffline = false; break; @@ -1087,7 +1086,7 @@ public class HRegionServer extends HasThread implements */ private long getWriteRequestCount() { int writeCount = 0; - for (Map.Entry e: this.onlineRegions.entrySet()) { + for (Map.Entry e: this.onlineRegions.entrySet()) { writeCount += e.getValue().getWriteRequestsCount(); } return writeCount; @@ -1133,10 +1132,9 @@ public class HRegionServer extends HasThread implements // Instead they should be stored in an HBase table so that external visibility into HBase is // improved; Additionally the load balancer will be able to take advantage of a more complete // history. - MetricsRegionServerWrapper regionServerWrapper = this.metricsRegionServer.getRegionServerWrapper(); - Collection regions = getOnlineRegionsLocalContext(); - MemoryUsage memory = - ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); + MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper(); + Collection regions = getOnlineRegionsLocalContext(); + MemoryUsage memory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder(); @@ -1151,7 +1149,7 @@ public class HRegionServer extends HasThread implements } RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder(); RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder(); - for (HRegion region : regions) { + for (Region region : regions) { serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier)); for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors()) { @@ -1186,7 +1184,7 @@ public class HRegionServer extends HasThread implements String getOnlineRegionsAsPrintableString() { StringBuilder sb = new StringBuilder(); - for (HRegion r: this.onlineRegions.values()) { + for (Region r: this.onlineRegions.values()) { if (sb.length() > 0) sb.append(", "); sb.append(r.getRegionInfo().getEncodedName()); } @@ -1222,7 +1220,7 @@ public class HRegionServer extends HasThread implements // Ensure all user regions have been sent a close. Use this to // protect against the case where an open comes in after we start the // iterator of onlineRegions to close all user regions. - for (Map.Entry e : this.onlineRegions.entrySet()) { + for (Map.Entry e : this.onlineRegions.entrySet()) { HRegionInfo hri = e.getValue().getRegionInfo(); if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) && !closedRegions.contains(hri.getEncodedName())) { @@ -1378,44 +1376,37 @@ public class HRegionServer extends HasThread implements * * @throws IOException */ - private RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, + private RegionLoad createRegionLoad(final Region r, RegionLoad.Builder regionLoadBldr, RegionSpecifier.Builder regionSpecifier) throws IOException { - byte[] name = r.getRegionName(); + byte[] name = r.getRegionInfo().getRegionName(); int stores = 0; int storefiles = 0; int storeUncompressedSizeMB = 0; int storefileSizeMB = 0; - int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024); + int memstoreSizeMB = (int) (r.getMemstoreSize() / 1024 / 1024); int storefileIndexSizeMB = 0; int rootIndexSizeKB = 0; int totalStaticIndexSizeKB = 0; int totalStaticBloomSizeKB = 0; long totalCompactingKVs = 0; long currentCompactedKVs = 0; - synchronized (r.stores) { - stores += r.stores.size(); - for (Store store : r.stores.values()) { - storefiles += store.getStorefilesCount(); - storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() - / 1024 / 1024); - storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024); - storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024); - CompactionProgress progress = store.getCompactionProgress(); - if (progress != null) { - totalCompactingKVs += progress.totalCompactingKVs; - currentCompactedKVs += progress.currentCompactedKVs; - } - - rootIndexSizeKB += - (int) (store.getStorefilesIndexSize() / 1024); - - totalStaticIndexSizeKB += - (int) (store.getTotalStaticIndexSize() / 1024); - - totalStaticBloomSizeKB += - (int) (store.getTotalStaticBloomSize() / 1024); - } + List storeList = r.getStores(); + stores += storeList.size(); + for (Store store : storeList) { + storefiles += store.getStorefilesCount(); + storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024); + storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024); + storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024); + CompactionProgress progress = store.getCompactionProgress(); + if (progress != null) { + totalCompactingKVs += progress.totalCompactingKVs; + currentCompactedKVs += progress.currentCompactedKVs; + } + rootIndexSizeKB += (int) (store.getStorefilesIndexSize() / 1024); + totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024); + totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024); } + float dataLocality = r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname()); if (regionLoadBldr == null) { @@ -1426,8 +1417,7 @@ public class HRegionServer extends HasThread implements } regionSpecifier.setType(RegionSpecifierType.REGION_NAME); regionSpecifier.setValue(ByteStringer.wrap(name)); - r.setCompleteSequenceId(regionLoadBldr) - .setRegionSpecifier(regionSpecifier.build()) + regionLoadBldr.setRegionSpecifier(regionSpecifier.build()) .setStores(stores) .setStorefiles(storefiles) .setStoreUncompressedSizeMB(storeUncompressedSizeMB) @@ -1437,12 +1427,13 @@ public class HRegionServer extends HasThread implements .setRootIndexSizeKB(rootIndexSizeKB) .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB) .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB) - .setReadRequestsCount(r.readRequestsCount.get()) - .setWriteRequestsCount(r.writeRequestsCount.get()) + .setReadRequestsCount(r.getReadRequestsCount()) + .setWriteRequestsCount(r.getWriteRequestsCount()) .setTotalCompactingKVs(totalCompactingKVs) .setCurrentCompactedKVs(currentCompactedKVs) .setDataLocality(dataLocality) .setLastMajorCompactionTs(r.getOldestHfileTs(true)); + ((HRegion)r).setCompleteSequenceId(regionLoadBldr); return regionLoadBldr.build(); } @@ -1452,8 +1443,7 @@ public class HRegionServer extends HasThread implements * @return An instance of RegionLoad. */ public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException { - HRegion r = null; - r = this.onlineRegions.get(encodedRegionName); + Region r = onlineRegions.get(encodedRegionName); return r != null ? createRegionLoad(r, null, null) : null; } @@ -1482,10 +1472,10 @@ public class HRegionServer extends HasThread implements @Override protected void chore() { - for (HRegion r : this.instance.onlineRegions.values()) { + for (Region r : this.instance.onlineRegions.values()) { if (r == null) continue; - for (Store s : r.getStores().values()) { + for (Store s : r.getStores()) { try { long multiplier = s.getCompactionCheckMultiplier(); assert multiplier > 0; @@ -1496,7 +1486,7 @@ public class HRegionServer extends HasThread implements + " requests compaction"); } else if (s.isMajorCompaction()) { if (majorCompactPriority == DEFAULT_PRIORITY - || majorCompactPriority > r.getCompactPriority()) { + || majorCompactPriority > ((HRegion)r).getCompactPriority()) { this.instance.compactSplitThread.requestCompaction(r, s, getName() + " requests major compaction; use default priority", null); } else { @@ -1525,15 +1515,15 @@ public class HRegionServer extends HasThread implements @Override protected void chore() { - for (HRegion r : this.server.onlineRegions.values()) { + for (Region r : this.server.onlineRegions.values()) { if (r == null) continue; - if (r.shouldFlush()) { + if (((HRegion)r).shouldFlush()) { FlushRequester requester = server.getFlushRequester(); if (requester != null) { long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME; - LOG.info(getName() + " requesting flush for region " + r.getRegionNameAsString() + - " after a delay of " + randomDelay); + LOG.info(getName() + " requesting flush for region " + + r.getRegionInfo().getRegionNameAsString() + " after a delay of " + randomDelay); //Throttle the flushes by putting a delay. If we don't throttle, and there //is a balanced write-load on the regions in a table, we might end up //overwhelming the filesystem with too many flushes at once. @@ -1839,12 +1829,12 @@ public class HRegionServer extends HasThread implements } @Override - public void postOpenDeployTasks(final HRegion r) - throws KeeperException, IOException { + public void postOpenDeployTasks(final Region r) throws KeeperException, IOException { + Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion"); rpcServices.checkOpen(); - LOG.info("Post open deploy tasks for " + r.getRegionNameAsString()); + LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString()); // Do checks to see if we need to compact (references or too many files) - for (Store s : r.getStores().values()) { + for (Store s : r.getStores()) { if (s.hasReferences() || s.needsCompaction()) { this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region"); } @@ -1852,7 +1842,8 @@ public class HRegionServer extends HasThread implements long openSeqNum = r.getOpenSeqNum(); if (openSeqNum == HConstants.NO_SEQNUM) { // If we opened a region, we should have read some sequence number from it. - LOG.error("No sequence number found when opening " + r.getRegionNameAsString()); + LOG.error("No sequence number found when opening " + + r.getRegionInfo().getRegionNameAsString()); openSeqNum = 0; } @@ -1863,12 +1854,12 @@ public class HRegionServer extends HasThread implements if (!reportRegionStateTransition( TransitionCode.OPENED, openSeqNum, r.getRegionInfo())) { throw new IOException("Failed to report opened region to master: " - + r.getRegionNameAsString()); + + r.getRegionInfo().getRegionNameAsString()); } - triggerFlushInPrimaryRegion(r); + triggerFlushInPrimaryRegion((HRegion)r); - LOG.debug("Finished post open deploy task for " + r.getRegionNameAsString()); + LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString()); } @Override @@ -2272,10 +2263,10 @@ public class HRegionServer extends HasThread implements * @param abort Whether we're running an abort. */ void closeMetaTableRegions(final boolean abort) { - HRegion meta = null; + Region meta = null; this.lock.writeLock().lock(); try { - for (Map.Entry e: onlineRegions.entrySet()) { + for (Map.Entry e: onlineRegions.entrySet()) { HRegionInfo hri = e.getValue().getRegionInfo(); if (hri.isMetaRegion()) { meta = e.getValue(); @@ -2297,8 +2288,8 @@ public class HRegionServer extends HasThread implements void closeUserRegions(final boolean abort) { this.lock.writeLock().lock(); try { - for (Map.Entry e: this.onlineRegions.entrySet()) { - HRegion r = e.getValue(); + for (Map.Entry e: this.onlineRegions.entrySet()) { + Region r = e.getValue(); if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) { // Don't update zk with this close transition; pass false. closeRegionIgnoreErrors(r.getRegionInfo(), abort); @@ -2328,7 +2319,7 @@ public class HRegionServer extends HasThread implements } @Override - public Map getRecoveringRegions() { + public Map getRecoveringRegions() { return this.recoveringRegions; } @@ -2359,13 +2350,13 @@ public class HRegionServer extends HasThread implements * This method will only work if HRegionServer is in the same JVM as client; * HRegion cannot be serialized to cross an rpc. */ - public Collection getOnlineRegionsLocalContext() { - Collection regions = this.onlineRegions.values(); + public Collection getOnlineRegionsLocalContext() { + Collection regions = this.onlineRegions.values(); return Collections.unmodifiableCollection(regions); } @Override - public void addToOnlineRegions(HRegion region) { + public void addToOnlineRegions(Region region) { this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region); configurationManager.registerObserver(region); } @@ -2375,9 +2366,9 @@ public class HRegionServer extends HasThread implements * biggest. If two regions are the same size, then the last one found wins; i.e. this method * may NOT return all regions. */ - SortedMap getCopyOfOnlineRegionsSortedBySize() { + SortedMap getCopyOfOnlineRegionsSortedBySize() { // we'll sort the regions in reverse - SortedMap sortedRegions = new TreeMap( + SortedMap sortedRegions = new TreeMap( new Comparator() { @Override public int compare(Long a, Long b) { @@ -2385,8 +2376,8 @@ public class HRegionServer extends HasThread implements } }); // Copy over all regions. Regions are sorted by size with biggest first. - for (HRegion region : this.onlineRegions.values()) { - sortedRegions.put(region.memstoreSize.get(), region); + for (Region region : this.onlineRegions.values()) { + sortedRegions.put(region.getMemstoreSize(), region); } return sortedRegions; } @@ -2412,7 +2403,7 @@ public class HRegionServer extends HasThread implements */ protected HRegionInfo[] getMostLoadedRegions() { ArrayList regions = new ArrayList(); - for (HRegion r : onlineRegions.values()) { + for (Region r : onlineRegions.values()) { if (!r.isAvailable()) { continue; } @@ -2608,10 +2599,10 @@ public class HRegionServer extends HasThread implements * @return Online regions from tableName */ @Override - public List getOnlineRegions(TableName tableName) { - List tableRegions = new ArrayList(); + public List getOnlineRegions(TableName tableName) { + List tableRegions = new ArrayList(); synchronized (this.onlineRegions) { - for (HRegion region: this.onlineRegions.values()) { + for (Region region: this.onlineRegions.values()) { HRegionInfo regionInfo = region.getRegionInfo(); if(regionInfo.getTable().equals(tableName)) { tableRegions.add(region); @@ -2630,7 +2621,7 @@ public class HRegionServer extends HasThread implements public Set getOnlineTables() { Set tables = new HashSet(); synchronized (this.onlineRegions) { - for (HRegion region: this.onlineRegions.values()) { + for (Region region: this.onlineRegions.values()) { tables.add(region.getTableDesc().getTableName()); } } @@ -2647,8 +2638,8 @@ public class HRegionServer extends HasThread implements "skipping."); LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception); } - Collection regions = getOnlineRegionsLocalContext(); - for (HRegion region: regions) { + Collection regions = getOnlineRegionsLocalContext(); + for (Region region: regions) { coprocessors.addAll(region.getCoprocessorHost().getCoprocessors()); try { coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors()); @@ -2698,7 +2689,7 @@ public class HRegionServer extends HasThread implements protected boolean closeRegion(String encodedName, final boolean abort, final ServerName sn) throws NotServingRegionException { //Check for permissions to close. - HRegion actualRegion = this.getFromOnlineRegions(encodedName); + Region actualRegion = this.getFromOnlineRegions(encodedName); if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) { try { actualRegion.getCoprocessorHost().preClose(false); @@ -2759,7 +2750,7 @@ public class HRegionServer extends HasThread implements * @return HRegion for the passed binary regionName or null if * named region is not member of the online regions. */ - public HRegion getOnlineRegion(final byte[] regionName) { + public Region getOnlineRegion(final byte[] regionName) { String encodedRegionName = HRegionInfo.encodeRegionName(regionName); return this.onlineRegions.get(encodedRegionName); } @@ -2769,14 +2760,14 @@ public class HRegionServer extends HasThread implements } @Override - public HRegion getFromOnlineRegions(final String encodedRegionName) { + public Region getFromOnlineRegions(final String encodedRegionName) { return this.onlineRegions.get(encodedRegionName); } @Override - public boolean removeFromOnlineRegions(final HRegion r, ServerName destination) { - HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName()); + public boolean removeFromOnlineRegions(final Region r, ServerName destination) { + Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName()); if (destination != null) { try { @@ -2808,20 +2799,20 @@ public class HRegionServer extends HasThread implements * @return {@link HRegion} for regionName * @throws NotServingRegionException */ - protected HRegion getRegion(final byte[] regionName) + protected Region getRegion(final byte[] regionName) throws NotServingRegionException { String encodedRegionName = HRegionInfo.encodeRegionName(regionName); return getRegionByEncodedName(regionName, encodedRegionName); } - public HRegion getRegionByEncodedName(String encodedRegionName) + public Region getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException { return getRegionByEncodedName(null, encodedRegionName); } - protected HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName) + protected Region getRegionByEncodedName(byte[] regionName, String encodedRegionName) throws NotServingRegionException { - HRegion region = this.onlineRegions.get(encodedRegionName); + Region region = this.onlineRegions.get(encodedRegionName); if (region == null) { MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName); if (moveInfo != null) { @@ -3077,17 +3068,17 @@ public class HRegionServer extends HasThread implements * @throws KeeperException * @throws IOException */ - private void updateRecoveringRegionLastFlushedSequenceId(HRegion r) throws KeeperException, + private void updateRecoveringRegionLastFlushedSequenceId(Region r) throws KeeperException, IOException { if (!r.isRecovering()) { // return immdiately for non-recovering regions return; } - HRegionInfo region = r.getRegionInfo(); + HRegionInfo regionInfo = r.getRegionInfo(); ZooKeeperWatcher zkw = getZooKeeper(); - String previousRSName = this.getLastFailedRSFromZK(region.getEncodedName()); - Map maxSeqIdInStores = r.getMaxStoreSeqIdForLogReplay(); + String previousRSName = this.getLastFailedRSFromZK(regionInfo.getEncodedName()); + Map maxSeqIdInStores = r.getMaxStoreSeqId(); long minSeqIdForLogReplay = -1; for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) { if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) { @@ -3098,7 +3089,7 @@ public class HRegionServer extends HasThread implements try { long lastRecordedFlushedSequenceId = -1; String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode, - region.getEncodedName()); + regionInfo.getEncodedName()); // recovering-region level byte[] data; try { @@ -3107,7 +3098,7 @@ public class HRegionServer extends HasThread implements throw new InterruptedIOException(); } if (data != null) { - lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data); + lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data); } if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) { ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay)); @@ -3117,14 +3108,14 @@ public class HRegionServer extends HasThread implements nodePath = ZKUtil.joinZNode(nodePath, previousRSName); ZKUtil.setData(zkw, nodePath, ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores)); - LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for " - + previousRSName); + LOG.debug("Update last flushed sequence id of region " + regionInfo.getEncodedName() + + " for " + previousRSName); } else { LOG.warn("Can't find failed region server for recovering region " + - region.getEncodedName()); + regionInfo.getEncodedName()); } } catch (NoNodeException ignore) { - LOG.debug("Region " + region.getEncodedName() + + LOG.debug("Region " + regionInfo.getEncodedName() + " must have completed recovery because its recovery znode has been removed", ignore); } } @@ -3241,8 +3232,8 @@ public class HRegionServer extends HasThread implements @Override public double getCompactionPressure() { double max = 0; - for (HRegion region : onlineRegions.values()) { - for (Store store : region.getStores().values()) { + for (Region region : onlineRegions.values()) { + for (Store store : region.getStores()) { double normCount = store.getCompactionPressure(); if (normCount > max) { max = normCount; http://git-wip-us.apache.org/repos/asf/hbase/blob/f1f4b661/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java index 43deb58..a66a29c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java @@ -324,7 +324,7 @@ public class HeapMemoryManager { } @Override - public void flushRequested(FlushType type, HRegion region) { + public void flushRequested(FlushType type, Region region) { switch (type) { case ABOVE_HIGHER_MARK: blockedFlushCount.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/hbase/blob/f1f4b661/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java index 18bb376..d7a9be5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java @@ -72,7 +72,7 @@ extends ConstantSizeRegionSplitPolicy { // Get size to check long sizeToCheck = getSizeToCheck(tableRegionsCount); - for (Store store : region.getStores().values()) { + for (Store store : region.getStores()) { // If any of the stores is unable to split (eg they contain reference files) // then don't split if ((!store.canSplit())) { @@ -114,7 +114,7 @@ extends ConstantSizeRegionSplitPolicy { TableName tablename = this.region.getTableDesc().getTableName(); int tableRegionsCount = 0; try { - List hri = rss.getOnlineRegions(tablename); + List hri = rss.getOnlineRegions(tablename); tableRegionsCount = hri == null || hri.isEmpty()? 0: hri.size(); } catch (IOException e) { LOG.debug("Failed getOnlineRegions " + tablename, e); http://git-wip-us.apache.org/repos/asf/hbase/blob/f1f4b661/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 71dea3b..de8ed8d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -164,7 +164,7 @@ public class LogRoller extends HasThread { */ private void scheduleFlush(final byte [] encodedRegionName) { boolean scheduled = false; - HRegion r = this.services.getFromOnlineRegions(Bytes.toString(encodedRegionName)); + Region r = this.services.getFromOnlineRegions(Bytes.toString(encodedRegionName)); FlushRequester requester = null; if (r != null) { requester = this.services.getFlushRequester(); http://git-wip-us.apache.org/repos/asf/hbase/blob/f1f4b661/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 8bee002..485d30f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; +import org.apache.hadoop.hbase.regionserver.Region.FlushResult; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Counter; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -79,8 +80,8 @@ class MemStoreFlusher implements FlushRequester { // a corresponding entry in the other. private final BlockingQueue flushQueue = new DelayQueue(); - private final Map regionsInQueue = - new HashMap(); + private final Map regionsInQueue = + new HashMap(); private AtomicBoolean wakeupPending = new AtomicBoolean(); private final long threadWakeFrequency; @@ -139,10 +140,8 @@ class MemStoreFlusher implements FlushRequester { * @return true if successful */ private boolean flushOneForGlobalPressure() { - SortedMap regionsBySize = - server.getCopyOfOnlineRegionsSortedBySize(); - - Set excludedRegions = new HashSet(); + SortedMap regionsBySize = server.getCopyOfOnlineRegionsSortedBySize(); + Set excludedRegions = new HashSet(); double secondaryMultiplier = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf); @@ -151,13 +150,12 @@ class MemStoreFlusher implements FlushRequester { while (!flushedOne) { // Find the biggest region that doesn't have too many storefiles // (might be null!) - HRegion bestFlushableRegion = getBiggestMemstoreRegion( - regionsBySize, excludedRegions, true); + Region bestFlushableRegion = getBiggestMemstoreRegion(regionsBySize, excludedRegions, true); // Find the biggest region, total, even if it might have too many flushes. - HRegion bestAnyRegion = getBiggestMemstoreRegion( + Region bestAnyRegion = getBiggestMemstoreRegion( regionsBySize, excludedRegions, false); // Find the biggest region that is a secondary region - HRegion bestRegionReplica = getBiggestMemstoreOfRegionReplica(regionsBySize, + Region bestRegionReplica = getBiggestMemstoreOfRegionReplica(regionsBySize, excludedRegions); if (bestAnyRegion == null && bestRegionReplica == null) { @@ -165,19 +163,20 @@ class MemStoreFlusher implements FlushRequester { return false; } - HRegion regionToFlush; + Region regionToFlush; if (bestFlushableRegion != null && - bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) { + bestAnyRegion.getMemstoreSize() > 2 * bestFlushableRegion.getMemstoreSize()) { // Even if it's not supposed to be flushed, pick a region if it's more than twice // as big as the best flushable one - otherwise when we're under pressure we make // lots of little flushes and cause lots of compactions, etc, which just makes // life worse! if (LOG.isDebugEnabled()) { LOG.debug("Under global heap pressure: " + "Region " - + bestAnyRegion.getRegionNameAsString() + " has too many " + "store files, but is " - + TraditionalBinaryPrefix.long2String(bestAnyRegion.memstoreSize.get(), "", 1) + + bestAnyRegion.getRegionInfo().getRegionNameAsString() + + " has too many " + "store files, but is " + + TraditionalBinaryPrefix.long2String(bestAnyRegion.getMemstoreSize(), "", 1) + " vs best flushable region's " - + TraditionalBinaryPrefix.long2String(bestFlushableRegion.memstoreSize.get(), "", 1) + + TraditionalBinaryPrefix.long2String(bestFlushableRegion.getMemstoreSize(), "", 1) + ". Choosing the bigger."); } regionToFlush = bestAnyRegion; @@ -190,14 +189,14 @@ class MemStoreFlusher implements FlushRequester { } Preconditions.checkState( - (regionToFlush != null && regionToFlush.memstoreSize.get() > 0) || - (bestRegionReplica != null && bestRegionReplica.memstoreSize.get() > 0)); + (regionToFlush != null && regionToFlush.getMemstoreSize() > 0) || + (bestRegionReplica != null && bestRegionReplica.getMemstoreSize() > 0)); if (regionToFlush == null || (bestRegionReplica != null && ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) && - (bestRegionReplica.memstoreSize.get() - > secondaryMultiplier * regionToFlush.memstoreSize.get()))) { + (bestRegionReplica.getMemstoreSize() + > secondaryMultiplier * regionToFlush.getMemstoreSize()))) { LOG.info("Refreshing storefiles of region " + regionToFlush + " due to global heap pressure. memstore size=" + StringUtils.humanReadableInt( server.getRegionServerAccounting().getGlobalMemstoreSize())); @@ -212,7 +211,7 @@ class MemStoreFlusher implements FlushRequester { + "Total Memstore size=" + humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) + ", Region memstore size=" - + humanReadableInt(regionToFlush.memstoreSize.get())); + + humanReadableInt(regionToFlush.getMemstoreSize())); flushedOne = flushRegion(regionToFlush, true, true); if (!flushedOne) { @@ -289,17 +288,18 @@ class MemStoreFlusher implements FlushRequester { } } - private HRegion getBiggestMemstoreRegion( - SortedMap regionsBySize, - Set excludedRegions, + private Region getBiggestMemstoreRegion( + SortedMap regionsBySize, + Set excludedRegions, boolean checkStoreFileCount) { synchronized (regionsInQueue) { - for (HRegion region : regionsBySize.values()) { + for (Region region : regionsBySize.values()) { if (excludedRegions.contains(region)) { continue; } - if (region.writestate.flushing || !region.writestate.writesEnabled) { + if (((HRegion)region).writestate.flushing || + !((HRegion)region).writestate.writesEnabled) { continue; } @@ -312,10 +312,10 @@ class MemStoreFlusher implements FlushRequester { return null; } - private HRegion getBiggestMemstoreOfRegionReplica(SortedMap regionsBySize, - Set excludedRegions) { + private Region getBiggestMemstoreOfRegionReplica(SortedMap regionsBySize, + Set excludedRegions) { synchronized (regionsInQueue) { - for (HRegion region : regionsBySize.values()) { + for (Region region : regionsBySize.values()) { if (excludedRegions.contains(region)) { continue; } @@ -330,7 +330,7 @@ class MemStoreFlusher implements FlushRequester { return null; } - private boolean refreshStoreFilesAndReclaimMemory(HRegion region) { + private boolean refreshStoreFilesAndReclaimMemory(Region region) { try { return region.refreshStoreFiles(); } catch (IOException e) { @@ -356,7 +356,7 @@ class MemStoreFlusher implements FlushRequester { } @Override - public void requestFlush(HRegion r, boolean forceFlushAllStores) { + public void requestFlush(Region r, boolean forceFlushAllStores) { synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { // This entry has no delay so it will be added at the top of the flush @@ -369,7 +369,7 @@ class MemStoreFlusher implements FlushRequester { } @Override - public void requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) { + public void requestDelayedFlush(Region r, long delay, boolean forceFlushAllStores) { synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { // This entry has some delay @@ -435,19 +435,19 @@ class MemStoreFlusher implements FlushRequester { * not flushed. */ private boolean flushRegion(final FlushRegionEntry fqe) { - HRegion region = fqe.region; + Region region = fqe.region; if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) { if (fqe.isMaximumWait(this.blockingWaitTime)) { LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) + "ms on a compaction to clean up 'too many store files'; waited " + "long enough... proceeding with flush of " + - region.getRegionNameAsString()); + region.getRegionInfo().getRegionNameAsString()); } else { // If this is first time we've been put off, then emit a log message. if (fqe.getRequeueCount() <= 0) { // Note: We don't impose blockingStoreFiles constraint on meta regions - LOG.warn("Region " + region.getRegionNameAsString() + " has too many " + + LOG.warn("Region " + region.getRegionInfo().getRegionNameAsString() + " has too many " + "store files; delaying flush up to " + this.blockingWaitTime + "ms"); if (!this.server.compactSplitThread.requestSplit(region)) { try { @@ -456,9 +456,8 @@ class MemStoreFlusher implements FlushRequester { } catch (IOException e) { e = e instanceof RemoteException ? ((RemoteException)e).unwrapRemoteException() : e; - LOG.error( - "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()), - e); + LOG.error("Cache flush failed for region " + + Bytes.toStringBinary(region.getRegionInfo().getRegionName()), e); } } } @@ -485,7 +484,7 @@ class MemStoreFlusher implements FlushRequester { * false, there will be accompanying log messages explaining why the region was * not flushed. */ - private boolean flushRegion(final HRegion region, final boolean emergencyFlush, + private boolean flushRegion(final Region region, final boolean emergencyFlush, boolean forceFlushAllStores) { long startTime = 0; synchronized (this.regionsInQueue) { @@ -509,10 +508,10 @@ class MemStoreFlusher implements FlushRequester { lock.readLock().lock(); try { notifyFlushRequest(region, emergencyFlush); - HRegion.FlushResult flushResult = region.flushcache(forceFlushAllStores); + FlushResult flushResult = region.flush(forceFlushAllStores); boolean shouldCompact = flushResult.isCompactionNeeded(); // We just want to check the size - boolean shouldSplit = region.checkSplit() != null; + boolean shouldSplit = ((HRegion)region).checkSplit() != null; if (shouldSplit) { this.server.compactSplitThread.requestSplit(region); } else if (shouldCompact) { @@ -535,8 +534,9 @@ class MemStoreFlusher implements FlushRequester { ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; LOG.error( "Cache flush failed" - + (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName())) - : ""), ex); + + (region != null ? (" for region " + + Bytes.toStringBinary(region.getRegionInfo().getRegionName())) + : ""), ex); if (!server.checkFileSystem()) { return false; } @@ -547,7 +547,7 @@ class MemStoreFlusher implements FlushRequester { return true; } - private void notifyFlushRequest(HRegion region, boolean emergencyFlush) { + private void notifyFlushRequest(Region region, boolean emergencyFlush) { FlushType type = FlushType.NORMAL; if (emergencyFlush) { type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK; @@ -563,8 +563,8 @@ class MemStoreFlusher implements FlushRequester { } } - private boolean isTooManyStoreFiles(HRegion region) { - for (Store store : region.stores.values()) { + private boolean isTooManyStoreFiles(Region region) { + for (Store store : region.getStores()) { if (store.hasTooManyStoreFiles()) { return true; } @@ -719,7 +719,7 @@ class MemStoreFlusher implements FlushRequester { * a while. */ static class FlushRegionEntry implements FlushQueueEntry { - private final HRegion region; + private final Region region; private final long createTime; private long whenToExpire; @@ -727,7 +727,7 @@ class MemStoreFlusher implements FlushRequester { private boolean forceFlushAllStores; - FlushRegionEntry(final HRegion r, boolean forceFlushAllStores) { + FlushRegionEntry(final Region r, boolean forceFlushAllStores) { this.region = r; this.createTime = EnvironmentEdgeManager.currentTime(); this.whenToExpire = this.createTime; @@ -789,7 +789,7 @@ class MemStoreFlusher implements FlushRequester { @Override public String toString() { - return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]"; + return "[flush region "+Bytes.toStringBinary(region.getRegionInfo().getRegionName())+"]"; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/f1f4b661/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index 5e5590d..3111661 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.Collection; +import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -166,7 +167,7 @@ class MetricsRegionServerWrapperImpl @Override public long getNumOnlineRegions() { - Collection onlineRegionsLocalContext = regionServer.getOnlineRegionsLocalContext(); + Collection onlineRegionsLocalContext = regionServer.getOnlineRegionsLocalContext(); if (onlineRegionsLocalContext == null) { return 0; } @@ -452,16 +453,17 @@ class MetricsRegionServerWrapperImpl long tempMajorCompactedCellsSize = 0; long tempBlockedRequestsCount = 0L; - for (HRegion r : regionServer.getOnlineRegionsLocalContext()) { - tempNumMutationsWithoutWAL += r.numMutationsWithoutWAL.get(); - tempDataInMemoryWithoutWAL += r.dataInMemoryWithoutWAL.get(); - tempReadRequestsCount += r.readRequestsCount.get(); - tempWriteRequestsCount += r.writeRequestsCount.get(); - tempCheckAndMutateChecksFailed += r.checkAndMutateChecksFailed.get(); - tempCheckAndMutateChecksPassed += r.checkAndMutateChecksPassed.get(); + for (Region r : regionServer.getOnlineRegionsLocalContext()) { + tempNumMutationsWithoutWAL += r.getNumMutationsWithoutWAL(); + tempDataInMemoryWithoutWAL += r.getDataInMemoryWithoutWAL(); + tempReadRequestsCount += r.getReadRequestsCount(); + tempWriteRequestsCount += r.getWriteRequestsCount(); + tempCheckAndMutateChecksFailed += r.getCheckAndMutateChecksFailed(); + tempCheckAndMutateChecksPassed += r.getCheckAndMutateChecksPassed(); tempBlockedRequestsCount += r.getBlockedRequestsCount(); - tempNumStores += r.stores.size(); - for (Store store : r.stores.values()) { + List storeList = r.getStores(); + tempNumStores += storeList.size(); + for (Store store : storeList) { tempNumStoreFiles += store.getStorefilesCount(); tempMemstoreSize += store.getMemStoreSize(); tempStoreFileSize += store.getStorefilesSize(); http://git-wip-us.apache.org/repos/asf/hbase/blob/f1f4b661/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java index 1cde7e3..60fc9fb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java @@ -22,46 +22,49 @@ import java.io.IOException; import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; /** * Interface to Map of online regions. In the Map, the key is the region's - * encoded name and the value is an {@link HRegion} instance. + * encoded name and the value is an {@link Region} instance. */ -@InterfaceAudience.Private -interface OnlineRegions extends Server { +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +@InterfaceStability.Evolving +public interface OnlineRegions extends Server { /** * Add to online regions. * @param r */ - void addToOnlineRegions(final HRegion r); + void addToOnlineRegions(final Region r); /** - * This method removes HRegion corresponding to hri from the Map of onlineRegions. + * This method removes Region corresponding to hri from the Map of onlineRegions. * * @param r Region to remove. * @param destination Destination, if any, null otherwise. * @return True if we removed a region from online list. */ - boolean removeFromOnlineRegions(final HRegion r, ServerName destination); + boolean removeFromOnlineRegions(final Region r, ServerName destination); /** - * Return {@link HRegion} instance. - * Only works if caller is in same context, in same JVM. HRegion is not + * Return {@link Region} instance. + * Only works if caller is in same context, in same JVM. Region is not * serializable. * @param encodedRegionName - * @return HRegion for the passed encoded encodedRegionName or + * @return Region for the passed encoded encodedRegionName or * null if named region is not member of the online regions. */ - HRegion getFromOnlineRegions(String encodedRegionName); + Region getFromOnlineRegions(String encodedRegionName); /** * Get all online regions of a table in this RS. * @param tableName - * @return List of HRegion + * @return List of Region * @throws java.io.IOException */ - List getOnlineRegions(TableName tableName) throws IOException; + List getOnlineRegions(TableName tableName) throws IOException; } http://git-wip-us.apache.org/repos/asf/hbase/blob/f1f4b661/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 62921ee..b0fd9eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -151,9 +151,10 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; import org.apache.hadoop.hbase.quotas.OperationQuota; import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; -import org.apache.hadoop.hbase.regionserver.HRegion.Operation; import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; +import org.apache.hadoop.hbase.regionserver.Region.FlushResult; +import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -163,6 +164,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; @@ -215,9 +217,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, private static class RegionScannerHolder { private RegionScanner s; private long nextCallSeq = 0L; - private HRegion r; + private Region r; - public RegionScannerHolder(RegionScanner s, HRegion r) { + public RegionScannerHolder(RegionScanner s, Region r) { this.s = s; this.r = r; } @@ -242,7 +244,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, LOG.info("Scanner " + this.scannerName + " lease expired on region " + s.getRegionInfo().getRegionNameAsString()); try { - HRegion region = regionServer.getRegion(s.getRegionInfo().getRegionName()); + Region region = regionServer.getRegion(s.getRegionInfo().getRegionName()); if (region != null && region.getCoprocessorHost() != null) { region.getCoprocessorHost().preScannerClose(s); } @@ -362,7 +364,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @param cellScanner if non-null, the mutation data -- the Cell content. * @throws IOException */ - private ClientProtos.RegionLoadStats mutateRows(final HRegion region, + private ClientProtos.RegionLoadStats mutateRows(final Region region, final List actions, final CellScanner cellScanner) throws IOException { if (!region.getRegionInfo().isMetaTable()) { @@ -390,7 +392,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } region.mutateRow(rm); - return region.getRegionStats(); + return ((HRegion)region).getRegionStats(); } /** @@ -405,7 +407,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @param compareOp * @param comparator @throws IOException */ - private boolean checkAndRowMutate(final HRegion region, final List actions, + private boolean checkAndRowMutate(final Region region, final List actions, final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator) throws IOException { if (!region.getRegionInfo().isMetaTable()) { @@ -445,7 +447,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * bypassed as indicated by RegionObserver, null otherwise * @throws IOException */ - private Result append(final HRegion region, final OperationQuota quota, final MutationProto m, + private Result append(final Region region, final OperationQuota quota, final MutationProto m, final CellScanner cellScanner, long nonceGroup) throws IOException { long before = EnvironmentEdgeManager.currentTime(); Append append = ProtobufUtil.toAppend(m, cellScanner); @@ -482,7 +484,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @return the Result * @throws IOException */ - private Result increment(final HRegion region, final OperationQuota quota, + private Result increment(final Region region, final OperationQuota quota, final MutationProto mutation, final CellScanner cells, long nonceGroup) throws IOException { long before = EnvironmentEdgeManager.currentTime(); @@ -523,7 +525,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * method returns as a 'result'. * @return Return the cellScanner passed */ - private List doNonAtomicRegionMutation(final HRegion region, + private List doNonAtomicRegionMutation(final Region region, final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner, final RegionActionResult.Builder builder, List cellsToReturn, long nonceGroup) { // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do @@ -622,7 +624,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @param region * @param mutations */ - private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region, + private void doBatchOp(final RegionActionResult.Builder builder, final Region region, final OperationQuota quota, final List mutations, final CellScanner cells) { Mutation[] mArray = new Mutation[mutations.size()]; @@ -648,7 +650,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, regionServer.cacheFlusher.reclaimMemStoreMemory(); } - OperationStatus codes[] = region.batchMutate(mArray); + OperationStatus codes[] = region.batchMutate(mArray, HConstants.NO_NONCE, + HConstants.NO_NONCE); for (i = 0; i < codes.length; i++) { int index = mutations.get(i).getIndex(); Exception e = null; @@ -670,7 +673,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, case SUCCESS: builder.addResultOrException(getResultOrException( - ClientProtos.Result.getDefaultInstance(), index, region.getRegionStats())); + ClientProtos.Result.getDefaultInstance(), index, + ((HRegion)region).getRegionStats())); break; } } @@ -700,7 +704,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * exceptionMessage if any * @throws IOException */ - private OperationStatus [] doReplayBatchOp(final HRegion region, + private OperationStatus [] doReplayBatchOp(final Region region, final List mutations, long replaySeqId) throws IOException { long before = EnvironmentEdgeManager.currentTime(); boolean batchContainsPuts = false, batchContainsDelete = false; @@ -720,26 +724,27 @@ public class RSRpcServices implements HBaseRPCErrorHandler, for (Cell metaCell : metaCells) { CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell); boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); + HRegion hRegion = (HRegion)region; if (compactionDesc != null) { // replay the compaction. Remove the files from stores only if we are the primary // region replica (thus own the files) - region.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica, + hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica, replaySeqId); continue; } FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell); if (flushDesc != null && !isDefaultReplica) { - region.replayWALFlushMarker(flushDesc, replaySeqId); + hRegion.replayWALFlushMarker(flushDesc, replaySeqId); continue; } RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell); if (regionEvent != null && !isDefaultReplica) { - region.replayWALRegionEventMarker(regionEvent); + hRegion.replayWALRegionEventMarker(regionEvent); continue; } BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell); if (bulkLoadEvent != null) { - region.replayWALBulkLoadEventMarker(bulkLoadEvent); + hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent); continue; } } @@ -852,7 +857,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return 0L; } - long addScanner(RegionScanner s, HRegion r) throws LeaseStillHeldException { + long addScanner(RegionScanner s, Region r) throws LeaseStillHeldException { long scannerId = this.scannerIdGen.incrementAndGet(); String scannerName = String.valueOf(scannerId); @@ -873,7 +878,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @throws IOException if the specifier is not null, * but failed to find the region */ - HRegion getRegion( + Region getRegion( final RegionSpecifier regionSpecifier) throws IOException { return regionServer.getRegionByEncodedName(regionSpecifier.getValue().toByteArray(), ProtobufUtil.getRegionEncodedName(regionSpecifier)); @@ -1006,7 +1011,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion()); // Can be null if we're calling close on a region that's not online - final HRegion region = regionServer.getFromOnlineRegions(encodedRegionName); + final Region region = regionServer.getFromOnlineRegions(encodedRegionName); if ((region != null) && (region .getCoprocessorHost() != null)) { region.getCoprocessorHost().preClose(false); } @@ -1035,9 +1040,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { checkOpen(); requestCount.increment(); - HRegion region = getRegion(request.getRegion()); + Region region = getRegion(request.getRegion()); region.startRegionOperation(Operation.COMPACT_REGION); - LOG.info("Compacting " + region.getRegionNameAsString()); + LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString()); boolean major = false; byte [] family = null; Store store = null; @@ -1046,7 +1051,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, store = region.getStore(family); if (store == null) { throw new ServiceException(new IOException("column family " + Bytes.toString(family) - + " does not exist in region " + region.getRegionNameAsString())); + + " does not exist in region " + region.getRegionInfo().getRegionNameAsString())); } } if (request.hasMajor()) { @@ -1063,7 +1068,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):""; if (LOG.isTraceEnabled()) { LOG.trace("User-triggered compaction requested for region " - + region.getRegionNameAsString() + familyLogMsg); + + region.getRegionInfo().getRegionNameAsString() + familyLogMsg); } String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg; if(family != null) { @@ -1093,8 +1098,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { checkOpen(); requestCount.increment(); - HRegion region = getRegion(request.getRegion()); - LOG.info("Flushing " + region.getRegionNameAsString()); + Region region = getRegion(request.getRegion()); + LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); boolean shouldFlush = true; if (request.hasIfOlderThanTs()) { shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs(); @@ -1104,7 +1109,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false; long startTime = EnvironmentEdgeManager.currentTime(); - HRegion.FlushResult flushResult = region.flushcache(true, writeFlushWalMarker); + // Go behind the curtain so we can manage writing of the flush WAL marker + HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl) + ((HRegion)region).flushcache(true, writeFlushWalMarker); if (flushResult.isFlushSucceeded()) { long endTime = EnvironmentEdgeManager.currentTime(); regionServer.metricsRegionServer.updateFlushTime(endTime - startTime); @@ -1117,7 +1124,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, builder.setFlushed(flushResult.isFlushSucceeded()); builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker); } - builder.setLastFlushTime( region.getEarliestFlushTimeForAllStores()); + builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores()); return builder.build(); } catch (DroppedSnapshotException ex) { // Cache flush can fail in a few places. If it fails in a critical @@ -1138,9 +1145,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { checkOpen(); requestCount.increment(); - Map onlineRegions = regionServer.onlineRegions; + Map onlineRegions = regionServer.onlineRegions; List list = new ArrayList(onlineRegions.size()); - for (HRegion region: onlineRegions.values()) { + for (Region region: onlineRegions.values()) { list.add(region.getRegionInfo()); } Collections.sort(list); @@ -1157,7 +1164,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { checkOpen(); requestCount.increment(); - HRegion region = getRegion(request.getRegion()); + Region region = getRegion(request.getRegion()); HRegionInfo info = region.getRegionInfo(); GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder(); builder.setRegionInfo(HRegionInfo.convert(info)); @@ -1198,11 +1205,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, final GetStoreFileRequest request) throws ServiceException { try { checkOpen(); - HRegion region = getRegion(request.getRegion()); + Region region = getRegion(request.getRegion()); requestCount.increment(); Set columnFamilies; if (request.getFamilyCount() == 0) { - columnFamilies = region.getStores().keySet(); + columnFamilies = region.getTableDesc().getFamiliesKeys(); } else { columnFamilies = new TreeSet(Bytes.BYTES_RAWCOMPARATOR); for (ByteString cf: request.getFamilyList()) { @@ -1235,8 +1242,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { checkOpen(); requestCount.increment(); - HRegion regionA = getRegion(request.getRegionA()); - HRegion regionB = getRegion(request.getRegionB()); + Region regionA = getRegion(request.getRegionA()); + Region regionB = getRegion(request.getRegionB()); boolean forcible = request.getForcible(); regionA.startRegionOperation(Operation.MERGE_REGION); regionB.startRegionOperation(Operation.MERGE_REGION); @@ -1247,13 +1254,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, LOG.info("Receiving merging request for " + regionA + ", " + regionB + ",forcible=" + forcible); long startTime = EnvironmentEdgeManager.currentTime(); - HRegion.FlushResult flushResult = regionA.flushcache(); + FlushResult flushResult = regionA.flush(true); if (flushResult.isFlushSucceeded()) { long endTime = EnvironmentEdgeManager.currentTime(); regionServer.metricsRegionServer.updateFlushTime(endTime - startTime); } startTime = EnvironmentEdgeManager.currentTime(); - flushResult = regionB.flushcache(); + flushResult = regionB.flush(true); if (flushResult.isFlushSucceeded()) { long endTime = EnvironmentEdgeManager.currentTime(); regionServer.metricsRegionServer.updateFlushTime(endTime - startTime); @@ -1346,7 +1353,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { String encodedName = region.getEncodedName(); byte[] encodedNameBytes = region.getEncodedNameAsBytes(); - final HRegion onlineRegion = regionServer.getFromOnlineRegions(encodedName); + final Region onlineRegion = regionServer.getFromOnlineRegions(encodedName); if (onlineRegion != null) { // The region is already online. This should not happen any more. String error = "Received OPEN for the region:" @@ -1456,7 +1463,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { String encodedName = region.getEncodedName(); byte[] encodedNameBytes = region.getEncodedNameAsBytes(); - final HRegion onlineRegion = regionServer.getFromOnlineRegions(encodedName); + final Region onlineRegion = regionServer.getFromOnlineRegions(encodedName); if (onlineRegion != null) { LOG.info("Region already online. Skipping warming up " + region); @@ -1507,7 +1514,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return ReplicateWALEntryResponse.newBuilder().build(); } ByteString regionName = entries.get(0).getKey().getEncodedRegionName(); - HRegion region = regionServer.getRegionByEncodedName(regionName.toStringUtf8()); + Region region = regionServer.getRegionByEncodedName(regionName.toStringUtf8()); RegionCoprocessorHost coprocessorHost = ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo()) ? region.getCoprocessorHost() @@ -1558,12 +1565,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } //sync wal at the end because ASYNC_WAL is used above - region.syncWal(); + WAL wal = getWAL(region); + if (wal != null) { + wal.sync(); + } if (coprocessorHost != null) { - for (Pair wal : walEntries) { - coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(), - wal.getSecond()); + for (Pair entry : walEntries) { + coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(), + entry.getSecond()); } } return ReplicateWALEntryResponse.newBuilder().build(); @@ -1577,6 +1587,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + WAL getWAL(Region region) { + return ((HRegion)region).getWAL(); + } + /** * Replicate WAL entries on the region server. * @@ -1640,15 +1654,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { checkOpen(); requestCount.increment(); - HRegion region = getRegion(request.getRegion()); + Region region = getRegion(request.getRegion()); region.startRegionOperation(Operation.SPLIT_REGION); if (region.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { throw new IOException("Can't split replicas directly. " + "Replicas are auto-split when their primary is split."); } - LOG.info("Splitting " + region.getRegionNameAsString()); + LOG.info("Splitting " + region.getRegionInfo().getRegionNameAsString()); long startTime = EnvironmentEdgeManager.currentTime(); - HRegion.FlushResult flushResult = region.flushcache(); + FlushResult flushResult = region.flush(true); if (flushResult.isFlushSucceeded()) { long endTime = EnvironmentEdgeManager.currentTime(); regionServer.metricsRegionServer.updateFlushTime(endTime - startTime); @@ -1657,8 +1671,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (request.hasSplitPoint()) { splitPoint = request.getSplitPoint().toByteArray(); } - region.forceSplit(splitPoint); - regionServer.compactSplitThread.requestSplit(region, region.checkSplit()); + ((HRegion)region).forceSplit(splitPoint); + regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit()); return SplitRegionResponse.newBuilder().build(); } catch (IOException ie) { throw new ServiceException(ie); @@ -1707,7 +1721,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { checkOpen(); requestCount.increment(); - HRegion region = getRegion(request.getRegion()); + Region region = getRegion(request.getRegion()); List> familyPaths = new ArrayList>(); for (FamilyPath familyPath: request.getFamilyPathList()) { familyPaths.add(new Pair(familyPath.getFamily().toByteArray(), @@ -1719,7 +1733,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } boolean loaded = false; if (!bypass) { - loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum()); + loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null); } if (region.getCoprocessorHost() != null) { loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded); @@ -1738,12 +1752,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { checkOpen(); requestCount.increment(); - HRegion region = getRegion(request.getRegion()); + Region region = getRegion(request.getRegion()); Message result = execServiceOnRegion(region, request.getCall()); CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder(); builder.setRegion(RequestConverter.buildRegionSpecifier( - RegionSpecifierType.REGION_NAME, region.getRegionName())); + RegionSpecifierType.REGION_NAME, region.getRegionInfo().getRegionName())); builder.setValue( builder.getValueBuilder().setName(result.getClass().getName()) .setValue(result.toByteString())); @@ -1753,7 +1767,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } - private Message execServiceOnRegion(HRegion region, + private Message execServiceOnRegion(Region region, final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException { // ignore the passed in controller (from the serialized call) ServerRpcController execController = new ServerRpcController(); @@ -1779,7 +1793,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { checkOpen(); requestCount.increment(); - HRegion region = getRegion(request.getRegion()); + Region region = getRegion(request.getRegion()); GetResponse.Builder builder = GetResponse.newBuilder(); ClientProtos.Get get = request.getGet(); @@ -1871,7 +1885,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, for (RegionAction regionAction : request.getRegionActionList()) { this.requestCount.add(regionAction.getActionCount()); OperationQuota quota; - HRegion region; + Region region; regionActionResultBuilder.clear(); try { region = getRegion(regionAction.getRegion()); @@ -1946,14 +1960,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { checkOpen(); requestCount.increment(); - HRegion region = getRegion(request.getRegion()); + Region region = getRegion(request.getRegion()); MutateResponse.Builder builder = MutateResponse.newBuilder(); MutationProto mutation = request.getMutation(); if (!region.getRegionInfo().isMetaTable()) { regionServer.cacheFlusher.reclaimMemStoreMemory(); } - long nonceGroup = request.hasNonceGroup() - ? request.getNonceGroup() : HConstants.NO_NONCE; + long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; Result r = null; Boolean processed = null; MutationType type = mutation.getMutateType(); @@ -2090,7 +2103,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, requestCount.increment(); int ttl = 0; - HRegion region = null; + Region region = null; RegionScanner scanner = null; RegionScannerHolder rsh = null; boolean moreResults = true; @@ -2129,7 +2142,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } isSmallScan = scan.isSmall(); - region.prepareScanner(scan); + if (!scan.hasFamilies()) { + // Adding all families to scanner + for (byte[] family: region.getTableDesc().getFamiliesKeys()) { + scan.addFamily(family); + } + } + if (region.getCoprocessorHost() != null) { scanner = region.getCoprocessorHost().preScannerOpen(scan); } @@ -2273,7 +2292,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, builder.setMoreResultsInRegion(false); } } - region.readRequestsCount.add(i); + region.updateReadRequestsCount(i); region.getMetrics().updateScanNext(totalCellSize); } finally { region.closeRegionOperation();