From: git-site-role@apache.org
To: commits@hbase.apache.org
Date: Sat, 17 Feb 2018 15:13:55 -0000
Subject: [21/51] [partial] hbase-site git commit: Published site at .

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/193b4259/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.BatchOperation.Visitor.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.BatchOperation.Visitor.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.BatchOperation.Visitor.html
index bd13b53..802b925 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.BatchOperation.Visitor.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.BatchOperation.Visitor.html
@@ -900,7600 +900,7598 @@
 892 if (this.getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID) {
 893 status.setStatus("Writing region info on filesystem");
 894 fs.checkRegionInfoOnFilesystem();
-895 } else {
-896 if (LOG.isDebugEnabled()) {
-897 LOG.debug("Skipping creation of .regioninfo file for " + this.getRegionInfo());
-898 }
-899 }
-900
-901 // Initialize all the HStores
-902 status.setStatus("Initializing all the Stores");
-903 long maxSeqId = initializeStores(reporter, status);
-904 this.mvcc.advanceTo(maxSeqId);
-905 if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
-906 Collection<HStore> stores = this.stores.values();
-907 try {
-908 // update the stores that we are replaying
-909 stores.forEach(HStore::startReplayingFromWAL);
-910 // Recover any edits if available.
-911 maxSeqId = Math.max(maxSeqId,
-912 replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
-913 // Make sure mvcc is up to max.
-914 this.mvcc.advanceTo(maxSeqId); -915 } finally { -916 // update the stores that we are done replaying -917 stores.forEach(HStore::stopReplayingFromWAL); -918 } -919 } -920 this.lastReplayedOpenRegionSeqId = maxSeqId; +895 } +896 +897 // Initialize all the HStores +898 status.setStatus("Initializing all the Stores"); +899 long maxSeqId = initializeStores(reporter, status); +900 this.mvcc.advanceTo(maxSeqId); +901 if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) { +902 Collection<HStore> stores = this.stores.values(); +903 try { +904 // update the stores that we are replaying +905 stores.forEach(HStore::startReplayingFromWAL); +906 // Recover any edits if available. +907 maxSeqId = Math.max(maxSeqId, +908 replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); +909 // Make sure mvcc is up to max. +910 this.mvcc.advanceTo(maxSeqId); +911 } finally { +912 // update the stores that we are done replaying +913 stores.forEach(HStore::stopReplayingFromWAL); +914 } +915 } +916 this.lastReplayedOpenRegionSeqId = maxSeqId; +917 +918 this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this)); +919 this.writestate.flushRequested = false; +920 this.writestate.compacting.set(0); 921 -922 this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this)); -923 this.writestate.flushRequested = false; -924 this.writestate.compacting.set(0); -925 -926 if (this.writestate.writesEnabled) { -927 // Remove temporary data left over from old regions -928 status.setStatus("Cleaning up temporary data from old regions"); -929 fs.cleanupTempDir(); -930 } -931 -932 if (this.writestate.writesEnabled) { -933 status.setStatus("Cleaning up detritus from prior splits"); -934 // Get rid of any splits or merges that were lost in-progress. Clean out -935 // these directories here on open. We may be opening a region that was -936 // being split but we crashed in the middle of it all. -937 fs.cleanupAnySplitDetritus(); -938 fs.cleanupMergesDir(); -939 } -940 -941 // Initialize split policy -942 this.splitPolicy = RegionSplitPolicy.create(this, conf); -943 -944 // Initialize flush policy -945 this.flushPolicy = FlushPolicyFactory.create(this, conf); -946 -947 long lastFlushTime = EnvironmentEdgeManager.currentTime(); -948 for (HStore store: stores.values()) { -949 this.lastStoreFlushTimeMap.put(store, lastFlushTime); -950 } -951 -952 // Use maximum of log sequenceid or that which was found in stores -953 // (particularly if no recovered edits, seqid will be -1). -954 long nextSeqid = maxSeqId; -955 if (this.writestate.writesEnabled) { -956 nextSeqid = WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), -957 this.fs.getRegionDir(), nextSeqid, 1); -958 } else { -959 nextSeqid++; -960 } -961 -962 LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() + -963 "; next sequenceid=" + nextSeqid); +922 if (this.writestate.writesEnabled) { +923 // Remove temporary data left over from old regions +924 status.setStatus("Cleaning up temporary data from old regions"); +925 fs.cleanupTempDir(); +926 } +927 +928 if (this.writestate.writesEnabled) { +929 status.setStatus("Cleaning up detritus from prior splits"); +930 // Get rid of any splits or merges that were lost in-progress. Clean out +931 // these directories here on open. We may be opening a region that was +932 // being split but we crashed in the middle of it all. 
+933 fs.cleanupAnySplitDetritus(); +934 fs.cleanupMergesDir(); +935 } +936 +937 // Initialize split policy +938 this.splitPolicy = RegionSplitPolicy.create(this, conf); +939 +940 // Initialize flush policy +941 this.flushPolicy = FlushPolicyFactory.create(this, conf); +942 +943 long lastFlushTime = EnvironmentEdgeManager.currentTime(); +944 for (HStore store: stores.values()) { +945 this.lastStoreFlushTimeMap.put(store, lastFlushTime); +946 } +947 +948 // Use maximum of log sequenceid or that which was found in stores +949 // (particularly if no recovered edits, seqid will be -1). +950 long nextSeqid = maxSeqId; +951 if (this.writestate.writesEnabled) { +952 nextSeqid = WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), +953 this.fs.getRegionDir(), nextSeqid, 1); +954 } else { +955 nextSeqid++; +956 } +957 +958 LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() + +959 "; next sequenceid=" + nextSeqid); +960 +961 // A region can be reopened if failed a split; reset flags +962 this.closing.set(false); +963 this.closed.set(false); 964 -965 // A region can be reopened if failed a split; reset flags -966 this.closing.set(false); -967 this.closed.set(false); -968 -969 if (coprocessorHost != null) { -970 status.setStatus("Running coprocessor post-open hooks"); -971 coprocessorHost.postOpen(); -972 } +965 if (coprocessorHost != null) { +966 status.setStatus("Running coprocessor post-open hooks"); +967 coprocessorHost.postOpen(); +968 } +969 +970 status.markComplete("Region opened successfully"); +971 return nextSeqid; +972 } 973 -974 status.markComplete("Region opened successfully"); -975 return nextSeqid; -976 } -977 -978 /** -979 * Open all Stores. -980 * @param reporter -981 * @param status -982 * @return Highest sequenceId found out in a Store. -983 * @throws IOException -984 */ -985 private long initializeStores(CancelableProgressable reporter, MonitoredTask status) -986 throws IOException { -987 // Load in all the HStores. -988 long maxSeqId = -1; -989 // initialized to -1 so that we pick up MemstoreTS from column families -990 long maxMemstoreTS = -1; -991 -992 if (htableDescriptor.getColumnFamilyCount() != 0) { -993 // initialize the thread pool for opening stores in parallel. 
-994 ThreadPoolExecutor storeOpenerThreadPool = -995 getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog()); -996 CompletionService<HStore> completionService = new ExecutorCompletionService<>(storeOpenerThreadPool); -997 -998 // initialize each store in parallel -999 for (final ColumnFamilyDescriptor family : htableDescriptor.getColumnFamilies()) { -1000 status.setStatus("Instantiating store for column family " + family); -1001 completionService.submit(new Callable<HStore>() { -1002 @Override -1003 public HStore call() throws IOException { -1004 return instantiateHStore(family); -1005 } -1006 }); -1007 } -1008 boolean allStoresOpened = false; -1009 boolean hasSloppyStores = false; -1010 try { -1011 for (int i = 0; i < htableDescriptor.getColumnFamilyCount(); i++) { -1012 Future<HStore> future = completionService.take(); -1013 HStore store = future.get(); -1014 this.stores.put(store.getColumnFamilyDescriptor().getName(), store); -1015 if (store.isSloppyMemStore()) { -1016 hasSloppyStores = true; -1017 } -1018 -1019 long storeMaxSequenceId = store.getMaxSequenceId().orElse(0L); -1020 maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), -1021 storeMaxSequenceId); -1022 if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) { -1023 maxSeqId = storeMaxSequenceId; +974 /** +975 * Open all Stores. +976 * @param reporter +977 * @param status +978 * @return Highest sequenceId found out in a Store. +979 * @throws IOException +980 */ +981 private long initializeStores(CancelableProgressable reporter, MonitoredTask status) +982 throws IOException { +983 // Load in all the HStores. +984 long maxSeqId = -1; +985 // initialized to -1 so that we pick up MemstoreTS from column families +986 long maxMemstoreTS = -1; +987 +988 if (htableDescriptor.getColumnFamilyCount() != 0) { +989 // initialize the thread pool for opening stores in parallel. 
+990 ThreadPoolExecutor storeOpenerThreadPool = +991 getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog()); +992 CompletionService<HStore> completionService = new ExecutorCompletionService<>(storeOpenerThreadPool); +993 +994 // initialize each store in parallel +995 for (final ColumnFamilyDescriptor family : htableDescriptor.getColumnFamilies()) { +996 status.setStatus("Instantiating store for column family " + family); +997 completionService.submit(new Callable<HStore>() { +998 @Override +999 public HStore call() throws IOException { +1000 return instantiateHStore(family); +1001 } +1002 }); +1003 } +1004 boolean allStoresOpened = false; +1005 boolean hasSloppyStores = false; +1006 try { +1007 for (int i = 0; i < htableDescriptor.getColumnFamilyCount(); i++) { +1008 Future<HStore> future = completionService.take(); +1009 HStore store = future.get(); +1010 this.stores.put(store.getColumnFamilyDescriptor().getName(), store); +1011 if (store.isSloppyMemStore()) { +1012 hasSloppyStores = true; +1013 } +1014 +1015 long storeMaxSequenceId = store.getMaxSequenceId().orElse(0L); +1016 maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), +1017 storeMaxSequenceId); +1018 if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) { +1019 maxSeqId = storeMaxSequenceId; +1020 } +1021 long maxStoreMemstoreTS = store.getMaxMemStoreTS().orElse(0L); +1022 if (maxStoreMemstoreTS > maxMemstoreTS) { +1023 maxMemstoreTS = maxStoreMemstoreTS; 1024 } -1025 long maxStoreMemstoreTS = store.getMaxMemStoreTS().orElse(0L); -1026 if (maxStoreMemstoreTS > maxMemstoreTS) { -1027 maxMemstoreTS = maxStoreMemstoreTS; -1028 } -1029 } -1030 allStoresOpened = true; -1031 if(hasSloppyStores) { -1032 htableDescriptor = TableDescriptorBuilder.newBuilder(htableDescriptor) -1033 .setFlushPolicyClassName(FlushNonSloppyStoresFirstPolicy.class.getName()) -1034 .build(); -1035 LOG.info("Setting FlushNonSloppyStoresFirstPolicy for the region=" + this); -1036 } -1037 } catch (InterruptedException e) { -1038 throw (InterruptedIOException)new InterruptedIOException().initCause(e); -1039 } catch (ExecutionException e) { -1040 throw new IOException(e.getCause()); -1041 } finally { -1042 storeOpenerThreadPool.shutdownNow(); -1043 if (!allStoresOpened) { -1044 // something went wrong, close all opened stores -1045 LOG.error("Could not initialize all stores for the region=" + this); -1046 for (HStore store : this.stores.values()) { -1047 try { -1048 store.close(); -1049 } catch (IOException e) { -1050 LOG.warn("close store failed", e); -1051 } -1052 } -1053 } -1054 } -1055 } -1056 return Math.max(maxSeqId, maxMemstoreTS + 1); -1057 } -1058 -1059 private void initializeWarmup(final CancelableProgressable reporter) throws IOException { -1060 MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this); -1061 // Initialize all the HStores -1062 status.setStatus("Warming up all the Stores"); -1063 try { -1064 initializeStores(reporter, status); -1065 } finally { -1066 status.markComplete("Done warming up."); -1067 } -1068 } -1069 -1070 /** -1071 * @return Map of StoreFiles by column family -1072 */ -1073 private NavigableMap<byte[], List<Path>> getStoreFiles() { -1074 NavigableMap<byte[], List<Path>> allStoreFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR); -1075 for (HStore store : stores.values()) { -1076 Collection<HStoreFile> storeFiles = store.getStorefiles(); -1077 if (storeFiles == null) { -1078 continue; +1025 } +1026 allStoresOpened = true; +1027 if(hasSloppyStores) { +1028 
htableDescriptor = TableDescriptorBuilder.newBuilder(htableDescriptor) +1029 .setFlushPolicyClassName(FlushNonSloppyStoresFirstPolicy.class.getName()) +1030 .build(); +1031 LOG.info("Setting FlushNonSloppyStoresFirstPolicy for the region=" + this); +1032 } +1033 } catch (InterruptedException e) { +1034 throw (InterruptedIOException)new InterruptedIOException().initCause(e); +1035 } catch (ExecutionException e) { +1036 throw new IOException(e.getCause()); +1037 } finally { +1038 storeOpenerThreadPool.shutdownNow(); +1039 if (!allStoresOpened) { +1040 // something went wrong, close all opened stores +1041 LOG.error("Could not initialize all stores for the region=" + this); +1042 for (HStore store : this.stores.values()) { +1043 try { +1044 store.close(); +1045 } catch (IOException e) { +1046 LOG.warn("close store failed", e); +1047 } +1048 } +1049 } +1050 } +1051 } +1052 return Math.max(maxSeqId, maxMemstoreTS + 1); +1053 } +1054 +1055 private void initializeWarmup(final CancelableProgressable reporter) throws IOException { +1056 MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this); +1057 // Initialize all the HStores +1058 status.setStatus("Warming up all the Stores"); +1059 try { +1060 initializeStores(reporter, status); +1061 } finally { +1062 status.markComplete("Done warming up."); +1063 } +1064 } +1065 +1066 /** +1067 * @return Map of StoreFiles by column family +1068 */ +1069 private NavigableMap<byte[], List<Path>> getStoreFiles() { +1070 NavigableMap<byte[], List<Path>> allStoreFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR); +1071 for (HStore store : stores.values()) { +1072 Collection<HStoreFile> storeFiles = store.getStorefiles(); +1073 if (storeFiles == null) { +1074 continue; +1075 } +1076 List<Path> storeFileNames = new ArrayList<>(); +1077 for (HStoreFile storeFile : storeFiles) { +1078 storeFileNames.add(storeFile.getPath()); 1079 } -1080 List<Path> storeFileNames = new ArrayList<>(); -1081 for (HStoreFile storeFile : storeFiles) { -1082 storeFileNames.add(storeFile.getPath()); -1083 } -1084 allStoreFiles.put(store.getColumnFamilyDescriptor().getName(), storeFileNames); -1085 } -1086 return allStoreFiles; -1087 } -1088 -1089 private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException { -1090 Map<byte[], List<Path>> storeFiles = getStoreFiles(); -1091 RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor( -1092 RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId, -1093 getRegionServerServices().getServerName(), storeFiles); -1094 WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionOpenDesc, -1095 mvcc); -1096 } -1097 -1098 private void writeRegionCloseMarker(WAL wal) throws IOException { -1099 Map<byte[], List<Path>> storeFiles = getStoreFiles(); -1100 RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor( -1101 RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(), -1102 getRegionServerServices().getServerName(), storeFiles); -1103 WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc, -1104 mvcc); -1105 -1106 // Store SeqId in HDFS when a region closes -1107 // checking region folder exists is due to many tests which delete the table folder while a -1108 // table is still online -1109 if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) { -1110 WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(), -1111 mvcc.getReadPoint(), 0); 
-1112 } -1113 } -1114 -1115 /** -1116 * @return True if this region has references. -1117 */ -1118 public boolean hasReferences() { -1119 return stores.values().stream().anyMatch(HStore::hasReferences); +1080 allStoreFiles.put(store.getColumnFamilyDescriptor().getName(), storeFileNames); +1081 } +1082 return allStoreFiles; +1083 } +1084 +1085 private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException { +1086 Map<byte[], List<Path>> storeFiles = getStoreFiles(); +1087 RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor( +1088 RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId, +1089 getRegionServerServices().getServerName(), storeFiles); +1090 WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionOpenDesc, +1091 mvcc); +1092 } +1093 +1094 private void writeRegionCloseMarker(WAL wal) throws IOException { +1095 Map<byte[], List<Path>> storeFiles = getStoreFiles(); +1096 RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor( +1097 RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(), +1098 getRegionServerServices().getServerName(), storeFiles); +1099 WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc, +1100 mvcc); +1101 +1102 // Store SeqId in HDFS when a region closes +1103 // checking region folder exists is due to many tests which delete the table folder while a +1104 // table is still online +1105 if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) { +1106 WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(), +1107 mvcc.getReadPoint(), 0); +1108 } +1109 } +1110 +1111 /** +1112 * @return True if this region has references. +1113 */ +1114 public boolean hasReferences() { +1115 return stores.values().stream().anyMatch(HStore::hasReferences); +1116 } +1117 +1118 public void blockUpdates() { +1119 this.updatesLock.writeLock().lock(); 1120 } 1121 -1122 public void blockUpdates() { -1123 this.updatesLock.writeLock().lock(); +1122 public void unblockUpdates() { +1123 this.updatesLock.writeLock().unlock(); 1124 } 1125 -1126 public void unblockUpdates() { -1127 this.updatesLock.writeLock().unlock(); -1128 } -1129 -1130 public HDFSBlocksDistribution getHDFSBlocksDistribution() { -1131 HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); -1132 stores.values().stream().filter(s -> s.getStorefiles() != null) -1133 .flatMap(s -> s.getStorefiles().stream()).map(HStoreFile::getHDFSBlockDistribution) -1134 .forEachOrdered(hdfsBlocksDistribution::add); -1135 return hdfsBlocksDistribution; -1136 } -1137 -1138 /** -1139 * This is a helper function to compute HDFS block distribution on demand -1140 * @param conf configuration -1141 * @param tableDescriptor TableDescriptor of the table -1142 * @param regionInfo encoded name of the region -1143 * @return The HDFS blocks distribution for the given region. 
-1144 * @throws IOException -1145 */ -1146 public static HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf, -1147 TableDescriptor tableDescriptor, RegionInfo regionInfo) throws IOException { -1148 Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName()); -1149 return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath); -1150 } -1151 -1152 /** -1153 * This is a helper function to compute HDFS block distribution on demand -1154 * @param conf configuration -1155 * @param tableDescriptor TableDescriptor of the table -1156 * @param regionInfo encoded name of the region -1157 * @param tablePath the table directory -1158 * @return The HDFS blocks distribution for the given region. -1159 * @throws IOException -1160 */ -1161 public static HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf, -1162 TableDescriptor tableDescriptor, RegionInfo regionInfo, Path tablePath) throws IOException { -1163 HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); -1164 FileSystem fs = tablePath.getFileSystem(conf); -1165 -1166 HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo); -1167 for (ColumnFamilyDescriptor family : tableDescriptor.getColumnFamilies()) { -1168 List<LocatedFileStatus> locatedFileStatusList = HRegionFileSystem -1169 .getStoreFilesLocatedStatus(regionFs, family.getNameAsString(), true); -1170 if (locatedFileStatusList == null) { -1171 continue; -1172 } -1173 -1174 for (LocatedFileStatus status : locatedFileStatusList) { -1175 Path p = status.getPath(); -1176 if (StoreFileInfo.isReference(p) || HFileLink.isHFileLink(p)) { -1177 // Only construct StoreFileInfo object if its not a hfile, save obj -1178 // creation -1179 StoreFileInfo storeFileInfo = new StoreFileInfo(conf, fs, status); -1180 hdfsBlocksDistribution.add(storeFileInfo -1181 .computeHDFSBlocksDistribution(fs)); -1182 } else if (StoreFileInfo.isHFile(p)) { -1183 // If its a HFile, then lets just add to the block distribution -1184 // lets not create more objects here, not even another HDFSBlocksDistribution -1185 FSUtils.addToHDFSBlocksDistribution(hdfsBlocksDistribution, -1186 status.getBlockLocations()); -1187 } else { -1188 throw new IOException("path=" + p -1189 + " doesn't look like a valid StoreFile"); -1190 } -1191 } -1192 } -1193 return hdfsBlocksDistribution; -1194 } -1195 -1196 /** -1197 * Increase the size of mem store in this region and the size of global mem -1198 * store -1199 * @return the size of memstore in this region -1200 */ -1201 public long addAndGetMemStoreSize(MemStoreSize memstoreSize) { -1202 if (this.rsAccounting != null) { -1203 rsAccounting.incGlobalMemStoreSize(memstoreSize); -1204 } -1205 long size = this.memstoreDataSize.addAndGet(memstoreSize.getDataSize()); -1206 checkNegativeMemStoreDataSize(size, memstoreSize.getDataSize()); -1207 return size; -1208 } -1209 -1210 public void decrMemStoreSize(MemStoreSize memstoreSize) { -1211 if (this.rsAccounting != null) { -1212 rsAccounting.decGlobalMemStoreSize(memstoreSize); -1213 } -1214 long size = this.memstoreDataSize.addAndGet(-memstoreSize.getDataSize()); -1215 checkNegativeMemStoreDataSize(size, -memstoreSize.getDataSize()); -1216 } -1217 -1218 private void checkNegativeMemStoreDataSize(long memstoreDataSize, long delta) { -1219 // This is extremely bad if we make memstoreSize negative. Log as much info on the offending -1220 // caller as possible. 
(memStoreSize might be a negative value already -- freeing memory) -1221 if (memstoreDataSize < 0) { -1222 LOG.error("Asked to modify this region's (" + this.toString() -1223 + ") memstoreSize to a negative value which is incorrect. Current memstoreSize=" -1224 + (memstoreDataSize - delta) + ", delta=" + delta, new Exception()); -1225 } -1226 } -1227 -1228 @Override -1229 public RegionInfo getRegionInfo() { -1230 return this.fs.getRegionInfo(); -1231 } -1232 -1233 /** -1234 * @return Instance of {@link RegionServerServices} used by this HRegion. -1235 * Can be null. -1236 */ -1237 RegionServerServices getRegionServerServices() { -1238 return this.rsServices; -1239 } -1240 -1241 @Override -1242 public long getReadRequestsCount() { -1243 return readRequestsCount.sum(); -1244 } -1245 -1246 @Override -1247 public long getFilteredReadRequestsCount() { -1248 return filteredReadRequestsCount.sum(); -1249 } -1250 -1251 @Override -1252 public long getWriteRequestsCount() { -1253 return writeRequestsCount.sum(); -1254 } -1255 -1256 @Override -1257 public long getMemStoreSize() { -1258 return memstoreDataSize.get(); -1259 } -1260 -1261 /** @return store services for this region, to access services required by store level needs */ -1262 public RegionServicesForStores getRegionServicesForStores() { -1263 return regionServicesForStores; -1264 } -1265 -1266 @Override -1267 public long getNumMutationsWithoutWAL() { -1268 return numMutationsWithoutWAL.sum(); -1269 } -1270 -1271 @Override -1272 public long getDataInMemoryWithoutWAL() { -1273 return dataInMemoryWithoutWAL.sum(); -1274 } -1275 -1276 @Override -1277 public long getBlockedRequestsCount() { -1278 return blockedRequestsCount.sum(); -1279 } -1280 -1281 @Override -1282 public long getCheckAndMutateChecksPassed() { -1283 return checkAndMutateChecksPassed.sum(); -1284 } -1285 -1286 @Override -1287 public long getCheckAndMutateChecksFailed() { -1288 return checkAndMutateChecksFailed.sum(); -1289 } -1290 -1291 // TODO Needs to check whether we should expose our metrics system to CPs. If CPs themselves doing -1292 // the op and bypassing the core, this might be needed? Should be stop supporting the bypass -1293 // feature? 
-1294 public MetricsRegion getMetrics() { -1295 return metricsRegion; -1296 } -1297 -1298 @Override -1299 public boolean isClosed() { -1300 return this.closed.get(); -1301 } -1302 -1303 @Override -1304 public boolean isClosing() { -1305 return this.closing.get(); -1306 } -1307 -1308 @Override -1309 public boolean isReadOnly() { -1310 return this.writestate.isReadOnly(); -1311 } -1312 -1313 @Override -1314 public boolean isAvailable() { -1315 return !isClosed() && !isClosing(); -1316 } -1317 -1318 @Override -1319 public boolean isSplittable() { -1320 return isAvailable() && !hasReferences(); -1321 } -1322 -1323 @Override -1324 public boolean isMergeable() { -1325 if (!isAvailable()) { -1326 LOG.debug("Region " + this -1327 + " is not mergeable because it is closing or closed"); -1328 return false; -1329 } -1330 if (hasReferences()) { -1331 LOG.debug("Region " + this -1332 + " is not mergeable because it has references"); -1333 return false; -1334 } -1335 -1336 return true; -1337 } -1338 -1339 public boolean areWritesEnabled() { -1340 synchronized(this.writestate) { -1341 return this.writestate.writesEnabled; -1342 } -1343 } -1344 -1345 @VisibleForTesting -1346 public MultiVersionConcurrencyControl getMVCC() { -1347 return mvcc; -1348 } -1349 -1350 @Override -1351 public long getMaxFlushedSeqId() { -1352 return maxFlushedSeqId; -1353 } -1354 -1355 /** -1356 * @return readpoint considering given IsolationLevel. Pass {@code null} for default -1357 */ -1358 public long getReadPoint(IsolationLevel isolationLevel) { -1359 if (isolationLevel != null && isolationLevel == IsolationLevel.READ_UNCOMMITTED) { -1360 // This scan can read even uncommitted transactions -1361 return Long.MAX_VALUE; -1362 } -1363 return mvcc.getReadPoint(); +1126 public HDFSBlocksDistribution getHDFSBlocksDistribution() { +1127 HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); +1128 stores.values().stream().filter(s -> s.getStorefiles() != null) +1129 .flatMap(s -> s.getStorefiles().stream()).map(HStoreFile::getHDFSBlockDistribution) +1130 .forEachOrdered(hdfsBlocksDistribution::add); +1131 return hdfsBlocksDistribution; +1132 } +1133 +1134 /** +1135 * This is a helper function to compute HDFS block distribution on demand +1136 * @param conf configuration +1137 * @param tableDescriptor TableDescriptor of the table +1138 * @param regionInfo encoded name of the region +1139 * @return The HDFS blocks distribution for the given region. +1140 * @throws IOException +1141 */ +1142 public static HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf, +1143 TableDescriptor tableDescriptor, RegionInfo regionInfo) throws IOException { +1144 Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName()); +1145 return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath); +1146 } +1147 +1148 /** +1149 * This is a helper function to compute HDFS block distribution on demand +1150 * @param conf configuration +1151 * @param tableDescriptor TableDescriptor of the table +1152 * @param regionInfo encoded name of the region +1153 * @param tablePath the table directory +1154 * @return The HDFS blocks distribution for the given region. 
+1155 * @throws IOException +1156 */ +1157 public static HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf, +1158 TableDescriptor tableDescriptor, RegionInfo regionInfo, Path tablePath) throws IOException { +1159 HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); +1160 FileSystem fs = tablePath.getFileSystem(conf); +1161 +1162 HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo); +1163 for (ColumnFamilyDescriptor family : tableDescriptor.getColumnFamilies()) { +1164 List<LocatedFileStatus> locatedFileStatusList = HRegionFileSystem +1165 .getStoreFilesLocatedStatus(regionFs, family.getNameAsString(), true); +1166 if (locatedFileStatusList == null) { +1167 continue; +1168 } +1169 +1170 for (LocatedFileStatus status : locatedFileStatusList) { +1171 Path p = status.getPath(); +1172 if (StoreFileInfo.isReference(p) || HFileLink.isHFileLink(p)) { +1173 // Only construct StoreFileInfo object if its not a hfile, save obj +1174 // creation +1175 StoreFileInfo storeFileInfo = new StoreFileInfo(conf, fs, status); +1176 hdfsBlocksDistribution.add(storeFileInfo +1177 .computeHDFSBlocksDistribution(fs)); +1178 } else if (StoreFileInfo.isHFile(p)) { +1179 // If its a HFile, then lets just add to the block distribution +1180 // lets not create more objects here, not even another HDFSBlocksDistribution +1181 FSUtils.addToHDFSBlocksDistribution(hdfsBlocksDistribution, +1182 status.getBlockLocations()); +1183 } else { +1184 throw new IOException("path=" + p +1185 + " doesn't look like a valid StoreFile"); +1186 } +1187 } +1188 } +1189 return hdfsBlocksDistribution; +1190 } +1191 +1192 /** +1193 * Increase the size of mem store in this region and the size of global mem +1194 * store +1195 * @return the size of memstore in this region +1196 */ +1197 public long addAndGetMemStoreSize(MemStoreSize memstoreSize) { +1198 if (this.rsAccounting != null) { +1199 rsAccounting.incGlobalMemStoreSize(memstoreSize); +1200 } +1201 long size = this.memstoreDataSize.addAndGet(memstoreSize.getDataSize()); +1202 checkNegativeMemStoreDataSize(size, memstoreSize.getDataSize()); +1203 return size; +1204 } +1205 +1206 public void decrMemStoreSize(MemStoreSize memstoreSize) { +1207 if (this.rsAccounting != null) { +1208 rsAccounting.decGlobalMemStoreSize(memstoreSize); +1209 } +1210 long size = this.memstoreDataSize.addAndGet(-memstoreSize.getDataSize()); +1211 checkNegativeMemStoreDataSize(size, -memstoreSize.getDataSize()); +1212 } +1213 +1214 private void checkNegativeMemStoreDataSize(long memstoreDataSize, long delta) { +1215 // This is extremely bad if we make memstoreSize negative. Log as much info on the offending +1216 // caller as possible. (memStoreSize might be a negative value already -- freeing memory) +1217 if (memstoreDataSize < 0) { +1218 LOG.error("Asked to modify this region's (" + this.toString() +1219 + ") memstoreSize to a negative value which is incorrect. Current memstoreSize=" +1220 + (memstoreDataSize - delta) + ", delta=" + delta, new Exception()); +1221 } +1222 } +1223 +1224 @Override +1225 public RegionInfo getRegionInfo() { +1226 return this.fs.getRegionInfo(); +1227 } +1228 +1229 /** +1230 * @return Instance of {@link RegionServerServices} used by this HRegion. +1231 * Can be null. 
+1232 */ +1233 RegionServerServices getRegionServerServices() { +1234 return this.rsServices; +1235 } +1236 +1237 @Override +1238 public long getReadRequestsCount() { +1239 return readRequestsCount.sum(); +1240 } +1241 +1242 @Override +1243 public long getFilteredReadRequestsCount() { +1244 return filteredReadRequestsCount.sum(); +1245 } +1246 +1247 @Override +1248 public long getWriteRequestsCount() { +1249 return writeRequestsCount.sum(); +1250 } +1251 +1252 @Override +1253 public long getMemStoreSize() { +1254 return memstoreDataSize.get(); +1255 } +1256 +1257 /** @return store services for this region, to access services required by store level needs */ +1258 public RegionServicesForStores getRegionServicesForStores() { +1259 return regionServicesForStores; +1260 } +1261 +1262 @Override +1263 public long getNumMutationsWithoutWAL() { +1264 return numMutationsWithoutWAL.sum(); +1265 } +1266 +1267 @Override +1268 public long getDataInMemoryWithoutWAL() { +1269 return dataInMemoryWithoutWAL.sum(); +1270 } +1271 +1272 @Override +1273 public long getBlockedRequestsCount() { +1274 return blockedRequestsCount.sum(); +1275 } +1276 +1277 @Override +1278 public long getCheckAndMutateChecksPassed() { +1279 return checkAndMutateChecksPassed.sum(); +1280 } +1281 +1282 @Override +1283 public long getCheckAndMutateChecksFailed() { +1284 return checkAndMutateChecksFailed.sum(); +1285 } +1286 +1287 // TODO Needs to check whether we should expose our metrics system to CPs. If CPs themselves doing +1288 // the op and bypassing the core, this might be needed? Should be stop supporting the bypass +1289 // feature? +1290 public MetricsRegion getMetrics() { +1291 return metricsRegion; +1292 } +1293 +1294 @Override +1295 public boolean isClosed() { +1296 return this.closed.get(); +1297 } +1298 +1299 @Override +1300 public boolean isClosing() { +1301 return this.closing.get(); +1302 } +1303 +1304 @Override +1305 public boolean isReadOnly() { +1306 return this.writestate.isReadOnly(); +1307 } +1308 +1309 @Override +1310 public boolean isAvailable() { +1311 return !isClosed() && !isClosing(); +1312 } +1313 +1314 @Override +1315 public boolean isSplittable() { +1316 return isAvailable() && !hasReferences(); +1317 } +1318 +1319 @Override +1320 public boolean isMergeable() { +1321 if (!isAvailable()) { +1322 LOG.debug("Region " + this +1323 + " is not mergeable because it is closing or closed"); +1324 return false; +1325 } +1326 if (hasReferences()) { +1327 LOG.debug("Region " + this +1328 + " is not mergeable because it has references"); +1329 return false; +