From: git-site-role@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Thu, 20 Apr 2017 15:02:00 -0000
In-Reply-To: <4a35df37c6d74f6186c8aef0e7b24bdc@git.apache.org>
References: <4a35df37c6d74f6186c8aef0e7b24bdc@git.apache.org>
Subject: [34/51] [partial] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd.

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/662ea7dc/devapidocs/src-html/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.html b/devapidocs/src-html/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.html
index 1b25783..2ccefa4 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.html
@@ -50,1725 +50,1744 @@
 042import org.apache.hadoop.hbase.HBaseConfiguration;
 043import org.apache.hadoop.hbase.HColumnDescriptor;
 044import org.apache.hadoop.hbase.HTableDescriptor;
-045import org.apache.hadoop.hbase.ServerName;
-046import org.apache.hadoop.hbase.TableName;
-047import org.apache.hadoop.hbase.backup.BackupInfo;
-048import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
-049import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
-050import org.apache.hadoop.hbase.backup.BackupType;
-051import org.apache.hadoop.hbase.backup.util.BackupUtils;
-052import org.apache.hadoop.hbase.classification.InterfaceAudience;
-053import org.apache.hadoop.hbase.client.Admin;
-054import org.apache.hadoop.hbase.client.Connection;
-055import org.apache.hadoop.hbase.client.Delete;
-056import org.apache.hadoop.hbase.client.Get;
-057import org.apache.hadoop.hbase.client.Put;
-058import org.apache.hadoop.hbase.client.Result;
-059import org.apache.hadoop.hbase.client.ResultScanner;
-060import org.apache.hadoop.hbase.client.Scan;
-061import org.apache.hadoop.hbase.client.Table;
-062import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
-063import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
-064import org.apache.hadoop.hbase.util.Bytes;
-065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-066import org.apache.hadoop.hbase.util.Pair;
-067
-068/**
-069 * This class provides API to access backup system table<br>
-070 *
-071 * Backup system table schema:<br>
-072 * <p><ul>
-073 * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li>
-074 * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li>
-075 * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; value=[list of tables]</li>
-076 * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name;
-077 * value = map[RS-> last WAL timestamp]</li>
-078 * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li>
-079 * <li>6. WALs recorded rowkey="wals:"+WAL unique file name;
-080 * value = backupId and full WAL file name</li>
-081 * </ul></p>
-082 */
-083@InterfaceAudience.Private
-084public final class BackupSystemTable implements Closeable {
-085 private static final Log LOG = LogFactory.getLog(BackupSystemTable.class);
-086
-087 static class WALItem {
-088 String backupId;
-089 String walFile;
-090 String backupRoot;
-091
-092 WALItem(String backupId, String walFile, String backupRoot) {
-093 this.backupId = backupId;
-094 this.walFile = walFile;
-095 this.backupRoot = backupRoot;
-096 }
-097
-098 public String getBackupId() {
-099 return backupId;
-100 }
-101
-102 public String getWalFile() {
-103 return walFile;
-104 }
-105
-106 public String getBackupRoot() {
-107 return backupRoot;
-108 }
-109
-110 @Override
-111 public String toString() {
-112 return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile;
-113 }
-114
-115 }
-116
-117 private TableName tableName;
-118 /**
-119 * Stores backup sessions (contexts)
-120 */
-121 final static byte[] SESSIONS_FAMILY = "session".getBytes();
-122 /**
-123 * Stores other meta
-124 */
-125 final static byte[] META_FAMILY = "meta".getBytes();
-126 final static byte[] BULK_LOAD_FAMILY = "bulk".getBytes();
-127 /**
-128 * Connection to HBase cluster, shared among all instances
-129 */
-130 private final Connection connection;
-131
+045import org.apache.hadoop.hbase.NamespaceDescriptor;
+046import org.apache.hadoop.hbase.ServerName;
+047import org.apache.hadoop.hbase.TableName;
+048import org.apache.hadoop.hbase.backup.BackupInfo;
+049import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+050import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+051import org.apache.hadoop.hbase.backup.BackupType;
+052import org.apache.hadoop.hbase.backup.util.BackupUtils;
+053import org.apache.hadoop.hbase.classification.InterfaceAudience;
+054import org.apache.hadoop.hbase.client.Admin;
+055import org.apache.hadoop.hbase.client.Connection;
+056import org.apache.hadoop.hbase.client.Delete;
+057import org.apache.hadoop.hbase.client.Get;
+058import org.apache.hadoop.hbase.client.Put;
+059import org.apache.hadoop.hbase.client.Result;
+060import org.apache.hadoop.hbase.client.ResultScanner;
+061import org.apache.hadoop.hbase.client.Scan;
+062import org.apache.hadoop.hbase.client.Table;
+063import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
+064import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+065import org.apache.hadoop.hbase.util.Bytes;
+066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+067import org.apache.hadoop.hbase.util.Pair;
+068
+069/**
+070 * This class provides API to access backup system table<br>
+071 *
+072 * Backup system table schema:<br>
+073 * <p><ul>
+074 * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li>
+075 * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li>
+076 * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; value=[list of tables]</li>
+077 * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name;
+078 * value = map[RS-> last WAL timestamp]</li>
+079 * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li>
+080 * <li>6. WALs recorded rowkey="wals:"+WAL unique file name;
+081 * value = backupId and full WAL file name</li>
+082 * </ul></p>
+083 */
+084@InterfaceAudience.Private
+085public final class BackupSystemTable implements Closeable {
+086 private static final Log LOG = LogFactory.getLog(BackupSystemTable.class);
+087
+088 static class WALItem {
+089 String backupId;
+090 String walFile;
+091 String backupRoot;
+092
+093 WALItem(String backupId, String walFile, String backupRoot) {
+094 this.backupId = backupId;
+095 this.walFile = walFile;
+096 this.backupRoot = backupRoot;
+097 }
+098
+099 public String getBackupId() {
+100 return backupId;
+101 }
+102
+103 public String getWalFile() {
+104 return walFile;
+105 }
+106
+107 public String getBackupRoot() {
+108 return backupRoot;
+109 }
+110
+111 @Override
+112 public String toString() {
+113 return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile;
+114 }
+115
+116 }
+117
+118 private TableName tableName;
+119 /**
+120 * Stores backup sessions (contexts)
+121 */
+122 final static byte[] SESSIONS_FAMILY = "session".getBytes();
+123 /**
+124 * Stores other meta
+125 */
+126 final static byte[] META_FAMILY = "meta".getBytes();
+127 final static byte[] BULK_LOAD_FAMILY = "bulk".getBytes();
+128 /**
+129 * Connection to HBase cluster, shared among all instances
+130 */
+131 private final Connection connection;
 132
-133 private final static String BACKUP_INFO_PREFIX = "session:";
-134 private final static String START_CODE_ROW = "startcode:";
-135 private final static String INCR_BACKUP_SET = "incrbackupset:";
-136 private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
-137 private final static String RS_LOG_TS_PREFIX = "rslogts:";
-138
-139 private final static String BULK_LOAD_PREFIX = "bulk:";
-140 private final static byte[] BULK_LOAD_PREFIX_BYTES = BULK_LOAD_PREFIX.getBytes();
-141 final static byte[] TBL_COL = Bytes.toBytes("tbl");
-142 final static byte[] FAM_COL = Bytes.toBytes("fam");
-143 final static byte[] PATH_COL = Bytes.toBytes("path");
-144 final static byte[] STATE_COL = Bytes.toBytes("state");
-145 // the two states a bulk loaded file can be
-146 final static byte[] BL_PREPARE = Bytes.toBytes("R");
-147 final static byte[] BL_COMMIT = Bytes.toBytes("D");
-148
-149 private final static String WALS_PREFIX = "wals:";
-150 private final static String SET_KEY_PREFIX = "backupset:";
-151
-152 // separator between BULK_LOAD_PREFIX and ordinals
-153 protected final static String BLK_LD_DELIM = ":";
-154 private final static byte[] EMPTY_VALUE = new byte[] {};
-155
-156 // Safe delimiter in a string
-157 private final static String NULL = "\u0000";
-158
-159 public BackupSystemTable(Connection conn) throws IOException {
-160 this.connection = conn;
-161 tableName = BackupSystemTable.getTableName(conn.getConfiguration());
-162 checkSystemTable();
-163 }
-164
-165 private void checkSystemTable() throws IOException {
-166 try (Admin admin = connection.getAdmin();) {
-167
-168 if (!admin.tableExists(tableName)) {
-169 HTableDescriptor backupHTD =
-170 BackupSystemTable.getSystemTableDescriptor(connection.getConfiguration());
-171 admin.createTable(backupHTD);
-172 }
-173 waitForSystemTable(admin);
-174 }
-175 }
-176
-177 private void waitForSystemTable(Admin admin) throws IOException {
-178 long TIMEOUT = 60000;
-179 long startTime = EnvironmentEdgeManager.currentTime();
-180 while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) {
-181 try {
-182 Thread.sleep(100);
-183 } catch (InterruptedException e) {
-184 }
-185 if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
-186 throw new IOException("Failed to create backup system table after "+ TIMEOUT+"ms");
-187 }
-188 }
-189 LOG.debug("Backup table exists and available");
-190
-191 }
-192
-193
-194
-195 @Override
-196 public void close() {
-197 // do nothing
-198 }
-199
-200 /**
-201 * Updates status (state) of a backup session in backup system table table
-202 * @param info backup info
-203 * @throws IOException exception
-204 */
-205 public void updateBackupInfo(BackupInfo info) throws IOException {
-206
-207 if (LOG.isTraceEnabled()) {
-208 LOG.trace("update backup status in backup system table for: " + info.getBackupId()
-209 + " set status=" + info.getState());
-210 }
-211 try (Table table = connection.getTable(tableName)) {
-212 Put put = createPutForBackupInfo(info);
-213 table.put(put);
-214 }
-215 }
-216
-217 /*
-218 * @param backupId the backup Id
-219 * @return Map of rows to path of bulk loaded hfile
-220 */
-221 Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
-222 Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
-223 try (Table table = connection.getTable(tableName);
-224 ResultScanner scanner = table.getScanner(scan)) {
-225 Result res = null;
-226 Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-227 while ((res = scanner.next()) != null) {
-228 res.advance();
-229 byte[] row = CellUtil.cloneRow(res.listCells().get(0));
-230 for (Cell cell : res.listCells()) {
-231 if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
-232 BackupSystemTable.PATH_COL.length) == 0) {
-233 map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
-234 }
-235 }
-236 }
-237 return map;
-238 }
-239 }
-240
-241 /*
-242 * Used during restore
-243 * @param backupId the backup Id
-244 * @param sTableList List of tables
-245 * @return array of Map of family to List of Paths
-246 */
-247 public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList)
-248 throws IOException {
-249 Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
-250 Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()];
-251 try (Table table = connection.getTable(tableName);
-252 ResultScanner scanner = table.getScanner(scan)) {
-253 Result res = null;
-254 while ((res = scanner.next()) != null) {
-255 res.advance();
-256 TableName tbl = null;
-257 byte[] fam = null;
-258 String path = null;
-259 for (Cell cell : res.listCells()) {
-260 if (CellComparator.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0,
-261 BackupSystemTable.TBL_COL.length) == 0) {
-262 tbl = TableName.valueOf(CellUtil.cloneValue(cell));
-263 } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
-264 BackupSystemTable.FAM_COL.length) == 0) {
-265 fam = CellUtil.cloneValue(cell);
-266 } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
-267 BackupSystemTable.PATH_COL.length) == 0) {
-268 path = Bytes.toString(CellUtil.cloneValue(cell));
-269 }
-270 }
-271 int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList);
-272 if (srcIdx == -1) {
-273 // the table is not among the query
-274 continue;
-275 }
-276 if (mapForSrc[srcIdx] == null) {
-277 mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-278 }
-279 List<Path> files;
-280 if (!mapForSrc[srcIdx].containsKey(fam)) {
-281 files = new ArrayList<Path>();
-282 mapForSrc[srcIdx].put(fam, files);
-283 } else {
-284 files = mapForSrc[srcIdx].get(fam);
-285 }
-286 files.add(new Path(path));
-287 if (LOG.isDebugEnabled()) {
-288 LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path);
+133
+134 private final static String BACKUP_INFO_PREFIX = "session:";
+135 private final static String START_CODE_ROW = "startcode:";
+136 private final static String INCR_BACKUP_SET = "incrbackupset:";
+137 private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
+138 private final static String RS_LOG_TS_PREFIX = "rslogts:";
+139
+140 private final static String BULK_LOAD_PREFIX = "bulk:";
+141 private final static byte[] BULK_LOAD_PREFIX_BYTES = BULK_LOAD_PREFIX.getBytes();
+142 final static byte[] TBL_COL = Bytes.toBytes("tbl");
+143 final static byte[] FAM_COL = Bytes.toBytes("fam");
+144 final static byte[] PATH_COL = Bytes.toBytes("path");
+145 final static byte[] STATE_COL = Bytes.toBytes("state");
+146 // the two states a bulk loaded file can be
+147 final static byte[] BL_PREPARE = Bytes.toBytes("R");
+148 final static byte[] BL_COMMIT = Bytes.toBytes("D");
+149
+150 private final static String WALS_PREFIX = "wals:";
+151 private final static String SET_KEY_PREFIX = "backupset:";
+152
+153 // separator between BULK_LOAD_PREFIX and ordinals
+154 protected final static String BLK_LD_DELIM = ":";
+155 private final static byte[] EMPTY_VALUE = new byte[] {};
+156
+157 // Safe delimiter in a string
+158 private final static String NULL = "\u0000";
+159
+160 public BackupSystemTable(Connection conn) throws IOException {
+161 this.connection = conn;
+162 tableName = BackupSystemTable.getTableName(conn.getConfiguration());
+163 checkSystemTable();
+164 }
+165
+166 private void checkSystemTable() throws IOException {
+167 try (Admin admin = connection.getAdmin();) {
+168
+169 verifyNamespaceExists(admin);
+170
+171 if (!admin.tableExists(tableName)) {
+172 HTableDescriptor backupHTD =
+173 BackupSystemTable.getSystemTableDescriptor(connection.getConfiguration());
+174 admin.createTable(backupHTD);
+175 }
+176 waitForSystemTable(admin);
+177 }
+178 }
+179
+180 private void verifyNamespaceExists(Admin admin) throws IOException {
+181 String namespaceName = tableName.getNamespaceAsString();
+182 NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build();
+183 NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
+184 boolean exists = false;
+185 for( NamespaceDescriptor nsd: list) {
+186 if (nsd.getName().equals(ns.getName())) {
+187 exists = true;
+188 break;
+189 }
+190 }
+191 if (!exists) {
+192 admin.createNamespace(ns);
+193 }
+194 }
+195
+196 private void waitForSystemTable(Admin admin) throws IOException {
+197 long TIMEOUT = 60000;
+198 long startTime = EnvironmentEdgeManager.currentTime();
+199 while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) {
+200 try {
+201 Thread.sleep(100);
+202 } catch (InterruptedException e) {
+203 }
+204 if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
+205 throw new IOException("Failed to create backup system table after "+ TIMEOUT+"ms");
+206 }
+207 }
+208 LOG.debug("Backup table exists and available");
+209
+210 }
+211
+212
+213
+214 @Override
+215 public void close() {
+216 // do nothing
+217 }
+218
+219 /**
+220 * Updates status (state) of a backup session in backup system table table
+221 * @param info backup info
+222 * @throws IOException exception
+223 */
+224 public void updateBackupInfo(BackupInfo info) throws IOException {
+225
+226 if (LOG.isTraceEnabled()) {
+227 LOG.trace("update backup status in backup system table for: " + info.getBackupId()
+228 + " set status=" + info.getState());
+229 }
+230 try (Table table = connection.getTable(tableName)) {
+231 Put put = createPutForBackupInfo(info);
+232 table.put(put);
+233 }
+234 }
+235
+236 /*
+237 * @param backupId the backup Id
+238 * @return Map of rows to path of bulk loaded hfile
+239 */
+240 Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
+241 Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
+242 try (Table table = connection.getTable(tableName);
+243 ResultScanner scanner = table.getScanner(scan)) {
+244 Result res = null;
+245 Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+246 while ((res = scanner.next()) != null) {
+247 res.advance();
+248 byte[] row = CellUtil.cloneRow(res.listCells().get(0));
+249 for (Cell cell : res.listCells()) {
+250 if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
+251 BackupSystemTable.PATH_COL.length) == 0) {
+252 map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
+253 }
+254 }
+255 }
+256 return map;
+257 }
+258 }
+259
+260 /*
+261 * Used during restore
+262 * @param backupId the backup Id
+263 * @param sTableList List of tables
+264 * @return array of Map of family to List of Paths
+265 */
+266 public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList)
+267 throws IOException {
+268 Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
+269 Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()];
+270 try (Table table = connection.getTable(tableName);
+271 ResultScanner scanner = table.getScanner(scan)) {
+272 Result res = null;
+273 while ((res = scanner.next()) != null) {
+274 res.advance();
+275 TableName tbl = null;
+276 byte[] fam = null;
+277 String path = null;
+278 for (Cell cell : res.listCells()) {
+279 if (CellComparator.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0,
+280 BackupSystemTable.TBL_COL.length) == 0) {
+281 tbl = TableName.valueOf(CellUtil.cloneValue(cell));
+282 } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
+283 BackupSystemTable.FAM_COL.length) == 0) {
+284 fam = CellUtil.cloneValue(cell);
+285 } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
+286 BackupSystemTable.PATH_COL.length) == 0) {
+287 path = Bytes.toString(CellUtil.cloneValue(cell));
+288 }
 289 }
-290 };
-291 return mapForSrc;
-292 }
-293 }
-294
-295 /*
-296 * @param map Map of row keys to path of bulk loaded hfile
-297 */
-298 void deleteBulkLoadedFiles(Map<byte[], String> map) throws IOException {
-299 try (Table table = connection.getTable(tableName)) {
-300 List<Delete> dels = new ArrayList<>();
-301 for (byte[] row : map.keySet()) {
-302 dels.add(new Delete(row).addFamily(BackupSystemTable.META_FAMILY));
-303 }
-304 table.delete(dels);
-305 }
-306 }
-307
-308 /**
-309 * Deletes backup status from backup system table table
-310 * @param backupId backup id
-311 * @throws IOException exception
-312 */
+290 int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList);
+291 if (srcIdx == -1) {
+292 // the table is not among the query
+293 continue;
+294 }
+295 if (mapForSrc[srcIdx] == null) {
+296 mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+297 }
+298 List<Path> files;
+299 if (!mapForSrc[srcIdx].containsKey(fam)) {
+300 files = new ArrayList<Path>();
+301 mapForSrc[srcIdx].put(fam, files);
+302 } else {
+303 files = mapForSrc[srcIdx].get(fam);
+304 }
+305 files.add(new Path(path));
+306 if (LOG.isDebugEnabled()) {
+307 LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path);
+308 }
+309 };
+310 return mapForSrc;
+311 }
+312 }
 313
-314 public void deleteBackupInfo(String backupId) throws IOException {
-315
-316 if (LOG.isTraceEnabled()) {
-317 LOG.trace("delete backup status in backup system table for " + backupId);
-318 }
-319 try (Table table = connection.getTable(tableName)) {
-320 Delete del = createDeleteForBackupInfo(backupId);
-321 table.delete(del);
-322 }
-323 }
-324
-325 /*
-326 * For postBulkLoadHFile() hook.
-327 * @param tabName table name
-328 * @param region the region receiving hfile
-329 * @param finalPaths family and associated hfiles
-330 */
-331 public void writePathsPostBulkLoad(TableName tabName, byte[] region,
-332 Map<byte[], List<Path>> finalPaths) throws IOException {
-333 if (LOG.isDebugEnabled()) {
-334 LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
-335 finalPaths.size() + " entries");
-336 }
-337 try (Table table = connection.getTable(tableName)) {
-338 List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region,
-339 finalPaths);
-340 table.put(puts);
-341 LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
-342 }
-343 }
+314 /*
+315 * @param map Map of row keys to path of bulk loaded hfile
+316 */
+317 void deleteBulkLoadedFiles(Map<byte[], String> map) throws IOException {
+318 try (Table table = connection.getTable(tableName)) {
+319 List<Delete> dels = new ArrayList<>();
+320 for (byte[] row : map.keySet()) {
+321 dels.add(new Delete(row).addFamily(BackupSystemTable.META_FAMILY));
+322 }
+323 table.delete(dels);
+324 }
+325 }
+326
+327 /**
+328 * Deletes backup status from backup system table table
+329 * @param backupId backup id
+330 * @throws IOException exception
+331 */
+332
+333 public void deleteBackupInfo(String backupId) throws IOException {
+334
+335 if (LOG.isTraceEnabled()) {
+336 LOG.trace("delete backup status in backup system table for " + backupId);
+337 }
+338 try (Table table = connection.getTable(tableName)) {
+339 Delete del = createDeleteForBackupInfo(backupId);
+340 table.delete(del);
+341 }
+342 }
+343
 344 /*
-345 * For preCommitStoreFile() hook
+345 * For postBulkLoadHFile() hook.
 346 * @param tabName table name
 347 * @param region the region receiving hfile
-348 * @param family column family
-349 * @param pairs list of paths for hfiles
-350 */
-351 public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region,
-352 final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
-353 if (LOG.isDebugEnabled()) {
-354 LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
-355 pairs.size() + " entries");
-356 }
-357 try (Table table = connection.getTable(tableName)) {
-358 List<Put> puts = BackupSystemTable.createPutForPreparedBulkload(tabName, region,
-359 family, pairs);
-360 table.put(puts);
-361 LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
-362 }
-363 }
-364
-365 /*
-366 * Removes rows recording bulk loaded hfiles from backup table
-367 * @param lst list of table names
-368 * @param rows the rows to be deleted
+348 * @param finalPaths family and associated hfiles
+349 */
+350 public void writePathsPostBulkLoad(TableName tabName, byte[] region,
+351 Map<byte[], List<Path>> finalPaths) throws IOException {
+352 if (LOG.isDebugEnabled()) {
+353 LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
+354 finalPaths.size() + " entries");
+355 }
+356 try (Table table = connection.getTable(tableName)) {
+357 List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region,
+358 finalPaths);
+359 table.put(puts);
+360 LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
+361 }
+362 }
+363 /*
+364 * For preCommitStoreFile() hook
+365 * @param tabName table name
+366 * @param region the region receiving hfile
+367 * @param family column family
+368 * @param pairs list of paths for hfiles
 369 */
-370 public void removeBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
-371 try (Table table = connection.getTable(tableName)) {
-372 List<Delete> lstDels = new ArrayList<>();
-373 for (byte[] row : rows) {
-374 Delete del = new Delete(row);
-375 lstDels.add(del);
-376 LOG.debug("orig deleting the row: " + Bytes.toString(row));
-377 }
-378 table.delete(lstDels);
-379 LOG.debug("deleted " + rows.size() + " original bulkload rows for " + lst.size() + " tables");
-380 }
-381 }
-382
-383 /*
-384 * Reads the rows from backup table recording bulk loaded hfiles
-385 * @param tableList list of table names
-386 * @return The keys of the Map are table, region and column family.
-387 * Value of the map reflects whether the hfile was recorded by preCommitStoreFile hook (true)
+370 public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region,
+371 final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
+372 if (LOG.isDebugEnabled()) {
+373 LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
+374 pairs.size() + " entries");
+375 }
+376 try (Table table = connection.getTable(tableName)) {
+377 List<Put> puts = BackupSystemTable.createPutForPreparedBulkload(tabName, region,
+378 family, pairs);
+379 table.put(puts);
+380 LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
+381 }
+382 }
+383
+384 /*
+385 * Removes rows recording bulk loaded hfiles from backup table
+386 * @param lst list of table names
+387 * @param rows the rows to be deleted
 388 */
-389 public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
-390 readBulkloadRows(List<TableName> tableList) throws IOException {
-391 Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
-392 List<byte[]> rows = new ArrayList<>();
-393 for (TableName tTable : tableList) {
-394 Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
-395 Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
-396 try (Table table = connection.getTable(tableName);
-397 ResultScanner scanner = table.getScanner(scan)) {
-398 Result res = null;
-399 while ((res = scanner.next()) != null) {
-400 res.advance();
-401 String fam = null;
-402 String path = null;
-403 boolean raw = false;
-404 byte[] row = null;
-405 String region = null;
-406 for (Cell cell : res.listCells()) {
-407 row = CellUtil.cloneRow(cell);
-408 rows.add(row);
-409 String rowStr = Bytes.toString(row);
-410 region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
-411 if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
-412 BackupSystemTable.FAM_COL.length) == 0) {
-413 fam = Bytes.toString(CellUtil.cloneValue(cell));
-414 } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
-415 BackupSystemTable.PATH_COL.length) == 0) {
-416 path = Bytes.toString(CellUtil.cloneValue(cell));
-417 } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0,
-418 BackupSystemTable.STATE_COL.length) == 0) {
-419 byte[] state = CellUtil.cloneValue(cell);
-420 if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) {
-421 raw = true;
-422 } else raw = false;
-423 }
-424 }
-425 if (map.get(tTable) == null) {
-426 map.put(tTable, new HashMap<String, Map<String, List<Pair<String, Boolean>>>>());
-427 tblMap = map.get(tTable);
-428 }
-429 if (tblMap.get(region) == null) {
-430 tblMap.put(region, new HashMap<String, List<Pair<String, Boolean>>>());
-431 }
-432 Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region);
-433 if (famMap.get(fam) == null) {
-434 famMap.put(fam, new ArrayList<Pair<String, Boolean>>());
-435 }
-436 famMap.get(fam).add(new Pair<>(path, raw));
-437 LOG.debug("found orig " + path + " for " + fam + " of table " + region);
-438 }
-439 }
-440 }
-441 return new Pair<>(map, rows);
-442 }
-443
-444 /*
-445 * @param sTableList List of tables
-446 * @param maps array of Map of family to List of Paths
-447 * @param backupId the backup Id
-448 */
-449 public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps,
-450 String backupId) throws IOException {
-451 try (Table table = connection.getTable(tableName)) {
-452 long ts = EnvironmentEdgeManager.currentTime();
-453 int cnt = 0;
-454 List<Put> puts = new ArrayList<>();
-455 for (int idx = 0; idx < maps.length; idx++) {
-456 Map<byte[], List<Path>> map = maps[idx];
-457 TableName tn = sTableList.get(idx);
-458 if (map == null) continue;
-459 for (Map.Entry<byte[], List<Path>> entry: map.entrySet()) {
-460 byte[] fam = entry.getKey();
-461 List<Path> paths = entry.getValue();
-462 for (Path p : paths) {
-463 Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(),
-464 backupId, ts, cnt++);
-465 puts.add(put);
-466 }
-467 }
-468 }
-469 if (!puts.isEmpty()) {
-470 table.put(puts);
-471 }
-472 }
-473 }
-474
-475 /**
-476 * Reads backup status object (instance of backup info) from backup system table table
-477 * @param backupId backup id
-478 * @return Current status of backup session or null
-479 */
-480
-481 public BackupInfo readBackupInfo(String backupId) throws IOException {
-482 if (LOG.isTraceEnabled()) {
-483 LOG.trace("read backup status from backup system table for: " + backupId);
-484 }
-485
-486 try (Table table = connection.getTable(tableName)) {
-487 Get get = createGetForBackupInfo(backupId);
-488 Result res = table.get(get);
-489 if (res.isEmpty()) {
-490 return null;
-491 }
-492 return resultToBackupInfo(res);
-493 }
-494 }
-495
-496 /**
-497 * Read the last backup start code (timestamp) of last successful backup. Will return null if
-498 * there is no start code stored on hbase or the value is of length 0. These two cases indicate
-499 * there is no successful backup completed so far.
-500 * @param backupRoot directory path to backup destination
-501 * @return the timestamp of last successful backup
-502 * @throws IOException exception
-503 */
-504 public String readBackupStartCode(String backupRoot) throws IOException {
-505 if (LOG.isTraceEnabled()) {
-506 LOG.trace("read backup start code from backup system table");
-507 }
-508 try (Table table = connection.getTable(tableName)) {
-509 Get get = createGetForStartCode(backupRoot);
-510 Result res = table.get(get);
-511 if (res.isEmpty()) {
-512 return null;
-513 }
-514 Cell cell = res.listCells().get(0);
-515 byte[] val = CellUtil.cloneValue(cell);
-516 if (val.length == 0) {
-517 return null;
-518 }
-519 return new String(val);
-520 }
-521 }
-522
-523 /**
-524 * Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte.
-525 * @param startCode start code
-526 * @param backupRoot root directory path to backup
-527 * @throws IOException exception
-528 */
-529 public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException {
-530 if (LOG.isTraceEnabled()) {
-531 LOG.trace("write backup start code to backup system table " + startCode);
-532 }
-533 try (Table table = connection.getTable(tableName)) {
-534 Put put = createPutForStartCode(startCode.toString(), backupRoot);
-535 table.put(put);
-536 }
-537 }
-538
-539 /**
-540 * Get the Region Servers log information after the last log roll from backup system table.
-541 * @param backupRoot root directory path to backup
-542 * @return RS log info
-543 * @throws IOException exception
-544 */
-545 public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot)
-546 throws IOException {
-547 if (LOG.isTraceEnabled()) {
-548 LOG.trace("read region server last roll log result to backup system table");
-549 }
-550
-551 Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot);
-552
-553 try (Table table = connection.getTable(tableName);
-554 ResultScanner scanner = table.getScanner(scan)) {
-555 Result res = null;
-556 HashMap<String, Long> rsTimestampMap = new HashMap<String, Long>();
-557 while ((res = scanner.next()) != null) {
-558 res.advance();
-559 Cell cell = res.current();
-560 byte[] row = CellUtil.cloneRow(cell);
-561 String server =
-562 getServerNameForReadRegionServerLastLogRollResult(row);
-563 byte[] data = CellUtil.cloneValue(cell);
-564 rsTimestampMap.put(server, Bytes.toLong(data));
-565 }
-566 return rsTimestampMap;
-567 }
-568 }
+389 public void removeBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
+390 try (Table table = connection.getTable(tableName)) {
+391 List<Delete> lstDels = new ArrayList<>();
+392 for (byte[] row : rows) {
+393 Delete del = new Delete(row);
+394 lstDels.add(del);
+395 LOG.debug("orig deleting the row: " + Bytes.toString(row));
+396 }
+397 table.delete(lstDels);
+398 LOG.debug("deleted " + rows.size() + " original bulkload rows for " + lst.size() + " tables");
+399 }
+400 }
+401
+402 /*
+403 * Reads the rows from backup table recording bulk loaded hfiles
+404 * @param tableList list of table names
+405 * @return The keys of the Map are table, region and column family.
+406 * Value of the map reflects whether the hfile was recorded by preCommitStoreFile hook (true)
+407 */
+408 public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
+409 readBulkloadRows(List<TableName> tableList) throws IOException {
+410 Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
+411 List<byte[]> rows = new ArrayList<>();
+412 for (TableName tTable : tableList) {
+413 Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
+414 Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
+415 try (Table table = connection.getTable(tableName);
+416 ResultScanner scanner = table.getScanner(scan)) {
+417 Result res = null;
+418 while ((res = scanner.next()) != null) {
+419 res.advance();
+420 String fam = null;
+421 String path = null;
+422
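The functional change in this regenerated page, beyond the wholesale renumbering of the HTML source lines, is the new verifyNamespaceExists() method (+180 through +194 above): before creating the backup system table, the constructor path now lists the cluster's namespaces and creates the table's namespace when it is missing. A minimal standalone sketch of that same check-then-create pattern is below, using only client API calls that appear in the diff; the wrapper class, the main() method, and the "backup" namespace name are illustrative assumptions, not part of the commit.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.NamespaceDescriptor;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class EnsureNamespaceSketch {

      // Same pattern as BackupSystemTable.verifyNamespaceExists(): scan the
      // existing namespace descriptors and create the namespace only when no
      // descriptor with the same name is found.
      static void ensureNamespaceExists(Admin admin, String namespaceName) throws IOException {
        NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build();
        boolean exists = false;
        for (NamespaceDescriptor nsd : admin.listNamespaceDescriptors()) {
          if (nsd.getName().equals(ns.getName())) {
            exists = true;
            break;
          }
        }
        if (!exists) {
          admin.createNamespace(ns);
        }
      }

      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
            Admin admin = conn.getAdmin()) {
          // Namespace name is an illustrative assumption for this sketch.
          ensureNamespaceExists(admin, "backup");
        }
      }
    }

Listing all descriptors and comparing names, as lines +183 through +193 of the diff do, avoids exception-driven control flow; the alternative of calling getNamespaceDescriptor() and catching the not-found exception would behave the same for this purpose.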