From: git-site-role@apache.org
To: commits@hbase.apache.org
Date: Tue, 02 Jan 2018 15:18:27 -0000
Subject: [12/17] hbase-site git commit: Published site at .

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/25f2bbc0/devapidocs/src-html/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.html b/devapidocs/src-html/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.html
index 0f49224..514ad87 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.html
@@ -45,1938 +45,1980 @@

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

/**
 * This class provides an API to access the backup system table.<br>
 *
 * Backup system table schema:<br>
 * <p><ul>
 * <li>1. Backup sessions rowkey = "session:" + backupId; value = serialized BackupInfo</li>
 * <li>2. Backup start code rowkey = "startcode:" + backupRoot; value = startcode</li>
 * <li>3. Incremental backup set rowkey = "incrbackupset:" + backupRoot; value = [list of tables]</li>
 * <li>4. Table-RS-timestamp map rowkey = "trslm:" + backupRoot + table_name;
 * value = map[RS -> last WAL timestamp]</li>
 * <li>5. RS - WAL ts map rowkey = "rslogts:" + backupRoot + server; value = last WAL timestamp</li>
 * <li>6. WALs recorded rowkey = "wals:" + WAL unique file name;
 * value = backupId and full WAL file name</li>
 * </ul></p>
 */
@InterfaceAudience.Private
public final class BackupSystemTable implements Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class);

  static class WALItem {
    String backupId;
    String walFile;
    String backupRoot;

    WALItem(String backupId, String walFile, String backupRoot) {
      this.backupId = backupId;
      this.walFile = walFile;
      this.backupRoot = backupRoot;
    }

    public String getBackupId() {
      return backupId;
    }

    public String getWalFile() {
      return walFile;
    }

    public String getBackupRoot() {
      return backupRoot;
    }

    @Override
    public String toString() {
      return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile;
    }

  }

  private TableName tableName;
  /**
   * Stores backup sessions (contexts)
   */
  final static byte[] SESSIONS_FAMILY = "session".getBytes();
  /**
   * Stores other meta
   */
  final static byte[] META_FAMILY = "meta".getBytes();
  final static byte[] BULK_LOAD_FAMILY = "bulk".getBytes();
  /**
   * Connection to HBase cluster, shared among all instances
   */
  private final Connection connection;

  private final static String BACKUP_INFO_PREFIX = "session:";
  private final static String START_CODE_ROW = "startcode:";
  private final static byte[] ACTIVE_SESSION_ROW = "activesession:".getBytes();
  private final static byte[] ACTIVE_SESSION_COL = "c".getBytes();

  private final static byte[] ACTIVE_SESSION_YES = "yes".getBytes();
  private final static byte[] ACTIVE_SESSION_NO = "no".getBytes();

  private final static String INCR_BACKUP_SET = "incrbackupset:";
  private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
  private final static String RS_LOG_TS_PREFIX = "rslogts:";

  private final static String BULK_LOAD_PREFIX = "bulk:";
  private final static byte[] BULK_LOAD_PREFIX_BYTES = BULK_LOAD_PREFIX.getBytes();
  private final static byte[] DELETE_OP_ROW = "delete_op_row".getBytes();
  private final static byte[] MERGE_OP_ROW = "merge_op_row".getBytes();

  final static byte[] TBL_COL = Bytes.toBytes("tbl");
  final static byte[] FAM_COL = Bytes.toBytes("fam");
  final static byte[] PATH_COL = Bytes.toBytes("path");
  final static byte[] STATE_COL = Bytes.toBytes("state");
  // the two states a bulk loaded file can be
  final static byte[] BL_PREPARE = Bytes.toBytes("R");
  final static byte[] BL_COMMIT = Bytes.toBytes("D");

  private final static String WALS_PREFIX = "wals:";
  private final static String SET_KEY_PREFIX = "backupset:";

  // separator between BULK_LOAD_PREFIX and ordinals
  protected final static String BLK_LD_DELIM = ":";
  private final static byte[] EMPTY_VALUE = new byte[] {};

  // Safe delimiter in a string
  private final static String NULL = "\u0000";
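The prefix constants above implement the rowkey schema described in the class comment. A minimal
sketch of how these rowkeys compose, with hypothetical backupId and backupRoot values (not taken
from this file):

  // Hypothetical values, for illustration only.
  String backupId = "backup_1515000000000";
  String backupRoot = "hdfs://ns1/backup";

  byte[] sessionRow = Bytes.toBytes(BACKUP_INFO_PREFIX + backupId);   // "session:..." -> serialized BackupInfo
  byte[] startCodeRow = Bytes.toBytes(START_CODE_ROW + backupRoot);   // "startcode:..." -> start code
  byte[] incrSetRow = Bytes.toBytes(INCR_BACKUP_SET + backupRoot);    // "incrbackupset:..." -> table list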
  public BackupSystemTable(Connection conn) throws IOException {
    this.connection = conn;
    tableName = BackupSystemTable.getTableName(conn.getConfiguration());
    checkSystemTable();
  }

  private void checkSystemTable() throws IOException {
    try (Admin admin = connection.getAdmin()) {
      verifyNamespaceExists(admin);

      if (!admin.tableExists(tableName)) {
        HTableDescriptor backupHTD =
            BackupSystemTable.getSystemTableDescriptor(connection.getConfiguration());
        admin.createTable(backupHTD);
      }
      waitForSystemTable(admin);
    }
  }

  private void verifyNamespaceExists(Admin admin) throws IOException {
    String namespaceName = tableName.getNamespaceAsString();
    NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build();
    NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
    boolean exists = false;
    for (NamespaceDescriptor nsd : list) {
      if (nsd.getName().equals(ns.getName())) {
        exists = true;
        break;
      }
    }
    if (!exists) {
      admin.createNamespace(ns);
    }
  }

  private void waitForSystemTable(Admin admin) throws IOException {
    long TIMEOUT = 60000;
    long startTime = EnvironmentEdgeManager.currentTime();
    while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        // ignored; the timeout check below bounds the wait
      }
      if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
        throw new IOException("Failed to create backup system table after " + TIMEOUT + "ms");
      }
    }
    LOG.debug("Backup table exists and available");
  }

  @Override
  public void close() {
    // do nothing
  }
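A minimal usage sketch (hypothetical, not part of this file): the constructor verifies the
namespace and creates the system table on first use, so a caller only needs an open Connection,
and try-with-resources works because the class implements Closeable (ConnectionFactory here is
org.apache.hadoop.hbase.client.ConnectionFactory):

  Configuration conf = HBaseConfiguration.create();
  try (Connection conn = ConnectionFactory.createConnection(conf);
       BackupSystemTable sysTable = new BackupSystemTable(conn)) {
    // namespace and system table are verified/created by the constructor
    BackupInfo info = sysTable.readBackupInfo("backup_1515000000000"); // hypothetical id
  }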
  /**
   * Updates status (state) of a backup session in the backup system table.
   * @param info backup info
   * @throws IOException exception
   */
  public void updateBackupInfo(BackupInfo info) throws IOException {

    if (LOG.isTraceEnabled()) {
      LOG.trace("update backup status in backup system table for: " + info.getBackupId()
          + " set status=" + info.getState());
    }
    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForBackupInfo(info);
      table.put(put);
    }
  }

  /*
   * @param backupId the backup Id
   * @return Map of rows to path of bulk loaded hfile
   */
  Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
    Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
    try (Table table = connection.getTable(tableName);
        ResultScanner scanner = table.getScanner(scan)) {
      Result res = null;
      Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      while ((res = scanner.next()) != null) {
        res.advance();
        byte[] row = CellUtil.cloneRow(res.listCells().get(0));
        for (Cell cell : res.listCells()) {
          if (CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
              BackupSystemTable.PATH_COL.length) == 0) {
            map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
          }
        }
      }
      return map;
    }
  }

  /*
   * Used during restore
   * @param backupId the backup Id
   * @param sTableList List of tables
   * @return array of Map of family to List of Paths
   */
  public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList)
      throws IOException {
    Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()];
    try (Table table = connection.getTable(tableName);
        ResultScanner scanner = table.getScanner(scan)) {
      Result res = null;
      while ((res = scanner.next()) != null) {
        res.advance();
        TableName tbl = null;
        byte[] fam = null;
        String path = null;
        for (Cell cell : res.listCells()) {
          if (CellUtil.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0,
              BackupSystemTable.TBL_COL.length) == 0) {
            tbl = TableName.valueOf(CellUtil.cloneValue(cell));
          } else if (CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
              BackupSystemTable.FAM_COL.length) == 0) {
            fam = CellUtil.cloneValue(cell);
          } else if (CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
              BackupSystemTable.PATH_COL.length) == 0) {
            path = Bytes.toString(CellUtil.cloneValue(cell));
          }
        }
        int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList);
        if (srcIdx == -1) {
          // the table is not among the query
          continue;
        }
        if (mapForSrc[srcIdx] == null) {
          mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        }
        List<Path> files;
        if (!mapForSrc[srcIdx].containsKey(fam)) {
          files = new ArrayList<Path>();
          mapForSrc[srcIdx].put(fam, files);
        } else {
          files = mapForSrc[srcIdx].get(fam);
        }
        files.add(new Path(path));
        if (LOG.isDebugEnabled()) {
          LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path);
        }
      }

      return mapForSrc;
    }
  }

  /*
   * @param map Map of row keys to path of bulk loaded hfile
   */
  void deleteBulkLoadedFiles(Map<byte[], String> map) throws IOException {
    try (Table table = connection.getTable(tableName)) {
      List<Delete> dels = new ArrayList<>();
      for (byte[] row : map.keySet()) {
        dels.add(new Delete(row).addFamily(BackupSystemTable.META_FAMILY));
      }
      table.delete(dels);
    }
  }

  /**
   * Deletes backup status from the backup system table.
   * @param backupId backup id
   * @throws IOException exception
   */
  public void deleteBackupInfo(String backupId) throws IOException {

    if (LOG.isTraceEnabled()) {
      LOG.trace("delete backup status in backup system table for " + backupId);
    }
    try (Table table = connection.getTable(tableName)) {
      Delete del = createDeleteForBackupInfo(backupId);
      table.delete(del);
    }
  }
  /*
   * For postBulkLoadHFile() hook.
   * @param tabName table name
   * @param region the region receiving hfile
   * @param finalPaths family and associated hfiles
   */
  public void writePathsPostBulkLoad(TableName tabName, byte[] region,
      Map<byte[], List<Path>> finalPaths) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size()
          + " entries");
    }
    try (Table table = connection.getTable(tableName)) {
      List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
      table.put(puts);
      LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
    }
  }

  /*
   * For preCommitStoreFile() hook
   * @param tabName table name
   * @param region the region receiving hfile
   * @param family column family
   * @param pairs list of paths for hfiles
   */
  public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family,
      final List<Pair<Path, Path>> pairs) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("write bulk load descriptor to backup " + tabName + " with " + pairs.size()
          + " entries");
    }
    try (Table table = connection.getTable(tableName)) {
      List<Put> puts =
          BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs);
      table.put(puts);
      LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
    }
  }

  /*
   * Removes rows recording bulk loaded hfiles from backup table
   * @param lst list of table names
   * @param rows the rows to be deleted
   */
  public void removeBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
    try (Table table = connection.getTable(tableName)) {
      List<Delete> lstDels = new ArrayList<>();
      for (byte[] row : rows) {
        Delete del = new Delete(row);
        lstDels.add(del);
        LOG.debug("orig deleting the row: " + Bytes.toString(row));
      }
      table.delete(lstDels);
      LOG.debug("deleted " + rows.size() + " original bulkload rows for " + lst.size() + " tables");
    }
  }
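The two hook writers above record a bulk-loaded hfile in one of the two states declared earlier;
judging by the helper names, preCommitStoreFile rows are written as BL_PREPARE ("R") and
postBulkLoadHFile rows as BL_COMMIT ("D"). A rough sketch of the post-bulk-load path, with
hypothetical inputs and the sysTable variable from the earlier sketch (Arrays is java.util.Arrays):

  TableName tab = TableName.valueOf("t1");                      // hypothetical table
  byte[] regionName = Bytes.toBytes("encoded-region-name");     // hypothetical region
  Map<byte[], List<Path>> finalPaths = new TreeMap<>(Bytes.BYTES_COMPARATOR);
  finalPaths.put(Bytes.toBytes("cf"),
      Arrays.asList(new Path("hdfs://ns1/staging/cf/hfile1"))); // hypothetical hfile
  sysTable.writePathsPostBulkLoad(tab, regionName, finalPaths);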
  /*
   * Reads the rows from backup table recording bulk loaded hfiles
   * @param tableList list of table names
   * @return pair of: (1) a map keyed by table, region and column family, whose values record each
   * hfile path and whether the hfile was recorded by the preCommitStoreFile hook (true);
   * (2) the list of rowkeys read
   */
  public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
      readBulkloadRows(List<TableName> tableList) throws IOException {
    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
    List<byte[]> rows = new ArrayList<>();
    for (TableName tTable : tableList) {
      Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
      Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
      try (Table table = connection.getTable(tableName);
          ResultScanner scanner = table.getScanner(scan)) {
        Result res = null;
        while ((res = scanner.next()) != null) {
          res.advance();
          String fam = null;
          String path = null;
          boolean raw = false;
          byte[] row = null;
          String region = null;
          for (Cell cell : res.listCells()) {
            row = CellUtil.cloneRow(cell);
            rows.add(row);
            String rowStr = Bytes.toString(row);
            region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
            if (CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
                BackupSystemTable.FAM_COL.length) == 0) {
              fam = Bytes.toString(CellUtil.cloneValue(cell));
            } else if (CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
                BackupSystemTable.PATH_COL.length) == 0) {
              path = Bytes.toString(CellUtil.cloneValue(cell));
            } else if (CellUtil.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0,
                BackupSystemTable.STATE_COL.length) == 0) {
              byte[] state = CellUtil.cloneValue(cell);
              raw = Bytes.equals(BackupSystemTable.BL_PREPARE, state);
            }
          }
          if (map.get(tTable) == null) {
            map.put(tTable, new HashMap<String, Map<String, List<Pair<String, Boolean>>>>());
            tblMap = map.get(tTable);
          }
          if (tblMap.get(region) == null) {
            tblMap.put(region, new HashMap<String, List<Pair<String, Boolean>>>());
          }
          Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region);
          if (famMap.get(fam) == null) {
            famMap.put(fam, new ArrayList<Pair<String, Boolean>>());
          }
          famMap.get(fam).add(new Pair<>(path, raw));
          LOG.debug("found orig " + path + " for " + fam + " of table " + region);
        }
      }
    }
    return new Pair<>(map, rows);
  }
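The nested return type reads inside out: table -> region -> family -> list of
(path, preparedByPreCommit) pairs, plus the raw rowkeys for a later removeBulkLoadedRows() call.
A hedged sketch of consuming the result (sysTable hypothetical as above):

  Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> res =
      sysTable.readBulkloadRows(Arrays.asList(TableName.valueOf("t1"))); // hypothetical table
  List<byte[]> rowsToDelete = res.getSecond(); // rowkeys, usable with removeBulkLoadedRows()
  res.getFirst().forEach((table, regions) ->
      regions.forEach((region, families) ->
          families.forEach((family, files) ->
              files.forEach(p -> System.out.println(table + "/" + region + "/" + family + " -> "
                  + p.getFirst() + (p.getSecond() ? " (prepared)" : " (committed)"))))));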
  /*
   * @param sTableList List of tables
   * @param maps array of Map of family to List of Paths
   * @param backupId the backup Id
   */
  public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps,
      String backupId) throws IOException {
    try (Table table = connection.getTable(tableName)) {
      long ts = EnvironmentEdgeManager.currentTime();
      int cnt = 0;
      List<Put> puts = new ArrayList<>();
      for (int idx = 0; idx < maps.length; idx++) {
        Map<byte[], List<Path>> map = maps[idx];
        TableName tn = sTableList.get(idx);
        if (map == null) {
          continue;
        }
        for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
          byte[] fam = entry.getKey();
          List<Path> paths = entry.getValue();
          for (Path p : paths) {
            Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId,
                ts, cnt++);
            puts.add(put);
          }
        }
      }
      if (!puts.isEmpty()) {
        table.put(puts);
      }
    }
  }

  /**
   * Reads backup status object (instance of backup info) from the backup system table.
   * @param backupId backup id
   * @return Current status of backup session or null
   */
  public BackupInfo readBackupInfo(String backupId) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("read backup status from backup system table for: " + backupId);
    }

    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupInfo(backupId);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      return resultToBackupInfo(res);
    }
  }

  /**
   * Read the start code (timestamp) of the last successful backup. Returns null if there is no
   * start code stored in hbase or the value is of length 0. These two cases indicate there is no
   * successful backup completed so far.
   * @param backupRoot directory path to backup destination
   * @return the timestamp of last successful backup
   * @throws IOException exception
   */
  public String readBackupStartCode(String backupRoot) throws IOException {
    LOG.trace("read backup start code from backup system table");

    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForStartCode(backupRoot);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      Cell cell = res.listCells().get(0);
      byte[] val = CellUtil.cloneValue(cell);
      if (val.length == 0) {
        return null;
      }
      return new String(val);
    }
  }

  /**
   * Write the start code (timestamp) to the backup system table. If passed in null, then write a
   * 0 byte.
   * @param startCode start code
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("write backup start code to backup system table " + startCode);
    }
    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForStartCode(startCode.toString(), backupRoot);
      table.put(put);
    }
  }

  /**
   * Exclusive operations are: create, delete, merge
   * @throws IOException
   */
  public void startBackupExclusiveOperation() throws IOException {
    LOG.debug("Start new backup exclusive operation");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForStartBackupSession();
      // First try to put if row does not exist
      if (!table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
          .ifNotExists().thenPut(put)) {
        // Row exists, try to put if value == ACTIVE_SESSION_NO
        if (!table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY)
            .qualifier(ACTIVE_SESSION_COL).ifEquals(ACTIVE_SESSION_NO).thenPut(put)) {
          throw new IOException("There is an active backup exclusive operation");
        }
      }
    }
  }

  private Put createPutForStartBackupSession() {
    Put put = new Put(ACTIVE_SESSION_ROW);
    put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES);
    return put;
  }
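The checkAndMutate above either claims the single activesession row or fails, so callers are
expected to bracket the exclusive operation with this method and the finish method that follows.
A minimal sketch, assuming a sysTable as above:

  sysTable.startBackupExclusiveOperation(); // throws IOException if another operation is active
  try {
    // run the exclusive create/delete/merge operation here
  } finally {
    sysTable.finishBackupExclusiveOperation();
  }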
  public void finishBackupExclusiveOperation() throws IOException {
    LOG.debug("Finish backup exclusive operation");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForStopBackupSession();
      if (!table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
          .ifEquals(ACTIVE_SESSION_YES).thenPut(put)) {
        throw new IOException("There is no active backup exclusive operation");
      }
    }
  }

  private Put createPutForStopBackupSession() {
    Put put = new Put(ACTIVE_SESSION_ROW);
    put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO);
    return put;
  }

  /**
   * Get the Region Servers log information after the last log roll from the backup system table.
   * @param backupRoot root directory path to backup
   * @return RS log info
   * @throws IOException exception
   */
  public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot)
      throws IOException {
    LOG.trace("read region server last roll log result to backup system table");

    Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot);

    try (Table table = connection.getTable(tableName);
        ResultScanner scanner = table.getScanner(scan)) {
      Result res = null;
      HashMap<String, Long> rsTimestampMap = new HashMap<String, Long>();
      while ((res = scanner.next()) != null) {
        res.advance();
        Cell cell = res.current();
        byte[] row = CellUtil.cloneRow(cell);
        String server = getServerNameForReadRegionServerLastLogRollResult(row);
        byte[] data = CellUtil.cloneValue(cell);
        rsTimestampMap.put(server, Bytes.toLong(data));
      }
      return rsTimestampMap;
    }
  }

  /**
   * Writes Region Server last roll log result (timestamp) to the backup system table.
   * @param server Region Server name
   * @param ts last log timestamp
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot)
      throws IOException {
    LOG.trace("write region server last roll log result to backup system table");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
      table.put(put);
    }
  }

  /**
   * Get all completed backup information (in desc order by time)
   * @param onlyCompleted true, if only successfully completed sessions
   * @return history info of BackupCompleteData
   * @throws IOException exception
   */
  public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException {
    LOG.trace("get backup history from backup system table");

    BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY;
    ArrayList<BackupInfo> list = getBackupInfos(state);
    return BackupUtils.sortHistoryListDesc(list);
  }

  /**
   * Get all backups history
   * @return list of backup info
   * @throws IOException
   */
  public List<BackupInfo> getBackupHistory() throws IOException {
    return getBackupHistory(false);
  }

  /**
   * Get first n backup history records
   * @param n number of records; if n == -1, the limit is ignored
   * @return list of records
   * @throws IOException
   */
  public List<BackupInfo> getHistory(int n) throws IOException {
    List<BackupInfo> history = getBackupHistory();
    if (n == -1 || history.size() <= n) {
      return history;
    }
    return Collections.unmodifiableList(history.subList(0, n));
  }
  /**
   * Get backup history records filtered by list of filters.
   * @param n max number of records; if n == -1, the limit is ignored
   * @param filters list of filters
   * @return backup records
   * @throws IOException
   */
  public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) throws IOException {
    if (filters.length == 0) {
      return getHistory(n);
    }

    List<BackupInfo> history = getBackupHistory();
    List<BackupInfo> result = new ArrayList<BackupInfo>();
    for (BackupInfo bi : history) {
      if (n >= 0 && result.size() == n) {
        break;
      }
      boolean passed = true;
      for (int i = 0; i < filters.length; i++) {
        if (!filters[i].apply(bi)) {
          passed = false;
          break;
        }
      }
      if (passed) {
        result.add(bi);
      }
    }
    return result;
  }
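BackupInfo.Filter is applied per record above, so callers can compose ad-hoc predicates.
Assuming it is a single-method interface over BackupInfo, as its use in the loop suggests,
lambdas work; a hedged sketch filtering to completed full backups:

  BackupInfo.Filter completedOnly = info -> info.getState() == BackupState.COMPLETE;
  BackupInfo.Filter fullOnly = info -> info.getType() == BackupType.FULL;
  List<BackupInfo> lastTen = sysTable.getBackupHistory(10, completedOnly, fullOnly);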
  /*
   * Retrieve TableName's for completed backup of given type
   * @param type backup type
   * @return List of table names
   */
  public List<TableName> getTablesForBackupType(BackupType type) throws IOException {
    Set<TableName> names = new HashSet<>();
    List<BackupInfo> infos = getBackupHistory(true);
    for (BackupInfo info : infos) {
      if (info.getType() == type) {
        names.addAll(info.getTableNames());
      }
    }
    return new ArrayList<>(names);
  }

  /**
   * Get history for backup destination
   * @param backupRoot backup destination path
   * @return List of backup info
   * @throws IOException
   */
  public List<BackupInfo> getBackupHistory(String backupRoot) throws IOException {
    ArrayList<BackupInfo> history = getBackupHistory(false);
    for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) {
      BackupInfo info = iterator.next();
      if (!backupRoot.equals(info.getBackupRootDir())) {
        iterator.remove();
      }
    }
    return history;
  }

  /**
   * Get history for a table
   * @param name table name
   * @return history for a table
   * @throws IOException
   */
  public List<BackupInfo> getBackupHistoryForTable(TableName name) throws IOException {
    List<BackupInfo> history = getBackupHistory();
    List<BackupInfo> tableHistory = new ArrayList<BackupInfo>();
    for (BackupInfo info : history) {
      List<TableName> tables = info.getTableNames();
      if (tables.contains(name)) {
        tableHistory.add(info);
      }
    }
    return tableHistory;
  }

  public Map<TableName, ArrayList<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set,
      String backupRoot) throws IOException {
    List<BackupInfo> history = getBackupHistory(backupRoot);
    Map<TableName, ArrayList<BackupInfo>> tableHistoryMap =
        new HashMap<TableName, ArrayList<BackupInfo>>();
    for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) {
      BackupInfo info = iterator.next();
      if (!backupRoot.equals(info.getBackupRootDir())) {
        continue;
      }
      List<TableName> tables = info.getTableNames();
      for (TableName tableName : tables) {
        if (set.contains(tableName)) {
          ArrayList<BackupInfo> list = tableHistoryMap.get(tableName);
          if (list == null) {
            list = new ArrayList<BackupInfo>();
            tableHistoryMap.put(tableName, list);
          }
          list.add(info);
        }
      }
    }
    return tableHistoryMap;
  }

  /**
   * Get all backup sessions with a given state (in descending order by time)
   * @param state backup session state
   * @return history info of backup info objects
   * @throws IOException exception
   */
  public ArrayList<BackupInfo> getBackupInfos(BackupState state) throws IOException {
    LOG.trace("get backup infos from backup system table");

    Scan scan = createScanForBackupHistory();
    ArrayList<BackupInfo> list = new ArrayList<BackupInfo>();

    try (Table table = connection.getTable(tableName);
        ResultScanner scanner = table.getScanner(scan)) {
      Result res = null;
      while ((res = scanner.next()) != null) {
        res.advance();
        BackupInfo context = cellToBackupInfo(res.current());
        if (state != BackupState.ANY && context.getState() != state) {
          continue;
        }
        list.add(context);
      }
      return list;
    }
  }

  /**
   * Write the current timestamps for each regionserver to the backup system table after a
   * successful full or incremental backup. The saved timestamp is of the last log file that was
   * backed up already.
   * @param tables tables
   * @param newTimestamps timestamps
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void writeRegionServerLogTimestamp(Set<TableName> tables,
      HashMap<String, Long> newTimestamps, String backupRoot) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("write RS log time stamps to backup system table for tables ["
          + StringUtils.join(tables, ",") + "]");
    }
    List<Put> puts = new ArrayList<Put>();
    for (TableName table : tables) {
      byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray();
      Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot);
      puts.add(put);
    }
    try (Table table = connection.getTable(tableName)) {
      table.put(puts);
    }
  }

  /**
   * Read the timestamp for each region server log after the last successful backup. Each table
   * has its own set of the timestamps. The info is stored for each table as a concatenated string
   * of rs->timestamp.
   * @param backupRoot root directory path to backup
   * @return the timestamp for each region server. key: tableName value:
   *         RegionServer,PreviousTimeStamp
   * @throws IOException exception
   */
  public HashMap<TableName, HashMap<String, Long>> readLogTimestampMap(String backupRoot)
      throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("read RS log ts from backup system table for root=" + backupRoot);
    }

    HashMap<TableName, HashMap<String, Long>> tableTimestampMap =
        new HashMap<TableName, HashMap<String, Long>>();

    Scan scan = createScanForReadLogTimestampMap(backupRoot);
    try (Table table = connection.getTable(tableName);
        ResultScanner scanner = table.getScanner(scan)) {
      Result res = null;
      while ((res = scanner.next()) != null) {
        res.advance();
        Cell cell = res.current();
        byte[] row = CellUtil.cloneRow(cell);
        String tabName = getTableNameForReadLogTimestampMap(row);
        TableName tn = TableName.valueOf(tabName);
        byte[] data = CellUtil.cloneValue(cell);
        if (data == null) {
          throw new IOException("Data of last backup data from backup system table "
              + "is empty. Create a backup first.");
        }
        if (data != null && data.length > 0) {
          HashMap<String, Long> lastBackup =
              fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data));
          tableTimestampMap.put(tn, lastBackup);
        }
      }
      return tableTimestampMap;
    }
  }
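A short sketch of reading the per-table timestamp map back; the server keys are "hostname:port"
strings, as built by fromTableServerTimestampProto below, and the backup root here is hypothetical:

  HashMap<TableName, HashMap<String, Long>> tsMap =
      sysTable.readLogTimestampMap("hdfs://ns1/backup");
  tsMap.forEach((table, byServer) ->
      byServer.forEach((server, ts) ->
          System.out.println(table + ": " + server + " last WAL ts=" + ts)));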
  private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table,
      Map<String, Long> map) {
    BackupProtos.TableServerTimestamp.Builder tstBuilder =
        BackupProtos.TableServerTimestamp.newBuilder();
    tstBuilder.setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil
        .toProtoTableName(table));

    for (Entry<String, Long> entry : map.entrySet()) {
      BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder();
      HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder();
      ServerName sn = ServerName.parseServerName(entry.getKey());
      snBuilder.setHostName(sn.getHostname());
      snBuilder.setPort(sn.getPort());
      builder.setServerName(snBuilder.build());
      builder.setTimestamp(entry.getValue());
      tstBuilder.addServerTimestamp(builder.build());
    }

    return tstBuilder.build();
  }

  private HashMap<String, Long> fromTableServerTimestampProto(
      BackupProtos.TableServerTimestamp proto) {
    HashMap<String, Long> map = new HashMap<String, Long>();
    List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList();
    for (BackupProtos.ServerTimestamp st : list) {
      ServerName sn =
          org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName());
      map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp());
    }
    return map;
  }

  /**
   * Return the current tables covered by incremental backup.
   * @param backupRoot root directory