From: tedyu@apache.org
To: commits@hbase.apache.org
Date: Fri, 10 Mar 2017 23:37:50 -0000
Message-Id: <04bd34fa814544959dee623bc7a92da6@git.apache.org>
Subject: [04/10] hbase git commit: HBASE-14123 HBase Backup/Restore Phase 2 (Vladimir Rodionov)

http://git-wip-us.apache.org/repos/asf/hbase/blob/3aaea8e0/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java new file mode 100644 index 0000000..47e428c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hbase.backup.master; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.errorhandling.ForeignException; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.MetricsMaster; +import org.apache.hadoop.hbase.procedure.MasterProcedureManager; +import org.apache.hadoop.hbase.procedure.Procedure; +import org.apache.hadoop.hbase.procedure.ProcedureCoordinator; +import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; +import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; +import org.apache.zookeeper.KeeperException; + +/** + * Master procedure manager for coordinated cluster-wide WAL roll operation, which is run during + * backup operation, see {@link MasterProcedureManager} and and {@link RegionServerProcedureManager} + */ +@InterfaceAudience.Private +public class LogRollMasterProcedureManager extends MasterProcedureManager { + + public static final String ROLLLOG_PROCEDURE_SIGNATURE = "rolllog-proc"; + public static final String ROLLLOG_PROCEDURE_NAME = "rolllog"; + private static final Log LOG = LogFactory.getLog(LogRollMasterProcedureManager.class); + + private MasterServices master; + private ProcedureCoordinator coordinator; + private boolean done; + + @Override + public void stop(String why) { + LOG.info("stop: " + why); + } + + @Override + public boolean isStopped() { + return false; + } + + @Override + public void initialize(MasterServices master, MetricsMaster metricsMaster) + throws KeeperException, IOException, UnsupportedOperationException { + this.master = master; + this.done = false; + + // setup the default procedure coordinator + String name = master.getServerName().toString(); + ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1); + BaseCoordinatedStateManager coordManager = + (BaseCoordinatedStateManager) CoordinatedStateManagerFactory + .getCoordinatedStateManager(master.getConfiguration()); + coordManager.initialize(master); + + ProcedureCoordinatorRpcs comms = + coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name); + + this.coordinator = new ProcedureCoordinator(comms, tpool); + } + + @Override + public String getProcedureSignature() { + return ROLLLOG_PROCEDURE_SIGNATURE; + } + + @Override + public void execProcedure(ProcedureDescription desc) throws IOException { + if (!isBackupEnabled()) { + LOG.warn("Backup is not enabled. 
Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY + + " setting"); + return; + } + this.done = false; + // start the process on the RS + ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance()); + List<ServerName> serverNames = master.getServerManager().getOnlineServersList(); + List<String> servers = new ArrayList<String>(); + for (ServerName sn : serverNames) { + servers.add(sn.toString()); + } + + List<NameStringPair> conf = desc.getConfigurationList(); + byte[] data = new byte[0]; + if (conf.size() > 0) { + // Get backup root path + data = conf.get(0).getValue().getBytes(); + } + Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), data, servers); + if (proc == null) { + String msg = "Failed to submit distributed procedure for '" + desc.getInstance() + "'"; + LOG.error(msg); + throw new IOException(msg); + } + + try { + // wait for the procedure to complete. A timer thread is kicked off that should cancel this + // if it takes too long. + proc.waitForCompleted(); + LOG.info("Done waiting - exec procedure for " + desc.getInstance()); + LOG.info("Distributed roll log procedure is successful!"); + this.done = true; + } catch (InterruptedException e) { + ForeignException ee = + new ForeignException("Interrupted while waiting for roll log procedure to finish", e); + monitor.receive(ee); + Thread.currentThread().interrupt(); + } catch (ForeignException e) { + ForeignException ee = + new ForeignException("Exception while waiting for roll log procedure to finish", e); + monitor.receive(ee); + } + monitor.rethrowException(); + } + + private boolean isBackupEnabled() { + return BackupManager.isBackupEnabled(master.getConfiguration()); + } + + @Override + public boolean isProcedureDone(ProcedureDescription desc) throws IOException { + return done; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/3aaea8e0/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java new file mode 100644 index 0000000..8fc644c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hbase.backup.regionserver; + +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.Callable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.errorhandling.ForeignException; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.procedure.ProcedureMember; +import org.apache.hadoop.hbase.procedure.Subprocedure; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.WAL; + +/** + * This backup sub-procedure implementation forces a WAL rolling on a RS. + */ +@InterfaceAudience.Private +public class LogRollBackupSubprocedure extends Subprocedure { + private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedure.class); + + private final RegionServerServices rss; + private final LogRollBackupSubprocedurePool taskManager; + private FSHLog hlog; + private String backupRoot; + + public LogRollBackupSubprocedure(RegionServerServices rss, ProcedureMember member, + ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout, + LogRollBackupSubprocedurePool taskManager, byte[] data) { + + super(member, LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, errorListener, + wakeFrequency, timeout); + LOG.info("Constructing a LogRollBackupSubprocedure."); + this.rss = rss; + this.taskManager = taskManager; + if (data != null) { + backupRoot = new String(data); + } + } + + /** + * Callable task. TODO. We don't need a thread pool to execute roll log. This can be simplified + * with no use of subprocedurepool. 
+ */ + class RSRollLogTask implements Callable { + RSRollLogTask() { + } + + @Override + public Void call() throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("++ DRPC started: " + rss.getServerName()); + } + hlog = (FSHLog) rss.getWAL(null); + long filenum = hlog.getFilenum(); + List wals = rss.getWALs(); + long highest = -1; + for (WAL wal : wals) { + if (wal == null) continue; + if (((AbstractFSWAL) wal).getFilenum() > highest) { + highest = ((AbstractFSWAL) wal).getFilenum(); + } + } + + LOG.info("Trying to roll log in backup subprocedure, current log number: " + filenum + + " highest: " + highest + " on " + rss.getServerName()); + ((HRegionServer) rss).getWalRoller().requestRollAll(); + long start = EnvironmentEdgeManager.currentTime(); + while (!((HRegionServer) rss).getWalRoller().walRollFinished()) { + Thread.sleep(20); + } + LOG.debug("log roll took " + (EnvironmentEdgeManager.currentTime() - start)); + LOG.info("After roll log in backup subprocedure, current log number: " + hlog.getFilenum() + + " on " + rss.getServerName()); + + Connection connection = rss.getConnection(); + try (final BackupSystemTable table = new BackupSystemTable(connection)) { + // sanity check, good for testing + HashMap serverTimestampMap = + table.readRegionServerLastLogRollResult(backupRoot); + String host = rss.getServerName().getHostname(); + int port = rss.getServerName().getPort(); + String server = host + ":" + port; + Long sts = serverTimestampMap.get(host); + if (sts != null && sts > highest) { + LOG.warn("Won't update server's last roll log result: current=" + sts + " new=" + highest); + return null; + } + // write the log number to backup system table. + table.writeRegionServerLastLogRollResult(server, highest, backupRoot); + return null; + } catch (Exception e) { + LOG.error(e); + throw e; + } + } + } + + private void rolllog() throws ForeignException { + monitor.rethrowException(); + + taskManager.submitTask(new RSRollLogTask()); + monitor.rethrowException(); + + // wait for everything to complete. + taskManager.waitForOutstandingTasks(); + monitor.rethrowException(); + + } + + @Override + public void acquireBarrier() throws ForeignException { + // do nothing, executing in inside barrier step. + } + + /** + * do a log roll. + * @return some bytes + */ + @Override + public byte[] insideBarrier() throws ForeignException { + rolllog(); + return null; + } + + /** + * Cancel threads if they haven't finished. + */ + @Override + public void cleanup(Exception e) { + taskManager.abort("Aborting log roll subprocedure tasks for backup due to error", e); + } + + /** + * Hooray! + */ + public void releaseBarrier() { + // NO OP + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/3aaea8e0/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java new file mode 100644 index 0000000..65a1fa3 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup.regionserver; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.DaemonThreadFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.errorhandling.ForeignException; + +/** + * Handle running each of the individual tasks for completing a backup procedure on a region + * server. + */ +@InterfaceAudience.Private +public class LogRollBackupSubprocedurePool implements Closeable, Abortable { + private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedurePool.class); + + /** Maximum number of backup tasks that can run concurrently on a region server */ + private static final String CONCURENT_BACKUP_TASKS_KEY = "hbase.backup.region.concurrentTasks"; + private static final int DEFAULT_CONCURRENT_BACKUP_TASKS = 3; + + private final ExecutorCompletionService<Void> taskPool; + private final ThreadPoolExecutor executor; + private volatile boolean aborted; + private final List<Future<Void>> futures = new ArrayList<Future<Void>>(); + private final String name; + + public LogRollBackupSubprocedurePool(String name, Configuration conf) { + // configure the executor service + long keepAlive = + conf.getLong(LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_KEY, + LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_DEFAULT); + int threads = conf.getInt(CONCURENT_BACKUP_TASKS_KEY, DEFAULT_CONCURRENT_BACKUP_TASKS); + this.name = name; + executor = + new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs(" + name + + ")-backup-pool")); + taskPool = new ExecutorCompletionService<Void>(executor); + } + + /** + * Submit a task to the pool.
+ */ + public void submitTask(final Callable task) { + Future f = this.taskPool.submit(task); + futures.add(f); + } + + /** + * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)} + * @return true on success, false otherwise + * @throws ForeignException exception + */ + public boolean waitForOutstandingTasks() throws ForeignException { + LOG.debug("Waiting for backup procedure to finish."); + + try { + for (Future f : futures) { + f.get(); + } + return true; + } catch (InterruptedException e) { + if (aborted) { + throw new ForeignException("Interrupted and found to be aborted while waiting for tasks!", + e); + } + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + if (e.getCause() instanceof ForeignException) { + throw (ForeignException) e.getCause(); + } + throw new ForeignException(name, e.getCause()); + } finally { + // close off remaining tasks + for (Future f : futures) { + if (!f.isDone()) { + f.cancel(true); + } + } + } + return false; + } + + /** + * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly + * finish + */ + @Override + public void close() { + executor.shutdown(); + } + + @Override + public void abort(String why, Throwable e) { + if (this.aborted) { + return; + } + + this.aborted = true; + LOG.warn("Aborting because: " + why, e); + this.executor.shutdownNow(); + } + + @Override + public boolean isAborted() { + return this.aborted; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/3aaea8e0/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java new file mode 100644 index 0000000..9d5a858 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.backup.regionserver; + +import java.io.IOException; +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.procedure.ProcedureMember; +import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; +import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager; +import org.apache.hadoop.hbase.procedure.Subprocedure; +import org.apache.hadoop.hbase.procedure.SubprocedureFactory; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.zookeeper.KeeperException; + +/** + * This manager class handles the work dealing with distributed WAL roll request. + *
<p> + * This provides the mechanism necessary to kick off a backup specific {@link Subprocedure} that + * this region server is responsible for. If any failures occur with the sub-procedure, the manager's + * procedure member notifies the procedure coordinator to abort all others. + * <p> + * On startup, requires {@link #start()} to be called. + * <p>
+ * On shutdown, requires org.apache.hadoop.hbase.procedure.ProcedureMember.close() to be called + */ +@InterfaceAudience.Private +public class LogRollRegionServerProcedureManager extends RegionServerProcedureManager { + + private static final Log LOG = LogFactory.getLog(LogRollRegionServerProcedureManager.class); + + /** Conf key for number of request threads to start backup on region servers */ + public static final String BACKUP_REQUEST_THREADS_KEY = "hbase.backup.region.pool.threads"; + /** # of threads for backup work on the rs. */ + public static final int BACKUP_REQUEST_THREADS_DEFAULT = 10; + + public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.timeout"; + public static final long BACKUP_TIMEOUT_MILLIS_DEFAULT = 60000; + + /** Conf key for millis between checks to see if backup work completed or if there are errors */ + public static final String BACKUP_REQUEST_WAKE_MILLIS_KEY = "hbase.backup.region.wakefrequency"; + /** Default amount of time to check for errors while regions finish backup work */ + private static final long BACKUP_REQUEST_WAKE_MILLIS_DEFAULT = 500; + + private RegionServerServices rss; + private ProcedureMemberRpcs memberRpcs; + private ProcedureMember member; + private boolean started = false; + + /** + * Create a default backup procedure manager + */ + public LogRollRegionServerProcedureManager() { + } + + /** + * Start accepting backup procedure requests. + */ + @Override + public void start() { + if (!BackupManager.isBackupEnabled(rss.getConfiguration())) { + LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY + + " setting"); + return; + } + this.memberRpcs.start(rss.getServerName().toString(), member); + started = true; + LOG.info("Started region server backup manager."); + } + + /** + * Close this and all running backup procedure tasks + * @param force forcefully stop all running tasks + * @throws IOException exception + */ + @Override + public void stop(boolean force) throws IOException { + if (!started) { + return; + } + String mode = force ? "abruptly" : "gracefully"; + LOG.info("Stopping RegionServerBackupManager " + mode + "."); + + try { + this.member.close(); + } finally { + this.memberRpcs.close(); + } + } + + /** + * If in a running state, creates the specified subprocedure for handling a backup procedure. + * @return Subprocedure to submit to the ProcedureMemeber. 
+ */ + public Subprocedure buildSubprocedure(byte[] data) { + + // don't run a backup if the parent is stop(ping) + if (rss.isStopping() || rss.isStopped()) { + throw new IllegalStateException("Can't start backup procedure on RS: " + rss.getServerName() + + ", because stopping/stopped!"); + } + + LOG.info("Attempting to run a roll log procedure for backup."); + ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher(); + Configuration conf = rss.getConfiguration(); + long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT); + long wakeMillis = + conf.getLong(BACKUP_REQUEST_WAKE_MILLIS_KEY, BACKUP_REQUEST_WAKE_MILLIS_DEFAULT); + + LogRollBackupSubprocedurePool taskManager = + new LogRollBackupSubprocedurePool(rss.getServerName().toString(), conf); + return new LogRollBackupSubprocedure(rss, member, errorDispatcher, wakeMillis, timeoutMillis, + taskManager, data); + + } + + /** + * Build the actual backup procedure runner that will do all the 'hard' work + */ + public class BackupSubprocedureBuilder implements SubprocedureFactory { + + @Override + public Subprocedure buildSubprocedure(String name, byte[] data) { + return LogRollRegionServerProcedureManager.this.buildSubprocedure(data); + } + } + + @Override + public void initialize(RegionServerServices rss) throws KeeperException { + this.rss = rss; + if (!BackupManager.isBackupEnabled(rss.getConfiguration())) { + LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY + + " setting"); + return; + } + BaseCoordinatedStateManager coordManager = + (BaseCoordinatedStateManager) CoordinatedStateManagerFactory. + getCoordinatedStateManager(rss.getConfiguration()); + coordManager.initialize(rss); + this.memberRpcs = + coordManager + .getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE); + + // read in the backup handler configuration properties + Configuration conf = rss.getConfiguration(); + long keepAlive = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT); + int opThreads = conf.getInt(BACKUP_REQUEST_THREADS_KEY, BACKUP_REQUEST_THREADS_DEFAULT); + // create the actual cohort member + ThreadPoolExecutor pool = + ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive); + this.member = new ProcedureMember(memberRpcs, pool, new BackupSubprocedureBuilder()); + } + + @Override + public String getProcedureSignature() { + return "backup-proc"; + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/3aaea8e0/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java new file mode 100644 index 0000000..0da6fc4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.util; + +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Backup set is a named group of HBase tables, which are managed together by Backup/Restore + * framework. Instead of using list of tables in backup or restore operation, one can use set's name + * instead. + */ +@InterfaceAudience.Private +public class BackupSet { + private final String name; + private final List tables; + + public BackupSet(String name, List tables) { + this.name = name; + this.tables = tables; + } + + public String getName() { + return name; + } + + public List getTables() { + return tables; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(name).append("={"); + sb.append(StringUtils.join(tables, ',')); + sb.append("}"); + return sb.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/3aaea8e0/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java new file mode 100644 index 0000000..e32853d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java @@ -0,0 +1,702 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.backup.util; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; +import org.apache.hadoop.hbase.backup.RestoreRequest; +import org.apache.hadoop.hbase.backup.impl.BackupManifest; +import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; + +/** + * A collection for methods used by multiple classes to backup HBase tables. + */ +@InterfaceAudience.Private +public final class BackupUtils { + protected static final Log LOG = LogFactory.getLog(BackupUtils.class); + public static final String LOGNAME_SEPARATOR = "."; + + private BackupUtils() { + throw new AssertionError("Instantiating utility class..."); + } + + /** + * Loop through the RS log timestamp map for the tables, for each RS, find the min timestamp + * value for the RS among the tables. 
+ * @param rsLogTimestampMap timestamp map + * @return the min timestamp of each RS + */ + public static HashMap<String, Long> getRSLogTimestampMins( + HashMap<TableName, HashMap<String, Long>> rsLogTimestampMap) { + + if (rsLogTimestampMap == null || rsLogTimestampMap.isEmpty()) { + return null; + } + + HashMap<String, Long> rsLogTimestampMins = new HashMap<String, Long>(); + HashMap<String, HashMap<TableName, Long>> rsLogTimestampMapByRS = + new HashMap<String, HashMap<TableName, Long>>(); + + for (Entry<TableName, HashMap<String, Long>> tableEntry : rsLogTimestampMap.entrySet()) { + TableName table = tableEntry.getKey(); + HashMap<String, Long> rsLogTimestamp = tableEntry.getValue(); + for (Entry<String, Long> rsEntry : rsLogTimestamp.entrySet()) { + String rs = rsEntry.getKey(); + Long ts = rsEntry.getValue(); + if (!rsLogTimestampMapByRS.containsKey(rs)) { + rsLogTimestampMapByRS.put(rs, new HashMap<TableName, Long>()); + rsLogTimestampMapByRS.get(rs).put(table, ts); + } else { + rsLogTimestampMapByRS.get(rs).put(table, ts); + } + } + } + + for (Entry<String, HashMap<TableName, Long>> entry : rsLogTimestampMapByRS.entrySet()) { + String rs = entry.getKey(); + rsLogTimestampMins.put(rs, BackupUtils.getMinValue(entry.getValue())); + } + + return rsLogTimestampMins; + } + + /** + * Copy out Table RegionInfo into incremental backup image. TODO: consider moving this + * logic into HBackupFileSystem + * @param conn connection + * @param backupInfo backup info + * @param conf configuration + * @throws IOException exception + * @throws InterruptedException exception + */ + public static void copyTableRegionInfo(Connection conn, BackupInfo backupInfo, + Configuration conf) throws IOException, InterruptedException { + Path rootDir = FSUtils.getRootDir(conf); + FileSystem fs = rootDir.getFileSystem(conf); + + // for each table in the table set, copy out the table info and region + // info files in the correct directory structure + for (TableName table : backupInfo.getTables()) { + + if (!MetaTableAccessor.tableExists(conn, table)) { + LOG.warn("Table " + table + " does not exist, skipping it."); + continue; + } + HTableDescriptor orig = FSTableDescriptors.getTableDescriptorFromFs(fs, rootDir, table); + + // write a copy of descriptor to the target directory + Path target = new Path(backupInfo.getTableBackupDir(table)); + FileSystem targetFs = target.getFileSystem(conf); + FSTableDescriptors descriptors = + new FSTableDescriptors(conf, targetFs, FSUtils.getRootDir(conf)); + descriptors.createTableDescriptorForTableDirectory(target, orig, false); + LOG.debug("Attempting to copy table info for: " + table + " target: " + target + + " descriptor: " + orig); + LOG.debug("Finished copying tableinfo."); + List<HRegionInfo> regions = MetaTableAccessor.getTableRegions(conn, table); + // For each region, write the region info to disk + LOG.debug("Starting to write region info for table " + table); + for (HRegionInfo regionInfo : regions) { + Path regionDir = + HRegion.getRegionDir(new Path(backupInfo.getTableBackupDir(table)), + regionInfo); + regionDir = + new Path(backupInfo.getTableBackupDir(table), regionDir.getName()); + writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo); + } + LOG.debug("Finished writing region info for table " + table); + } + } + + /** + * Write the .regioninfo file on-disk. + */ + public static void writeRegioninfoOnFilesystem(final Configuration conf, final FileSystem fs, + final Path regionInfoDir, HRegionInfo regionInfo) throws IOException { + final byte[] content = regionInfo.toDelimitedByteArray(); + Path regionInfoFile = new Path(regionInfoDir, "."
+ HConstants.REGIONINFO_QUALIFIER_STR); + // First check to get the permissions + FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); + // Write the RegionInfo file content + FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, null); + try { + out.write(content); + } finally { + out.close(); + } + } + + /** + * Parses hostname:port from WAL file path + * @param p path to WAL file + * @return hostname:port + */ + public static String parseHostNameFromLogFile(Path p) { + try { + if (AbstractFSWALProvider.isArchivedLogFile(p)) { + return BackupUtils.parseHostFromOldLog(p); + } else { + ServerName sname = AbstractFSWALProvider.getServerNameFromWALDirectoryName(p); + if (sname != null) { + return sname.getAddress().toString(); + } else { + LOG.error("Skip log file (can't parse): " + p); + return null; + } + } + } catch (Exception e) { + LOG.error("Skip log file (can't parse): " + p, e); + return null; + } + } + + /** + * Returns WAL file name + * @param walFileName WAL file name + * @return WAL file name + * @throws IOException exception + * @throws IllegalArgumentException exception + */ + public static String getUniqueWALFileNamePart(String walFileName) throws IOException { + return getUniqueWALFileNamePart(new Path(walFileName)); + } + + /** + * Returns WAL file name + * @param p WAL file path + * @return WAL file name + * @throws IOException exception + */ + public static String getUniqueWALFileNamePart(Path p) throws IOException { + return p.getName(); + } + + /** + * Get the total length of files under the given directory recursively. + * @param fs The hadoop file system + * @param dir The target directory + * @return the total length of files + * @throws IOException exception + */ + public static long getFilesLength(FileSystem fs, Path dir) throws IOException { + long totalLength = 0; + FileStatus[] files = FSUtils.listStatus(fs, dir); + if (files != null) { + for (FileStatus fileStatus : files) { + if (fileStatus.isDirectory()) { + totalLength += getFilesLength(fs, fileStatus.getPath()); + } else { + totalLength += fileStatus.getLen(); + } + } + } + return totalLength; + } + + /** + * Get list of all old WAL files (WALs and archive) + * @param c configuration + * @param hostTimestampMap {host,timestamp} map + * @return list of WAL files + * @throws IOException exception + */ + public static List getWALFilesOlderThan(final Configuration c, + final HashMap hostTimestampMap) throws IOException { + Path rootDir = FSUtils.getRootDir(c); + Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); + Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + List logFiles = new ArrayList(); + + PathFilter filter = new PathFilter() { + + @Override + public boolean accept(Path p) { + try { + if (AbstractFSWALProvider.isMetaFile(p)) { + return false; + } + String host = parseHostNameFromLogFile(p); + if (host == null) { + return false; + } + Long oldTimestamp = hostTimestampMap.get(host); + Long currentLogTS = BackupUtils.getCreationTime(p); + return currentLogTS <= oldTimestamp; + } catch (Exception e) { + LOG.warn("Can not parse" + p, e); + return false; + } + } + }; + FileSystem fs = FileSystem.get(c); + logFiles = BackupUtils.getFiles(fs, logDir, logFiles, filter); + logFiles = BackupUtils.getFiles(fs, oldLogDir, logFiles, filter); + return logFiles; + } + + public static TableName[] parseTableNames(String tables) { + if (tables == null) { + return null; + } + String[] tableArray = 
tables.split(BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND); + + TableName[] ret = new TableName[tableArray.length]; + for (int i = 0; i < tableArray.length; i++) { + ret[i] = TableName.valueOf(tableArray[i]); + } + return ret; + } + + + /** + * Check whether the backup path exists + * @param backupStr backup + * @param conf configuration + * @return true if path exists + * @throws IOException exception + */ + public static boolean checkPathExist(String backupStr, Configuration conf) throws IOException { + boolean isExist = false; + Path backupPath = new Path(backupStr); + FileSystem fileSys = backupPath.getFileSystem(conf); + String targetFsScheme = fileSys.getUri().getScheme(); + if (LOG.isTraceEnabled()) { + LOG.trace("Scheme of given url: " + backupStr + " is: " + targetFsScheme); + } + if (fileSys.exists(backupPath)) { + isExist = true; + } + return isExist; + } + + /** + * Check target path first, confirm it doesn't exist before backup + * @param backupRootPath backup destination path + * @param conf configuration + * @throws IOException exception + */ + public static void checkTargetDir(String backupRootPath, Configuration conf) throws IOException { + boolean targetExists = false; + try { + targetExists = checkPathExist(backupRootPath, conf); + } catch (IOException e) { + String expMsg = e.getMessage(); + String newMsg = null; + if (expMsg.contains("No FileSystem for scheme")) { + newMsg = + "Unsupported filesystem scheme found in the backup target url. Error Message: " + + expMsg; + LOG.error(newMsg); + throw new IOException(newMsg); + } else { + throw e; + } + } + + if (targetExists) { + LOG.info("Using existing backup root dir: " + backupRootPath); + } else { + LOG.info("Backup root dir " + backupRootPath + " does not exist. Will be created."); + } + } + + /** + * Get the min value of all the values in a map. + * @param map map + * @return the min value + */ + public static Long getMinValue(HashMap<?, Long> map) { + Long minTimestamp = null; + if (map != null) { + ArrayList<Long> timestampList = new ArrayList<Long>(map.values()); + Collections.sort(timestampList); + // The min among all the RS log timestamps will be kept in the backup system table. + minTimestamp = timestampList.get(0); + } + return minTimestamp; + } + + /** + * Parses hostname:port from archived WAL path + * @param p path + * @return hostname and port + * @throws IOException exception + */ + public static String parseHostFromOldLog(Path p) { + try { + String n = p.getName(); + int idx = n.lastIndexOf(LOGNAME_SEPARATOR); + String s = URLDecoder.decode(n.substring(0, idx), "UTF8"); + return ServerName.parseHostname(s) + ":" + ServerName.parsePort(s); + } catch (Exception e) { + LOG.warn("Skip log file (can't parse): " + p); + return null; + } + } + + /** + * Given the log file, parse the timestamp from the file name. The timestamp is the last number.
+ * @param p a path to the log file + * @return the timestamp + * @throws IOException exception + */ + public static Long getCreationTime(Path p) throws IOException { + int idx = p.getName().lastIndexOf(LOGNAME_SEPARATOR); + if (idx < 0) { + throw new IOException("Cannot parse timestamp from path " + p); + } + String ts = p.getName().substring(idx + 1); + return Long.parseLong(ts); + } + + public static List getFiles(FileSystem fs, Path rootDir, List files, + PathFilter filter) throws FileNotFoundException, IOException { + RemoteIterator it = fs.listFiles(rootDir, true); + + while (it.hasNext()) { + LocatedFileStatus lfs = it.next(); + if (lfs.isDirectory()) { + continue; + } + // apply filter + if (filter.accept(lfs.getPath())) { + files.add(lfs.getPath().toString()); + } + } + return files; + } + + public static void cleanupBackupData(BackupInfo context, Configuration conf) throws IOException { + cleanupHLogDir(context, conf); + cleanupTargetDir(context, conf); + } + + /** + * Clean up directories which are generated when DistCp copying hlogs + * @param backupInfo backup info + * @param conf configuration + * @throws IOException exception + */ + private static void cleanupHLogDir(BackupInfo backupInfo, Configuration conf) + throws IOException { + + String logDir = backupInfo.getHLogTargetDir(); + if (logDir == null) { + LOG.warn("No log directory specified for " + backupInfo.getBackupId()); + return; + } + + Path rootPath = new Path(logDir).getParent(); + FileSystem fs = FileSystem.get(rootPath.toUri(), conf); + FileStatus[] files = listStatus(fs, rootPath, null); + if (files == null) { + return; + } + for (FileStatus file : files) { + LOG.debug("Delete log files: " + file.getPath().getName()); + fs.delete(file.getPath(), true); + } + } + + + private static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) { + try { + // clean up the data at target directory + LOG.debug("Trying to cleanup up target dir : " + backupInfo.getBackupId()); + String targetDir = backupInfo.getBackupRootDir(); + if (targetDir == null) { + LOG.warn("No target directory specified for " + backupInfo.getBackupId()); + return; + } + + FileSystem outputFs = FileSystem.get(new Path(backupInfo.getBackupRootDir()).toUri(), conf); + + for (TableName table : backupInfo.getTables()) { + Path targetDirPath = + new Path(getTableBackupDir(backupInfo.getBackupRootDir(), backupInfo.getBackupId(), + table)); + if (outputFs.delete(targetDirPath, true)) { + LOG.info("Cleaning up backup data at " + targetDirPath.toString() + " done."); + } else { + LOG.info("No data has been found in " + targetDirPath.toString() + "."); + } + + Path tableDir = targetDirPath.getParent(); + FileStatus[] backups = listStatus(outputFs, tableDir, null); + if (backups == null || backups.length == 0) { + outputFs.delete(tableDir, true); + LOG.debug(tableDir.toString() + " is empty, remove it."); + } + } + outputFs.delete(new Path(targetDir, backupInfo.getBackupId()), true); + } catch (IOException e1) { + LOG.error("Cleaning up backup data of " + backupInfo.getBackupId() + " at " + + backupInfo.getBackupRootDir() + " failed due to " + e1.getMessage() + "."); + } + } + + /** + * Given the backup root dir, backup id and the table name, return the backup image location, + * which is also where the backup manifest file is. 
return value look like: + * "hdfs://backup.hbase.org:9000/user/biadmin/backup1/backup_1396650096738/default/t1_dn/" + * @param backupRootDir backup root directory + * @param backupId backup id + * @param tableName table name + * @return backupPath String for the particular table + */ + public static String getTableBackupDir(String backupRootDir, String backupId, + TableName tableName) { + return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR + + tableName.getNamespaceAsString() + Path.SEPARATOR + tableName.getQualifierAsString() + + Path.SEPARATOR; + } + + /** + * Sort history list by start time in descending order. + * @param historyList history list + * @return sorted list of BackupCompleteData + */ + public static ArrayList sortHistoryListDesc(ArrayList historyList) { + ArrayList list = new ArrayList(); + TreeMap map = new TreeMap(); + for (BackupInfo h : historyList) { + map.put(Long.toString(h.getStartTs()), h); + } + Iterator i = map.descendingKeySet().iterator(); + while (i.hasNext()) { + list.add(map.get(i.next())); + } + return list; + } + + + /** + * Calls fs.listStatus() and treats FileNotFoundException as non-fatal This accommodates + * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException, and + * return an empty FileStatus[] while Hadoop 2 will throw FileNotFoundException. + * @param fs file system + * @param dir directory + * @param filter path filter + * @return null if dir is empty or doesn't exist, otherwise FileStatus array + */ + public static FileStatus[] + listStatus(final FileSystem fs, final Path dir, final PathFilter filter) throws IOException { + FileStatus[] status = null; + try { + status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter); + } catch (FileNotFoundException fnfe) { + // if directory doesn't exist, return null + if (LOG.isTraceEnabled()) { + LOG.trace(dir + " doesn't exist"); + } + } + if (status == null || status.length < 1) return null; + return status; + } + + /** + * Return the 'path' component of a Path. In Hadoop, Path is an URI. This method returns the + * 'path' component of a Path's URI: e.g. If a Path is + * hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir, this method returns + * /hbase_trunk/TestTable/compaction.dir. This method is useful if you want to print + * out a Path without qualifying Filesystem instance. + * @param p file system Path whose 'path' component we are to return. + * @return Path portion of the Filesystem + */ + public static String getPath(Path p) { + return p.toUri().getPath(); + } + + /** + * Given the backup root dir and the backup id, return the log file location for an incremental + * backup. 
+ * @param backupRootDir backup root directory + * @param backupId backup id + * @return logBackupDir: ".../user/biadmin/backup1/WALs/backup_1396650096738" + */ + public static String getLogBackupDir(String backupRootDir, String backupId) { + return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR + + HConstants.HREGION_LOGDIR_NAME; + } + + private static List getHistory(Configuration conf, Path backupRootPath) + throws IOException { + // Get all (n) history from backup root destination + FileSystem fs = FileSystem.get(conf); + RemoteIterator it = fs.listLocatedStatus(backupRootPath); + + List infos = new ArrayList(); + while (it.hasNext()) { + LocatedFileStatus lfs = it.next(); + if (!lfs.isDirectory()) continue; + String backupId = lfs.getPath().getName(); + try { + BackupInfo info = loadBackupInfo(backupRootPath, backupId, fs); + infos.add(info); + } catch (IOException e) { + LOG.error("Can not load backup info from: " + lfs.getPath(), e); + } + } + // Sort + Collections.sort(infos, new Comparator() { + + @Override + public int compare(BackupInfo o1, BackupInfo o2) { + long ts1 = getTimestamp(o1.getBackupId()); + long ts2 = getTimestamp(o2.getBackupId()); + if (ts1 == ts2) return 0; + return ts1 < ts2 ? 1 : -1; + } + + private long getTimestamp(String backupId) { + String[] split = backupId.split("_"); + return Long.parseLong(split[1]); + } + }); + return infos; + } + + public static List getHistory(Configuration conf, int n, Path backupRootPath, + BackupInfo.Filter... filters) throws IOException { + List infos = getHistory(conf, backupRootPath); + List ret = new ArrayList(); + for (BackupInfo info : infos) { + if (ret.size() == n) { + break; + } + boolean passed = true; + for (int i = 0; i < filters.length; i++) { + if (!filters[i].apply(info)) { + passed = false; + break; + } + } + if (passed) { + ret.add(info); + } + } + return ret; + } + + public static BackupInfo loadBackupInfo(Path backupRootPath, String backupId, FileSystem fs) + throws IOException { + Path backupPath = new Path(backupRootPath, backupId); + + RemoteIterator it = fs.listFiles(backupPath, true); + while (it.hasNext()) { + LocatedFileStatus lfs = it.next(); + if (lfs.getPath().getName().equals(BackupManifest.MANIFEST_FILE_NAME)) { + // Load BackupManifest + BackupManifest manifest = new BackupManifest(fs, lfs.getPath().getParent()); + BackupInfo info = manifest.toBackupInfo(); + return info; + } + } + return null; + } + + /** + * Create restore request. 
+ * @param backupRootDir backup root dir + * @param backupId backup id + * @param check check only + * @param fromTables table list from + * @param toTables table list to + * @param isOverwrite overwrite data + * @return request object + */ + public static RestoreRequest createRestoreRequest(String backupRootDir, String backupId, + boolean check, TableName[] fromTables, TableName[] toTables, boolean isOverwrite) { + RestoreRequest.Builder builder = new RestoreRequest.Builder(); + RestoreRequest request = builder.withBackupRootDir(backupRootDir) + .withBackupId(backupId) + .withCheck(check) + .withFromTables(fromTables) + .withToTables(toTables) + .withOvewrite(isOverwrite).build(); + return request; + } + + public static boolean validate(HashMap<TableName, BackupManifest> backupManifestMap, + Configuration conf) throws IOException { + boolean isValid = true; + + for (Entry<TableName, BackupManifest> manifestEntry : backupManifestMap.entrySet()) { + TableName table = manifestEntry.getKey(); + TreeSet<BackupImage> imageSet = new TreeSet<BackupImage>(); + + ArrayList<BackupImage> depList = manifestEntry.getValue().getDependentListByTable(table); + if (depList != null && !depList.isEmpty()) { + imageSet.addAll(depList); + } + + LOG.info("Dependent image(s) from old to new:"); + for (BackupImage image : imageSet) { + String imageDir = + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table); + if (!BackupUtils.checkPathExist(imageDir, conf)) { + LOG.error("ERROR: backup image does not exist: " + imageDir); + isValid = false; + break; + } + LOG.info("Backup image: " + image.getBackupId() + " for '" + table + "' is available"); + } + } + return isValid; + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/3aaea8e0/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java new file mode 100644 index 0000000..a130c21 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java @@ -0,0 +1,610 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hbase.backup.util; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupRestoreFactory; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; +import org.apache.hadoop.hbase.backup.RestoreJob; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotManifest; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSTableDescriptors; + +/** + * A collection for methods used by multiple classes to restore HBase tables. 
+ */
+@InterfaceAudience.Private
+public class RestoreTool {
+
+  public static final Log LOG = LogFactory.getLog(RestoreTool.class);
+
+  private final String[] ignoreDirs = { HConstants.RECOVERED_EDITS_DIR };
+
+  private final static long TABLE_AVAILABILITY_WAIT_TIME = 180000;
+
+  protected Configuration conf = null;
+
+  protected Path backupRootPath;
+
+  protected String backupId;
+
+  protected FileSystem fs;
+  private final Path restoreTmpPath;
+
+  // maps a table name to its snapshot metadata directory inside the backup image
+  private final HashMap<TableName, Path> snapshotMap = new HashMap<>();
+
+  public RestoreTool(Configuration conf, final Path backupRootPath, final String backupId)
+      throws IOException {
+    this.conf = conf;
+    this.backupRootPath = backupRootPath;
+    this.backupId = backupId;
+    this.fs = backupRootPath.getFileSystem(conf);
+    this.restoreTmpPath =
+        new Path(conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
+          HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY), "restore");
+  }
+
+  /**
+   * Returns the path to the table archive directory inside the backup image, e.g.
+   * ".../user/biadmin/backup1/default/t1_dn/backup_1396650096738/archive/data/default/t1_dn"
+   * @param tableName table name
+   * @return path to the table archive, or null if the table has no archived files
+   * @throws IOException exception
+   */
+  Path getTableArchivePath(TableName tableName) throws IOException {
+    Path baseDir =
+        new Path(HBackupFileSystem.getTableBackupPath(tableName, backupRootPath, backupId),
+          HConstants.HFILE_ARCHIVE_DIRECTORY);
+    Path dataDir = new Path(baseDir, HConstants.BASE_NAMESPACE_DIR);
+    Path archivePath = new Path(dataDir, tableName.getNamespaceAsString());
+    Path tableArchivePath = new Path(archivePath, tableName.getQualifierAsString());
+    if (!fs.exists(tableArchivePath) || !fs.getFileStatus(tableArchivePath).isDirectory()) {
+      LOG.debug("Folder tableArchivePath: " + tableArchivePath.toString() + " does not exist");
+      tableArchivePath = null; // empty table has no archive
+    }
+    return tableArchivePath;
+  }
+
+  /**
+   * Gets the list of region directories of a table in the backup image
+   * @param tableName table name
+   * @return list of region directory paths
+   * @throws FileNotFoundException exception
+   * @throws IOException exception
+   */
+  ArrayList<Path> getRegionList(TableName tableName) throws FileNotFoundException, IOException {
+    Path tableArchivePath = getTableArchivePath(tableName);
+    ArrayList<Path> regionDirList = new ArrayList<>();
+    FileStatus[] children = fs.listStatus(tableArchivePath);
+    for (FileStatus childStatus : children) {
+      // each child is a region directory
+      Path child = childStatus.getPath();
+      regionDirList.add(child);
+    }
+    return regionDirList;
+  }
+
+  void modifyTableSync(Connection conn, HTableDescriptor desc) throws IOException {
+    try (Admin admin = conn.getAdmin();) {
+      admin.modifyTable(desc.getTableName(), desc);
+      int attempt = 0;
+      int maxAttempts = 600;
+      while (!admin.isTableAvailable(desc.getTableName())) {
+        Thread.sleep(100);
+        attempt++;
+        if (attempt > maxAttempts) {
+          throw new IOException("Timeout expired " + (maxAttempts * 100) + "ms");
+        }
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Restores an incremental backup image: calls WalPlayer (through the configured
+   * {@link RestoreJob}) to replay the WAL files in the backup image. Currently tableNames and
+   * newTableNames only contain a single table; this will be expanded to multiple tables in
+   * the future.
+   * @param conn HBase connection
+   * @param tableBackupPath backup path
+   * @param logDirs incremental backup folders, which contain the WAL files
+   * @param tableNames source table names (table names that were backed up)
+   * @param newTableNames target table names (table names to be restored to)
+   * @param incrBackupId incremental backup Id
+   * @throws IOException exception
+   */
+  public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[] logDirs,
+      TableName[] tableNames, TableName[] newTableNames, String incrBackupId) throws IOException {
+
+    try (Admin admin = conn.getAdmin();) {
+      if (tableNames.length != newTableNames.length) {
+        throw new IOException("Number of source tables and target tables does not match!");
+      }
+      FileSystem fileSys = tableBackupPath.getFileSystem(this.conf);
+
+      // for an incremental backup image, the table is expected to exist already, created either
+      // by the user or by a previous full backup. Here, check that all new tables exist
+      for (TableName tableName : newTableNames) {
+        if (!admin.tableExists(tableName)) {
+          throw new IOException("HBase table " + tableName
+              + " does not exist. Create the table first, e.g. by restoring a full backup.");
+        }
+      }
+      // adjust table schema
+      for (int i = 0; i < tableNames.length; i++) {
+        TableName tableName = tableNames[i];
+        HTableDescriptor tableDescriptor = getTableDescriptor(fileSys, tableName, incrBackupId);
+        LOG.debug("Found descriptor " + tableDescriptor + " through " + incrBackupId);
+
+        TableName newTableName = newTableNames[i];
+        HTableDescriptor newTableDescriptor = admin.getTableDescriptor(newTableName);
+        List<HColumnDescriptor> families = Arrays.asList(tableDescriptor.getColumnFamilies());
+        List<HColumnDescriptor> existingFamilies =
+            Arrays.asList(newTableDescriptor.getColumnFamilies());
+        boolean schemaChangeNeeded = false;
+        for (HColumnDescriptor family : families) {
+          if (!existingFamilies.contains(family)) {
+            newTableDescriptor.addFamily(family);
+            schemaChangeNeeded = true;
+          }
+        }
+        for (HColumnDescriptor family : existingFamilies) {
+          if (!families.contains(family)) {
+            newTableDescriptor.removeFamily(family.getName());
+            schemaChangeNeeded = true;
+          }
+        }
+        if (schemaChangeNeeded) {
+          modifyTableSync(conn, newTableDescriptor);
+          LOG.info("Changed " + newTableDescriptor.getTableName() + " to: " + newTableDescriptor);
+        }
+      }
+      RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
+
+      restoreService.run(logDirs, tableNames, newTableNames, false);
+    }
+  }
+
+  public void fullRestoreTable(Connection conn, Path tableBackupPath, TableName tableName,
+      TableName newTableName, boolean truncateIfExists, String lastIncrBackupId)
+      throws IOException {
+    restoreTableAndCreate(conn, tableName, newTableName, tableBackupPath, truncateIfExists,
+      lastIncrBackupId);
+  }
+
+  /**
+   * Returns the path to the backup table snapshot directory:
+   * "/$USER/SBACKUP_ROOT/backup_id/namespace/table/.hbase-snapshot"
+   * @param backupRootPath backup root path
+   * @param tableName table name
+   * @param backupId backup Id
+   * @return path for snapshot
+   */
+  Path getTableSnapshotPath(Path backupRootPath, TableName tableName, String backupId) {
+    return new Path(HBackupFileSystem.getTableBackupPath(tableName, backupRootPath, backupId),
+      HConstants.SNAPSHOT_DIR_NAME);
+  }
+
+  /**
+   * Returns the path to the table snapshot metadata directory inside the backup image, e.g.
+   *
""/$USER/SBACKUP_ROOT/backup_id/namespace/table/.hbase-snapshot/snapshot_1396650097621_namespace_table" + * this path contains .snapshotinfo, .tabledesc (0.96 and 0.98) this path contains .snapshotinfo, + * .data.manifest (trunk) + * @param tableName table name + * @return path to table info + * @throws FileNotFoundException exception + * @throws IOException exception + */ + Path getTableInfoPath(TableName tableName) throws FileNotFoundException, IOException { + Path tableSnapShotPath = getTableSnapshotPath(backupRootPath, tableName, backupId); + Path tableInfoPath = null; + + // can't build the path directly as the timestamp values are different + FileStatus[] snapshots = fs.listStatus(tableSnapShotPath); + for (FileStatus snapshot : snapshots) { + tableInfoPath = snapshot.getPath(); + // SnapshotManifest.DATA_MANIFEST_NAME = "data.manifest"; + if (tableInfoPath.getName().endsWith("data.manifest")) { + break; + } + } + return tableInfoPath; + } + + /** + * Get table descriptor + * @param tableName is the table backed up + * @return {@link HTableDescriptor} saved in backup image of the table + */ + HTableDescriptor getTableDesc(TableName tableName) throws FileNotFoundException, IOException { + Path tableInfoPath = this.getTableInfoPath(tableName); + SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fs, tableInfoPath); + SnapshotManifest manifest = SnapshotManifest.open(conf, fs, tableInfoPath, desc); + HTableDescriptor tableDescriptor = manifest.getTableDescriptor(); + if (!tableDescriptor.getTableName().equals(tableName)) { + LOG.error("couldn't find Table Desc for table: " + tableName + " under tableInfoPath: " + + tableInfoPath.toString()); + LOG.error("tableDescriptor.getNameAsString() = " + tableDescriptor.getNameAsString()); + throw new FileNotFoundException("couldn't find Table Desc for table: " + tableName + + " under tableInfoPath: " + tableInfoPath.toString()); + } + return tableDescriptor; + } + + /** + * Duplicate the backup image if it's on local cluster + * @see HStore#bulkLoadHFile(String, long) + * @see HRegionFileSystem#bulkLoadStoreFile(String familyName, Path srcPath, long seqNum) + * @param tableArchivePath archive path + * @return the new tableArchivePath + * @throws IOException exception + */ + Path checkLocalAndBackup(Path tableArchivePath) throws IOException { + // Move the file if it's on local cluster + boolean isCopyNeeded = false; + + FileSystem srcFs = tableArchivePath.getFileSystem(conf); + FileSystem desFs = FileSystem.get(conf); + if (tableArchivePath.getName().startsWith("/")) { + isCopyNeeded = true; + } else { + // This should match what is done in @see HRegionFileSystem#bulkLoadStoreFile(String, Path, + // long) + if (srcFs.getUri().equals(desFs.getUri())) { + LOG.debug("cluster hold the backup image: " + srcFs.getUri() + "; local cluster node: " + + desFs.getUri()); + isCopyNeeded = true; + } + } + if (isCopyNeeded) { + LOG.debug("File " + tableArchivePath + " on local cluster, back it up before restore"); + if (desFs.exists(restoreTmpPath)) { + try { + desFs.delete(restoreTmpPath, true); + } catch (IOException e) { + LOG.debug("Failed to delete path: " + restoreTmpPath + + ", need to check whether restore target DFS cluster is healthy"); + } + } + FileUtil.copy(srcFs, tableArchivePath, desFs, restoreTmpPath, false, conf); + LOG.debug("Copied to temporary path on local cluster: " + restoreTmpPath); + tableArchivePath = restoreTmpPath; + } + return tableArchivePath; + } + + private HTableDescriptor getTableDescriptor(FileSystem 
fileSys, TableName tableName, + String lastIncrBackupId) throws IOException { + if (lastIncrBackupId != null) { + String target = + BackupUtils.getTableBackupDir(backupRootPath.toString(), + lastIncrBackupId, tableName); + return FSTableDescriptors.getTableDescriptorFromFs(fileSys, new Path(target)); + } + return null; + } + + private void restoreTableAndCreate(Connection conn, TableName tableName, TableName newTableName, + Path tableBackupPath, boolean truncateIfExists, String lastIncrBackupId) throws IOException { + if (newTableName == null) { + newTableName = tableName; + } + FileSystem fileSys = tableBackupPath.getFileSystem(this.conf); + + // get table descriptor first + HTableDescriptor tableDescriptor = getTableDescriptor(fileSys, tableName, lastIncrBackupId); + if (tableDescriptor != null) { + LOG.debug("Retrieved descriptor: " + tableDescriptor + " thru " + lastIncrBackupId); + } + + if (tableDescriptor == null) { + Path tableSnapshotPath = getTableSnapshotPath(backupRootPath, tableName, backupId); + if (fileSys.exists(tableSnapshotPath)) { + // snapshot path exist means the backup path is in HDFS + // check whether snapshot dir already recorded for target table + if (snapshotMap.get(tableName) != null) { + SnapshotDescription desc = + SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath); + SnapshotManifest manifest = SnapshotManifest.open(conf, fileSys, tableSnapshotPath, desc); + tableDescriptor = manifest.getTableDescriptor(); + } else { + tableDescriptor = getTableDesc(tableName); + snapshotMap.put(tableName, getTableInfoPath(tableName)); + } + if (tableDescriptor == null) { + LOG.debug("Found no table descriptor in the snapshot dir, previous schema would be lost"); + } + } else { + throw new IOException("Table snapshot directory: " + + tableSnapshotPath + " does not exist."); + } + } + + Path tableArchivePath = getTableArchivePath(tableName); + if (tableArchivePath == null) { + if (tableDescriptor != null) { + // find table descriptor but no archive dir means the table is empty, create table and exit + if (LOG.isDebugEnabled()) { + LOG.debug("find table descriptor but no archive dir for table " + tableName + + ", will only create table"); + } + tableDescriptor.setName(newTableName); + checkAndCreateTable(conn, tableBackupPath, tableName, newTableName, null, tableDescriptor, + truncateIfExists); + return; + } else { + throw new IllegalStateException("Cannot restore hbase table because directory '" + + " tableArchivePath is null."); + } + } + + if (tableDescriptor == null) { + tableDescriptor = new HTableDescriptor(newTableName); + } else { + tableDescriptor.setName(newTableName); + } + + // record all region dirs: + // load all files in dir + try { + ArrayList regionPathList = getRegionList(tableName); + + // should only try to create the table with all region informations, so we could pre-split + // the regions in fine grain + checkAndCreateTable(conn, tableBackupPath, tableName, newTableName, regionPathList, + tableDescriptor, truncateIfExists); + if (tableArchivePath != null) { + // start real restore through bulkload + // if the backup target is on local cluster, special action needed + Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath); + if (tempTableArchivePath.equals(tableArchivePath)) { + if (LOG.isDebugEnabled()) { + LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath); + } + } else { + regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir + if (LOG.isDebugEnabled()) { + 
LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath); + } + } + + LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false); + for (Path regionPath : regionPathList) { + String regionName = regionPath.toString(); + if (LOG.isDebugEnabled()) { + LOG.debug("Restoring HFiles from directory " + regionName); + } + String[] args = { regionName, newTableName.getNameAsString() }; + loader.run(args); + } + } + // we do not recovered edits + } catch (Exception e) { + throw new IllegalStateException("Cannot restore hbase table", e); + } + } + + /** + * Gets region list + * @param tableArchivePath table archive path + * @return RegionList region list + * @throws FileNotFoundException exception + * @throws IOException exception + */ + ArrayList getRegionList(Path tableArchivePath) throws FileNotFoundException, IOException { + ArrayList regionDirList = new ArrayList(); + FileStatus[] children = fs.listStatus(tableArchivePath); + for (FileStatus childStatus : children) { + // here child refer to each region(Name) + Path child = childStatus.getPath(); + regionDirList.add(child); + } + return regionDirList; + } + + /** + * Create a {@link LoadIncrementalHFiles} instance to be used to restore the HFiles of a full + * backup. + * @return the {@link LoadIncrementalHFiles} instance + * @throws IOException exception + */ + private LoadIncrementalHFiles createLoader(Path tableArchivePath, boolean multipleTables) + throws IOException { + + // By default, it is 32 and loader will fail if # of files in any region exceed this + // limit. Bad for snapshot restore. + this.conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE); + this.conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes"); + LoadIncrementalHFiles loader = null; + try { + loader = new LoadIncrementalHFiles(this.conf); + } catch (Exception e1) { + throw new IOException(e1); + } + return loader; + } + + /** + * Calculate region boundaries and add all the column families to the table descriptor + * @param regionDirList region dir list + * @return a set of keys to store the boundaries + */ + byte[][] generateBoundaryKeys(ArrayList regionDirList) throws FileNotFoundException, + IOException { + TreeMap map = new TreeMap(Bytes.BYTES_COMPARATOR); + // Build a set of keys to store the boundaries + // calculate region boundaries and add all the column families to the table descriptor + for (Path regionDir : regionDirList) { + LOG.debug("Parsing region dir: " + regionDir); + Path hfofDir = regionDir; + + if (!fs.exists(hfofDir)) { + LOG.warn("HFileOutputFormat dir " + hfofDir + " not found"); + } + + FileStatus[] familyDirStatuses = fs.listStatus(hfofDir); + if (familyDirStatuses == null) { + throw new IOException("No families found in " + hfofDir); + } + + for (FileStatus stat : familyDirStatuses) { + if (!stat.isDirectory()) { + LOG.warn("Skipping non-directory " + stat.getPath()); + continue; + } + boolean isIgnore = false; + String pathName = stat.getPath().getName(); + for (String ignore : ignoreDirs) { + if (pathName.contains(ignore)) { + LOG.warn("Skipping non-family directory" + pathName); + isIgnore = true; + break; + } + } + if (isIgnore) { + continue; + } + Path familyDir = stat.getPath(); + LOG.debug("Parsing family dir [" + familyDir.toString() + " in region [" + regionDir + "]"); + // Skip _logs, etc + if (familyDir.getName().startsWith("_") || familyDir.getName().startsWith(".")) { + continue; + } + + // start to parse hfile inside one family dir + Path[] hfiles 
= FileUtil.stat2Paths(fs.listStatus(familyDir));
+        for (Path hfile : hfiles) {
+          if (hfile.getName().startsWith("_") || hfile.getName().startsWith(".")
+              || StoreFileInfo.isReference(hfile.getName())
+              || HFileLink.isHFileLink(hfile.getName())) {
+            continue;
+          }
+          HFile.Reader reader = HFile.createReader(fs, hfile, conf);
+          final byte[] first, last;
+          try {
+            reader.loadFileInfo();
+            first = reader.getFirstRowKey();
+            last = reader.getLastRowKey();
+            LOG.debug("Trying to figure out region boundaries hfile=" + hfile + " first="
+                + Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));
+
+            // Record a +1 marker at each HFile's first key and a -1 marker at its last key;
+            // LoadIncrementalHFiles.inferBoundaries() later turns these markers into split keys.
+            Integer value = map.containsKey(first) ? (Integer) map.get(first) : 0;
+            map.put(first, value + 1);
+            value = map.containsKey(last) ? (Integer) map.get(last) : 0;
+            map.put(last, value - 1);
+          } finally {
+            reader.close();
+          }
+        }
+      }
+    }
+    return LoadIncrementalHFiles.inferBoundaries(map);
+  }
+
+  /**
+   * Prepare the table for bulkload; most of the code is copied from
+   * {@link LoadIncrementalHFiles#createTable(String, String)}
+   * @param conn connection
+   * @param tableBackupPath path
+   * @param tableName table name
+   * @param targetTableName target table name
+   * @param regionDirList region directory list
+   * @param htd table descriptor
+   * @param truncateIfExists truncate the table if it already exists
+   * @throws IOException exception
+   */
+  private void checkAndCreateTable(Connection conn, Path tableBackupPath, TableName tableName,
+      TableName targetTableName, ArrayList<Path> regionDirList, HTableDescriptor htd,
+      boolean truncateIfExists) throws IOException {
+    try (Admin admin = conn.getAdmin();) {
+      boolean createNew = false;
+      if (admin.tableExists(targetTableName)) {
+        if (truncateIfExists) {
+          LOG.info("Truncating existing target table '" + targetTableName
+              + "', preserving region splits");
+          admin.disableTable(targetTableName);
+          admin.truncateTable(targetTableName, true);
+        } else {
+          LOG.info("Using existing target table '" + targetTableName + "'");
+        }
+      } else {
+        createNew = true;
+      }
+      if (createNew) {
+        LOG.info("Creating target table '" + targetTableName + "'");
+        byte[][] keys = null;
+        if (regionDirList == null || regionDirList.size() == 0) {
+          admin.createTable(htd, null);
+        } else {
+          keys = generateBoundaryKeys(regionDirList);
+          // create table using table descriptor and region boundaries
+          admin.createTable(htd, keys);
+        }
+        long startTime = EnvironmentEdgeManager.currentTime();
+        while (!admin.isTableAvailable(targetTableName, keys)) {
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+          }
+          if (EnvironmentEdgeManager.currentTime() - startTime > TABLE_AVAILABILITY_WAIT_TIME) {
+            throw new IOException("Timeout of " + TABLE_AVAILABILITY_WAIT_TIME
+                + "ms expired, table " + targetTableName + " is still not available");
+          }
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3aaea8e0/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
index ae36f08..0e3755b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java @@ -17,9 +17,14 @@ */ package org.apache.hadoop.hbase.coordination; -import org.apache.hadoop.hbase.classification.InterfaceAudience; +import java.io.IOException; + import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; +import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; +import org.apache.zookeeper.KeeperException; /** * Base class for {@link org.apache.hadoop.hbase.CoordinatedStateManager} implementations. @@ -51,8 +56,21 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan * Method to retrieve coordination for split log worker */ public abstract SplitLogWorkerCoordination getSplitLogWorkerCoordination(); + /** * Method to retrieve coordination for split log manager */ public abstract SplitLogManagerCoordination getSplitLogManagerCoordination(); + /** + * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs} + */ + public abstract ProcedureCoordinatorRpcs + getProcedureCoordinatorRpcs(String procType, String coordNode) throws IOException; + + /** + * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs} + */ + public abstract ProcedureMemberRpcs + getProcedureMemberRpcs(String procType) throws KeeperException; + } http://git-wip-us.apache.org/repos/asf/hbase/blob/3aaea8e0/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java index 3e89be7..8ce5f9c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java @@ -17,10 +17,17 @@ */ package org.apache.hadoop.hbase.coordination; -import org.apache.hadoop.hbase.classification.InterfaceAudience; +import java.io.IOException; + import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; +import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; +import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator; +import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; /** * ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.CoordinatedStateManager}. 
@@ -49,9 +56,21 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { @Override public SplitLogWorkerCoordination getSplitLogWorkerCoordination() { return splitLogWorkerCoordination; - } + } + @Override public SplitLogManagerCoordination getSplitLogManagerCoordination() { return splitLogManagerCoordination; } + + @Override + public ProcedureCoordinatorRpcs getProcedureCoordinatorRpcs(String procType, String coordNode) + throws IOException { + return new ZKProcedureCoordinator(watcher, procType, coordNode); + } + + @Override + public ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws KeeperException { + return new ZKProcedureMemberRpcs(watcher, procType); + } }
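For reference, a minimal, self-contained sketch (not part of the patch) of the boundary-marker technique used by RestoreTool#generateBoundaryKeys above: each HFile contributes a +1 marker at its first row key and a -1 marker at its last row key, and LoadIncrementalHFiles.inferBoundaries collapses the accumulated markers into the split keys that checkAndCreateTable passes to Admin#createTable. The class name and toy keys below are invented for illustration; only inferBoundaries and the Bytes utilities are the real APIs used by the patch, and hbase-server must be on the classpath to run it.

import java.util.TreeMap;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class BoundaryInferenceSketch {
  public static void main(String[] args) {
    // Same map shape generateBoundaryKeys builds: row key -> marker count, byte-ordered.
    TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    // Pretend we scanned two HFiles: one covering keys [a, f], one covering [g, m].
    mark(map, Bytes.toBytes("a"), Bytes.toBytes("f"));
    mark(map, Bytes.toBytes("g"), Bytes.toBytes("m"));
    // inferBoundaries collapses the +1/-1 markers into split keys for table pre-splitting.
    byte[][] splitKeys = LoadIncrementalHFiles.inferBoundaries(map);
    for (byte[] key : splitKeys) {
      System.out.println(Bytes.toStringBinary(key));
    }
  }

  // Same bookkeeping generateBoundaryKeys does per HFile: +1 at the first key, -1 at the last.
  private static void mark(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
    map.put(first, map.getOrDefault(first, 0) + 1);
    map.put(last, map.getOrDefault(last, 0) - 1);
  }
}

With this toy input the expected split key is "g" (the start of the second key range), which mirrors how checkAndCreateTable pre-splits the restored table along the original region boundaries before bulk loading.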