From: git-site-role@apache.org
To: commits@hbase.apache.org
Date: Wed, 19 Jul 2017 22:07:47 -0000
Subject: [31/51] [partial] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd.

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/9eba7fcf/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html b/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html
index 53cae9a..64d0880 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html
@@ -1034,289 +1034,283 @@
1026 protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable,
1027 final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
1028 throws IOException {
-1029 final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
-1030 for (LoadQueueItem lqi : lqis) {
-1031 if (!unmatchedFamilies.contains(Bytes.toString(lqi.family))) {
-1032 famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
-1033 }
-1034 }
-1035 try {
-1036 List<LoadQueueItem> toRetry = new ArrayList<>();
-1037 Configuration conf = getConf();
-1038 byte[] region = RpcRetryingCallerFactory.instantiate(conf,
-1039 null).<byte[]> newCaller()
-1040 .callWithRetries(serviceCallable, Integer.MAX_VALUE);
-1041 if (region == null) {
-1042 LOG.warn("Attempt to bulk load region containing "
-1043 + Bytes.toStringBinary(first) + " into table "
-1044 + tableName + " with files " + lqis
-1045 + " failed.
This is recoverable and they will be retried."); -1046 toRetry.addAll(lqis); // return lqi's to retry -1047 } -1048 // success -1049 return toRetry; -1050 } catch (IOException e) { -1051 LOG.error("Encountered unrecoverable error from region server, additional details: " -1052 + serviceCallable.getExceptionMessageAdditionalDetail(), e); -1053 throw e; -1054 } -1055 } -1056 -1057 private final String toString(List<Pair<byte[], String>> list) { -1058 StringBuffer sb = new StringBuffer(); -1059 sb.append("["); -1060 if(list != null){ -1061 for(Pair<byte[], String> pair: list) { -1062 sb.append("{"); -1063 sb.append(Bytes.toStringBinary(pair.getFirst())); -1064 sb.append(","); -1065 sb.append(pair.getSecond()); -1066 sb.append("}"); -1067 } -1068 } -1069 sb.append("]"); -1070 return sb.toString(); -1071 } -1072 private boolean isSecureBulkLoadEndpointAvailable() { -1073 String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); -1074 return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); -1075 } -1076 -1077 /** -1078 * Split a storefile into a top and bottom half, maintaining -1079 * the metadata, recreating bloom filters, etc. -1080 */ -1081 static void splitStoreFile( -1082 Configuration conf, Path inFile, -1083 HColumnDescriptor familyDesc, byte[] splitKey, -1084 Path bottomOut, Path topOut) throws IOException { -1085 // Open reader with no block cache, and not in-memory -1086 Reference topReference = Reference.createTopReference(splitKey); -1087 Reference bottomReference = Reference.createBottomReference(splitKey); -1088 -1089 copyHFileHalf(conf, inFile, topOut, topReference, familyDesc); -1090 copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc); -1091 } -1092 -1093 /** -1094 * Copy half of an HFile into a new HFile. 
-1095 */ -1096 private static void copyHFileHalf( -1097 Configuration conf, Path inFile, Path outFile, Reference reference, -1098 HColumnDescriptor familyDescriptor) -1099 throws IOException { -1100 FileSystem fs = inFile.getFileSystem(conf); -1101 CacheConfig cacheConf = new CacheConfig(conf); -1102 HalfStoreFileReader halfReader = null; -1103 StoreFileWriter halfWriter = null; -1104 try { -1105 halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, true, -1106 new AtomicInteger(0), true, conf); -1107 Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo(); -1108 -1109 int blocksize = familyDescriptor.getBlocksize(); -1110 Algorithm compression = familyDescriptor.getCompressionType(); -1111 BloomType bloomFilterType = familyDescriptor.getBloomFilterType(); -1112 HFileContext hFileContext = new HFileContextBuilder() -1113 .withCompression(compression) -1114 .withChecksumType(HStore.getChecksumType(conf)) -1115 .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) -1116 .withBlockSize(blocksize) -1117 .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()) -1118 .withIncludesTags(true) -1119 .build(); -1120 halfWriter = new StoreFileWriter.Builder(conf, cacheConf, -1121 fs) -1122 .withFilePath(outFile) -1123 .withBloomType(bloomFilterType) -1124 .withFileContext(hFileContext) -1125 .build(); -1126 HFileScanner scanner = halfReader.getScanner(false, false, false); -1127 scanner.seekTo(); -1128 do { -1129 halfWriter.append(scanner.getCell()); -1130 } while (scanner.next()); -1131 -1132 for (Map.Entry<byte[],byte[]> entry : fileInfo.entrySet()) { -1133 if (shouldCopyHFileMetaKey(entry.getKey())) { -1134 halfWriter.appendFileInfo(entry.getKey(), entry.getValue()); -1135 } -1136 } -1137 } finally { -1138 if (halfWriter != null) { -1139 halfWriter.close(); -1140 } -1141 if (halfReader != null) { -1142 halfReader.close(cacheConf.shouldEvictOnClose()); -1143 } -1144 } -1145 } +1029 try { +1030 List<LoadQueueItem> toRetry = new ArrayList<>(); +1031 Configuration conf = getConf(); +1032 byte[] region = RpcRetryingCallerFactory.instantiate(conf, +1033 null).<byte[]> newCaller() +1034 .callWithRetries(serviceCallable, Integer.MAX_VALUE); +1035 if (region == null) { +1036 LOG.warn("Attempt to bulk load region containing " +1037 + Bytes.toStringBinary(first) + " into table " +1038 + tableName + " with files " + lqis +1039 + " failed. 
This is recoverable and they will be retried."); +1040 toRetry.addAll(lqis); // return lqi's to retry +1041 } +1042 // success +1043 return toRetry; +1044 } catch (IOException e) { +1045 LOG.error("Encountered unrecoverable error from region server, additional details: " +1046 + serviceCallable.getExceptionMessageAdditionalDetail(), e); +1047 throw e; +1048 } +1049 } +1050 +1051 private final String toString(List<Pair<byte[], String>> list) { +1052 StringBuffer sb = new StringBuffer(); +1053 sb.append("["); +1054 if(list != null){ +1055 for(Pair<byte[], String> pair: list) { +1056 sb.append("{"); +1057 sb.append(Bytes.toStringBinary(pair.getFirst())); +1058 sb.append(","); +1059 sb.append(pair.getSecond()); +1060 sb.append("}"); +1061 } +1062 } +1063 sb.append("]"); +1064 return sb.toString(); +1065 } +1066 private boolean isSecureBulkLoadEndpointAvailable() { +1067 String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); +1068 return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); +1069 } +1070 +1071 /** +1072 * Split a storefile into a top and bottom half, maintaining +1073 * the metadata, recreating bloom filters, etc. +1074 */ +1075 static void splitStoreFile( +1076 Configuration conf, Path inFile, +1077 HColumnDescriptor familyDesc, byte[] splitKey, +1078 Path bottomOut, Path topOut) throws IOException { +1079 // Open reader with no block cache, and not in-memory +1080 Reference topReference = Reference.createTopReference(splitKey); +1081 Reference bottomReference = Reference.createBottomReference(splitKey); +1082 +1083 copyHFileHalf(conf, inFile, topOut, topReference, familyDesc); +1084 copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc); +1085 } +1086 +1087 /** +1088 * Copy half of an HFile into a new HFile. 
+1089 */ +1090 private static void copyHFileHalf( +1091 Configuration conf, Path inFile, Path outFile, Reference reference, +1092 HColumnDescriptor familyDescriptor) +1093 throws IOException { +1094 FileSystem fs = inFile.getFileSystem(conf); +1095 CacheConfig cacheConf = new CacheConfig(conf); +1096 HalfStoreFileReader halfReader = null; +1097 StoreFileWriter halfWriter = null; +1098 try { +1099 halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, true, +1100 new AtomicInteger(0), true, conf); +1101 Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo(); +1102 +1103 int blocksize = familyDescriptor.getBlocksize(); +1104 Algorithm compression = familyDescriptor.getCompressionType(); +1105 BloomType bloomFilterType = familyDescriptor.getBloomFilterType(); +1106 HFileContext hFileContext = new HFileContextBuilder() +1107 .withCompression(compression) +1108 .withChecksumType(HStore.getChecksumType(conf)) +1109 .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) +1110 .withBlockSize(blocksize) +1111 .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()) +1112 .withIncludesTags(true) +1113 .build(); +1114 halfWriter = new StoreFileWriter.Builder(conf, cacheConf, +1115 fs) +1116 .withFilePath(outFile) +1117 .withBloomType(bloomFilterType) +1118 .withFileContext(hFileContext) +1119 .build(); +1120 HFileScanner scanner = halfReader.getScanner(false, false, false); +1121 scanner.seekTo(); +1122 do { +1123 halfWriter.append(scanner.getCell()); +1124 } while (scanner.next()); +1125 +1126 for (Map.Entry<byte[],byte[]> entry : fileInfo.entrySet()) { +1127 if (shouldCopyHFileMetaKey(entry.getKey())) { +1128 halfWriter.appendFileInfo(entry.getKey(), entry.getValue()); +1129 } +1130 } +1131 } finally { +1132 if (halfWriter != null) { +1133 halfWriter.close(); +1134 } +1135 if (halfReader != null) { +1136 halfReader.close(cacheConf.shouldEvictOnClose()); +1137 } +1138 } +1139 } +1140 +1141 private static boolean shouldCopyHFileMetaKey(byte[] key) { +1142 // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085 +1143 if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) { +1144 return false; +1145 } 1146 -1147 private static boolean shouldCopyHFileMetaKey(byte[] key) { -1148 // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085 -1149 if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) { -1150 return false; -1151 } -1152 -1153 return !HFile.isReservedFileInfoKey(key); -1154 } -1155 -1156 /* -1157 * Infers region boundaries for a new table. -1158 * Parameter: -1159 * bdryMap is a map between keys to an integer belonging to {+1, -1} -1160 * If a key is a start key of a file, then it maps to +1 -1161 * If a key is an end key of a file, then it maps to -1 -1162 * Algo: -1163 * 1) Poll on the keys in order: -1164 * a) Keep adding the mapped values to these keys (runningSum) -1165 * b) Each time runningSum reaches 0, add the start Key from when the runningSum had started to -1166 * a boundary list. -1167 * 2) Return the boundary list. 
-1168 */ -1169 public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) { -1170 ArrayList<byte[]> keysArray = new ArrayList<>(); -1171 int runningValue = 0; -1172 byte[] currStartKey = null; -1173 boolean firstBoundary = true; -1174 -1175 for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) { -1176 if (runningValue == 0) { -1177 currStartKey = item.getKey(); -1178 } -1179 runningValue += item.getValue(); -1180 if (runningValue == 0) { -1181 if (!firstBoundary) { -1182 keysArray.add(currStartKey); -1183 } -1184 firstBoundary = false; -1185 } -1186 } -1187 -1188 return keysArray.toArray(new byte[0][0]); -1189 } -1190 -1191 /* -1192 * If the table is created for the first time, then "completebulkload" reads the files twice. -1193 * More modifications necessary if we want to avoid doing it. -1194 */ -1195 private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException { -1196 final Path hfofDir = new Path(dirPath); -1197 final FileSystem fs = hfofDir.getFileSystem(getConf()); -1198 -1199 // Add column families -1200 // Build a set of keys -1201 final HTableDescriptor htd = new HTableDescriptor(tableName); -1202 final TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); -1203 visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<HColumnDescriptor>() { +1147 return !HFile.isReservedFileInfoKey(key); +1148 } +1149 +1150 /* +1151 * Infers region boundaries for a new table. +1152 * Parameter: +1153 * bdryMap is a map between keys to an integer belonging to {+1, -1} +1154 * If a key is a start key of a file, then it maps to +1 +1155 * If a key is an end key of a file, then it maps to -1 +1156 * Algo: +1157 * 1) Poll on the keys in order: +1158 * a) Keep adding the mapped values to these keys (runningSum) +1159 * b) Each time runningSum reaches 0, add the start Key from when the runningSum had started to +1160 * a boundary list. +1161 * 2) Return the boundary list. +1162 */ +1163 public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) { +1164 ArrayList<byte[]> keysArray = new ArrayList<>(); +1165 int runningValue = 0; +1166 byte[] currStartKey = null; +1167 boolean firstBoundary = true; +1168 +1169 for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) { +1170 if (runningValue == 0) { +1171 currStartKey = item.getKey(); +1172 } +1173 runningValue += item.getValue(); +1174 if (runningValue == 0) { +1175 if (!firstBoundary) { +1176 keysArray.add(currStartKey); +1177 } +1178 firstBoundary = false; +1179 } +1180 } +1181 +1182 return keysArray.toArray(new byte[0][0]); +1183 } +1184 +1185 /* +1186 * If the table is created for the first time, then "completebulkload" reads the files twice. +1187 * More modifications necessary if we want to avoid doing it. 
+1188 */ +1189 private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException { +1190 final Path hfofDir = new Path(dirPath); +1191 final FileSystem fs = hfofDir.getFileSystem(getConf()); +1192 +1193 // Add column families +1194 // Build a set of keys +1195 final HTableDescriptor htd = new HTableDescriptor(tableName); +1196 final TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); +1197 visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<HColumnDescriptor>() { +1198 @Override +1199 public HColumnDescriptor bulkFamily(final byte[] familyName) { +1200 HColumnDescriptor hcd = new HColumnDescriptor(familyName); +1201 htd.addFamily(hcd); +1202 return hcd; +1203 } 1204 @Override -1205 public HColumnDescriptor bulkFamily(final byte[] familyName) { -1206 HColumnDescriptor hcd = new HColumnDescriptor(familyName); -1207 htd.addFamily(hcd); -1208 return hcd; -1209 } -1210 @Override -1211 public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus) -1212 throws IOException { -1213 Path hfile = hfileStatus.getPath(); -1214 try (HFile.Reader reader = -1215 HFile.createReader(fs, hfile, new CacheConfig(getConf()), true, getConf())) { -1216 if (hcd.getCompressionType() != reader.getFileContext().getCompression()) { -1217 hcd.setCompressionType(reader.getFileContext().getCompression()); -1218 LOG.info("Setting compression " + hcd.getCompressionType().name() + " for family " + -1219 hcd.toString()); -1220 } -1221 reader.loadFileInfo(); -1222 byte[] first = reader.getFirstRowKey(); -1223 byte[] last = reader.getLastRowKey(); -1224 -1225 LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" + -1226 Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last)); -1227 -1228 // To eventually infer start key-end key boundaries -1229 Integer value = map.containsKey(first) ? map.get(first) : 0; -1230 map.put(first, value + 1); +1205 public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus) +1206 throws IOException { +1207 Path hfile = hfileStatus.getPath(); +1208 try (HFile.Reader reader = +1209 HFile.createReader(fs, hfile, new CacheConfig(getConf()), true, getConf())) { +1210 if (hcd.getCompressionType() != reader.getFileContext().getCompression()) { +1211 hcd.setCompressionType(reader.getFileContext().getCompression()); +1212 LOG.info("Setting compression " + hcd.getCompressionType().name() + " for family " + +1213 hcd.toString()); +1214 } +1215 reader.loadFileInfo(); +1216 byte[] first = reader.getFirstRowKey(); +1217 byte[] last = reader.getLastRowKey(); +1218 +1219 LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" + +1220 Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last)); +1221 +1222 // To eventually infer start key-end key boundaries +1223 Integer value = map.containsKey(first) ? map.get(first) : 0; +1224 map.put(first, value + 1); +1225 +1226 value = map.containsKey(last) ? map.get(last) : 0; +1227 map.put(last, value - 1); +1228 } +1229 } +1230 }); 1231 -1232 value = map.containsKey(last) ? 
map.get(last) : 0; -1233 map.put(last, value - 1); -1234 } -1235 } -1236 }); +1232 byte[][] keys = LoadIncrementalHFiles.inferBoundaries(map); +1233 admin.createTable(htd, keys); +1234 +1235 LOG.info("Table "+ tableName +" is available!!"); +1236 } 1237 -1238 byte[][] keys = LoadIncrementalHFiles.inferBoundaries(map); -1239 admin.createTable(htd, keys); -1240 -1241 LOG.info("Table "+ tableName +" is available!!"); -1242 } +1238 public Map<LoadQueueItem, ByteBuffer> run(String dirPath, Map<byte[], List<Path>> map, +1239 TableName tableName) throws IOException { +1240 initialize(); +1241 try (Connection connection = ConnectionFactory.createConnection(getConf()); +1242 Admin admin = connection.getAdmin()) { 1243 -1244 public Map<LoadQueueItem, ByteBuffer> run(String dirPath, Map<byte[], List<Path>> map, -1245 TableName tableName) throws IOException { -1246 initialize(); -1247 try (Connection connection = ConnectionFactory.createConnection(getConf()); -1248 Admin admin = connection.getAdmin()) { -1249 -1250 boolean tableExists = admin.tableExists(tableName); -1251 if (!tableExists) { -1252 if (dirPath != null && "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) { -1253 this.createTable(tableName, dirPath, admin); -1254 } else { -1255 String errorMsg = format("Table '%s' does not exist.", tableName); -1256 LOG.error(errorMsg); -1257 throw new TableNotFoundException(errorMsg); -1258 } -1259 } -1260 Path hfofDir = null; -1261 if (dirPath != null) { -1262 hfofDir = new Path(dirPath); -1263 } -1264 -1265 try (Table table = connection.getTable(tableName); -1266 RegionLocator locator = connection.getRegionLocator(tableName)) { -1267 boolean silence = "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, "")); -1268 boolean copyFiles = getConf().getBoolean(ALWAYS_COPY_FILES, false); -1269 if (dirPath != null) { -1270 doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles); -1271 } else { -1272 doBulkLoad(map, admin, table, locator, silence, copyFiles); -1273 } -1274 return retValue; -1275 } -1276 } -1277 } -1278 -1279 @Override -1280 public int run(String[] args) throws Exception { -1281 if (args.length < 2) { -1282 usage(); -1283 return -1; -1284 } -1285 -1286 String dirPath = args[0]; -1287 TableName tableName = TableName.valueOf(args[1]); -1288 Map<LoadQueueItem, ByteBuffer> loaded = run(dirPath, null, tableName); -1289 if (loaded == null || !loaded.isEmpty()) return 0; -1290 return -1; +1244 boolean tableExists = admin.tableExists(tableName); +1245 if (!tableExists) { +1246 if (dirPath != null && "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) { +1247 this.createTable(tableName, dirPath, admin); +1248 } else { +1249 String errorMsg = format("Table '%s' does not exist.", tableName); +1250 LOG.error(errorMsg); +1251 throw new TableNotFoundException(errorMsg); +1252 } +1253 } +1254 Path hfofDir = null; +1255 if (dirPath != null) { +1256 hfofDir = new Path(dirPath); +1257 } +1258 +1259 try (Table table = connection.getTable(tableName); +1260 RegionLocator locator = connection.getRegionLocator(tableName)) { +1261 boolean silence = "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, "")); +1262 boolean copyFiles = getConf().getBoolean(ALWAYS_COPY_FILES, false); +1263 if (dirPath != null) { +1264 doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles); +1265 } else { +1266 doBulkLoad(map, admin, table, locator, silence, copyFiles); +1267 } +1268 return retValue; +1269 } +1270 } +1271 } +1272 +1273 @Override +1274 public int 
run(String[] args) throws Exception { +1275 if (args.length < 2) { +1276 usage(); +1277 return -1; +1278 } +1279 +1280 String dirPath = args[0]; +1281 TableName tableName = TableName.valueOf(args[1]); +1282 Map<LoadQueueItem, ByteBuffer> loaded = run(dirPath, null, tableName); +1283 if (loaded == null || !loaded.isEmpty()) return 0; +1284 return -1; +1285 } +1286 +1287 public static void main(String[] args) throws Exception { +1288 Configuration conf = HBaseConfiguration.create(); +1289 int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(conf), args); +1290 System.exit(ret); 1291 } 1292 -1293 public static void main(String[] args) throws Exception { -1294 Configuration conf = HBaseConfiguration.create(); -1295 int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(conf), args); -1296 System.exit(ret); -1297 } -1298 -1299 /** -1300 * Called from replication sink, where it manages bulkToken(staging directory) by itself. This is -1301 * used only when SecureBulkLoadEndpoint is configured in hbase.coprocessor.region.classes -1302 * property. This directory is used as a temporary directory where all files are initially -1303 * copied/moved from user given directory, set all the required file permissions and then from -1304 * their it is finally loaded into a table. This should be set only when, one would like to manage -1305 * the staging directory by itself. Otherwise this tool will handle this by itself. -1306 * @param stagingDir staging directory path -1307 */ -1308 public void setBulkToken(String stagingDir) { -1309 this.bulkToken = stagingDir; -1310 } -1311} +1293 /** +1294 * Called from replication sink, where it manages bulkToken(staging directory) by itself. This is +1295 * used only when SecureBulkLoadEndpoint is configured in hbase.coprocessor.region.classes +1296 * property. This directory is used as a temporary directory where all files are initially +1297 * copied/moved from user given directory, set all the required file permissions and then from +1298 * their it is finally loaded into a table. This should be set only when, one would like to manage +1299 * the staging directory by itself. Otherwise this tool will handle this by itself. +1300 * @param stagingDir staging directory path +1301 */ +1302 public void setBulkToken(String stagingDir) { +1303 this.bulkToken = stagingDir; +1304 } +1305} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/9eba7fcf/devapidocs/src-html/org/apache/hadoop/hbase/master/DeadServer.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/master/DeadServer.html b/devapidocs/src-html/org/apache/hadoop/hbase/master/DeadServer.html index 0f70d04..3603490 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/master/DeadServer.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/master/DeadServer.html @@ -142,80 +142,75 @@ 134 135 assert numProcessing >= 0: "Number of dead servers in processing should always be non-negative"; 136 -137 if (numProcessing < 0) { -138 LOG.error("Number of dead servers in processing = " + numProcessing -139 + ". 
Something went wrong, this should always be non-negative."); -140 numProcessing = 0; -141 } -142 if (numProcessing == 0) { processing = false; } -143 } -144 -145 public synchronized int size() { -146 return deadServers.size(); -147 } -148 -149 public synchronized boolean isEmpty() { -150 return deadServers.isEmpty(); -151 } -152 -153 public synchronized void cleanAllPreviousInstances(final ServerName newServerName) { -154 Iterator<ServerName> it = deadServers.keySet().iterator(); -155 while (it.hasNext()) { -156 ServerName sn = it.next(); -157 if (ServerName.isSameHostnameAndPort(sn, newServerName)) { -158 it.remove(); -159 } -160 } -161 } -162 -163 public synchronized String toString() { -164 StringBuilder sb = new StringBuilder(); -165 for (ServerName sn : deadServers.keySet()) { -166 if (sb.length() > 0) { -167 sb.append(", "); -168 } -169 sb.append(sn.toString()); -170 } -171 return sb.toString(); -172 } -173 -174 /** -175 * Extract all the servers dead since a given time, and sort them. -176 * @param ts the time, 0 for all -177 * @return a sorted array list, by death time, lowest values first. -178 */ -179 public synchronized List<Pair<ServerName, Long>> copyDeadServersSince(long ts){ -180 List<Pair<ServerName, Long>> res = new ArrayList<>(size()); -181 -182 for (Map.Entry<ServerName, Long> entry:deadServers.entrySet()){ -183 if (entry.getValue() >= ts){ -184 res.add(new Pair<>(entry.getKey(), entry.getValue())); -185 } -186 } -187 -188 Collections.sort(res, ServerNameDeathDateComparator); -189 return res; -190 } -191 -192 /** -193 * Get the time when a server died -194 * @param deadServerName the dead server name -195 * @return the date when the server died -196 */ -197 public synchronized Date getTimeOfDeath(final ServerName deadServerName){ -198 Long time = deadServers.get(deadServerName); -199 return time == null ? null : new Date(time); -200 } -201 -202 private static Comparator<Pair<ServerName, Long>> ServerNameDeathDateComparator = -203 new Comparator<Pair<ServerName, Long>>(){ -204 -205 @Override -206 public int compare(Pair<ServerName, Long> o1, Pair<ServerName, Long> o2) { -207 return o1.getSecond().compareTo(o2.getSecond()); -208 } -209 }; -210} +137 if (numProcessing == 0) { processing = false; } +138 } +139 +140 public synchronized int size() { +141 return deadServers.size(); +142 } +143 +144 public synchronized boolean isEmpty() { +145 return deadServers.isEmpty(); +146 } +147 +148 public synchronized void cleanAllPreviousInstances(final ServerName newServerName) { +149 Iterator<ServerName> it = deadServers.keySet().iterator(); +150 while (it.hasNext()) { +151 ServerName sn = it.next(); +152 if (ServerName.isSameHostnameAndPort(sn, newServerName)) { +153 it.remove(); +154 } +155 } +156 } +157 +158 public synchronized String toString() { +159 StringBuilder sb = new StringBuilder(); +160 for (ServerName sn : deadServers.keySet()) { +161 if (sb.length() > 0) { +162 sb.append(", "); +163 } +164 sb.append(sn.toString()); +165 } +166 return sb.toString(); +167 } +168 +169 /** +170 * Extract all the servers dead since a given time, and sort them. +171 * @param ts the time, 0 for all +172 * @return a sorted array list, by death time, lowest values first. 
+173 */ +174 public synchronized List<Pair<ServerName, Long>> copyDeadServersSince(long ts){ +175 List<Pair<ServerName, Long>> res = new ArrayList<>(size()); +176 +177 for (Map.Entry<ServerName, Long> entry:deadServers.entrySet()){ +178 if (entry.getValue() >= ts){ +179 res.add(new Pair<>(entry.getKey(), entry.getValue())); +180 } +181 } +182 +183 Collections.sort(res, ServerNameDeathDateComparator); +184 return res; +185 } +186 +187 /** +188 * Get the time when a server died +189 * @param deadServerName the dead server name +190 * @return the date when the server died +191 */ +192 public synchronized Date getTimeOfDeath(final ServerName deadServerName){ +193 Long time = deadServers.get(deadServerName); +194 return time == null ? null : new Date(time); +195 } +196 +197 private static Comparator<Pair<ServerName, Long>> ServerNameDeathDateComparator = +198 new Comparator<Pair<ServerName, Long>>(){ +199 +200 @Override +201 public int compare(Pair<ServerName, Long> o1, Pair<ServerName, Long> o2) { +202 return o1.getSecond().compareTo(o2.getSecond()); +203 } +204 }; +205}
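
For readers skimming the LoadIncrementalHFiles listing above, here is a minimal sketch of how its two public entry points are commonly exercised. It relies only on what is visible in the diff: the Configuration-based constructor and run(String[]) used by main(), and the public static inferBoundaries(TreeMap) helper whose running-sum algorithm is described in the comment block. The driver class name BulkLoadSketch, the directory, and the table name are hypothetical placeholders; the ToolRunner call additionally assumes a reachable cluster and a directory of per-family subdirectories containing HFiles.

import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ToolRunner;

public class BulkLoadSketch {                       // hypothetical driver class
  public static void main(String[] args) throws Exception {
    // Worked example for inferBoundaries(): start keys map to +1, end keys to -1.
    // Two non-overlapping HFiles, one covering rows [a, c] and one covering [d, f].
    TreeMap<byte[], Integer> bdryMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    bdryMap.put(Bytes.toBytes("a"), 1);   // start key of the first file
    bdryMap.put(Bytes.toBytes("c"), -1);  // end key of the first file
    bdryMap.put(Bytes.toBytes("d"), 1);   // start key of the second file
    bdryMap.put(Bytes.toBytes("f"), -1);  // end key of the second file
    byte[][] splits = LoadIncrementalHFiles.inferBoundaries(bdryMap);
    // The running sum returns to zero at "c" (skipped as the first boundary) and at "f",
    // so the single inferred split key is "d": regions [, d) and [d, ).
    System.out.println("inferred split keys: " + splits.length);   // prints 1

    // Command-line style invocation, mirroring LoadIncrementalHFiles.main() above.
    // Needs a running cluster; the path and table name here are placeholders.
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(conf),
        new String[] { "/tmp/hfiles-output", "my_table" });
    System.exit(ret);
  }
}

Running "hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/hfiles-output my_table" from a shell goes through the same main()/ToolRunner path shown in the diff.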