From: zhangduo@apache.org
To: commits@hbase.apache.org
Date: Fri, 02 Feb 2018 08:53:17 -0000
Message-Id: <0bf033c032a94253b1b0818f327e347f@git.apache.org>
In-Reply-To: <84b4768edce14438afb3213e6c38bd13@git.apache.org>
Subject: [10/41] hbase git commit: HBASE-19528 - Major Compaction Tool

HBASE-19528 - Major Compaction Tool

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4b3b627a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4b3b627a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4b3b627a

Branch: refs/heads/HBASE-19064
Commit: 4b3b627abe34f8426fddd914e941d12f79da0752
Parents: 7c318ce
Author: Rahul Gidwani
Authored: Thu Jan 25 12:47:50 2018 -0800
Committer: Rahul Gidwani
Committed: Wed Jan 31 10:18:03 2018 -0800

----------------------------------------------------------------------
 .../compaction/ClusterCompactionQueues.java     | 137 +++++++
 .../util/compaction/MajorCompactionRequest.java | 171 +++++++++
 .../hbase/util/compaction/MajorCompactor.java   | 379 +++++++++++++++++++
 .../compaction/MajorCompactionRequestTest.java  | 166 ++++++++
 .../util/compaction/MajorCompactorTest.java     |  81 ++++
 5 files changed, 934 insertions(+)
----------------------------------------------------------------------
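In short: the patch adds a standalone, multithreaded major-compaction driver. MajorCompactor
enqueues every region of a table by hosting region server (ClusterCompactionQueues); for each
region, a MajorCompactionRequest works out which column families still hold store files or
reference files older than a cut-off timestamp, and only those are major compacted, against a
configurable number of servers in parallel. A sample invocation is sketched after
MajorCompactor.java below.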

http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3b627a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/ClusterCompactionQueues.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/ClusterCompactionQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/ClusterCompactionQueues.java
new file mode 100644
index 0000000..c0d34d9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/ClusterCompactionQueues.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license
agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@InterfaceAudience.Private
+class ClusterCompactionQueues {
+
+  private final Map<ServerName, List<MajorCompactionRequest>> compactionQueues;
+  private final Set<ServerName> compactingServers;
+  private final ReadWriteLock lock;
+  private final int concurrentServers;
+
+  ClusterCompactionQueues(int concurrentServers) {
+    this.concurrentServers = concurrentServers;
+
+    this.compactionQueues = Maps.newHashMap();
+    this.lock = new ReentrantReadWriteLock();
+    this.compactingServers = Sets.newHashSet();
+  }
+
+  void addToCompactionQueue(ServerName serverName, MajorCompactionRequest info) {
+    this.lock.writeLock().lock();
+    try {
+      List<MajorCompactionRequest> result = this.compactionQueues.get(serverName);
+      if (result == null) {
+        result = Lists.newArrayList();
+        compactionQueues.put(serverName, result);
+      }
+      result.add(info);
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+  }
+
+  boolean hasWorkItems() {
+    lock.readLock().lock();
+    try {
+      return !this.compactionQueues.values().stream().allMatch(List::isEmpty);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  int getCompactionRequestsLeftToFinish() {
+    lock.readLock().lock();
+    try {
+      int size = 0;
+      for (List<MajorCompactionRequest> queue : compactionQueues.values()) {
+        size += queue.size();
+      }
+      return size;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @VisibleForTesting List<MajorCompactionRequest> getQueue(ServerName serverName) {
+    lock.readLock().lock();
+    try {
+      return compactionQueues.get(serverName);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  MajorCompactionRequest reserveForCompaction(ServerName serverName) {
+    lock.writeLock().lock();
+    try {
+      if (!compactionQueues.get(serverName).isEmpty()) {
+        compactingServers.add(serverName);
+        return compactionQueues.get(serverName).remove(0);
+      }
+      return null;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  void releaseCompaction(ServerName serverName) {
+    lock.writeLock().lock();
+    try {
+      compactingServers.remove(serverName);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  boolean atCapacity() {
+    lock.readLock().lock();
+    try {
+      return compactingServers.size() >= concurrentServers;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  Optional<ServerName> getLargestQueueFromServersNotCompacting() {
+    lock.readLock().lock();
+    try {
+      return compactionQueues.entrySet().stream()
+          .filter(entry -> !compactingServers.contains(entry.getKey()))
+          .max(Map.Entry.comparingByValue(
+              (o1, o2) -> Integer.compare(o1.size(), o2.size()))).map(Map.Entry::getKey);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+}
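A minimal sketch (not part of the patch) of the reserve/release cycle the class above is built
for, as MajorCompactor drives it further down; it assumes same-package access since the class is
package-private, and the `server` and `request` values are hypothetical:

  ClusterCompactionQueues queues = new ClusterCompactionQueues(2); // at most 2 servers at once
  queues.addToCompactionQueue(server, request);
  if (!queues.atCapacity() && queues.hasWorkItems()) {
    MajorCompactionRequest next = queues.reserveForCompaction(server); // marks the server busy
    try {
      // fire the major compaction for "next" here
    } finally {
      queues.releaseCompaction(server); // frees the server slot for the next request
    }
  }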
http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3b627a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
new file mode 100644
index 0000000..51b2b9d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@InterfaceAudience.Private
+class MajorCompactionRequest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionRequest.class);
+
+  private final Configuration configuration;
+  private final RegionInfo region;
+  private Set<String> stores;
+  private final long timestamp;
+
+  @VisibleForTesting
+  MajorCompactionRequest(Configuration configuration, RegionInfo region,
+      Set<String> stores, long timestamp) {
+    this.configuration = configuration;
+    this.region = region;
+    this.stores = stores;
+    this.timestamp = timestamp;
+  }
+
+  static Optional<MajorCompactionRequest> newRequest(Configuration configuration, RegionInfo info,
+      Set<String> stores, long timestamp) throws IOException {
+    MajorCompactionRequest request =
+        new MajorCompactionRequest(configuration, info, stores, timestamp);
+    return request.createRequest(configuration, stores);
+  }
+
+  RegionInfo getRegion() {
+    return region;
+  }
+
+  Set<String> getStores() {
+    return stores;
+  }
+
+  void setStores(Set<String> stores) {
+    this.stores = stores;
+  }
+
+  @VisibleForTesting
+  Optional<MajorCompactionRequest> createRequest(Configuration configuration,
+      Set<String> stores) throws IOException {
+    Set<String> familiesToCompact = getStoresRequiringCompaction(stores);
+    MajorCompactionRequest request = null;
+    if (!familiesToCompact.isEmpty()) {
+      request = new MajorCompactionRequest(configuration, region, familiesToCompact, timestamp);
+    }
+    return Optional.ofNullable(request);
+  }
+
+  Set<String> getStoresRequiringCompaction(Set<String> requestedStores) throws IOException {
+    try (Connection connection = getConnection(configuration)) {
+      HRegionFileSystem fileSystem = getFileSystem(connection);
+      Set<String> familiesToCompact = Sets.newHashSet();
+      for (String family : requestedStores) {
+        // do we have any store files?
+        Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
+        if (storeFiles == null) {
+          LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
+              .getRegionInfo().getEncodedName() + ", has no store files");
+          continue;
+        }
+        // check for reference files
+        if (fileSystem.hasReferences(family) && familyHasReferenceFile(fileSystem, family)) {
+          familiesToCompact.add(family);
+          LOG.info("Including store: " + family + " with: " + storeFiles.size()
+              + " files for compaction for region: "
+              + fileSystem.getRegionInfo().getEncodedName());
+          continue;
+        }
+        // check store file timestamps
+        boolean includeStore = false;
+        for (StoreFileInfo storeFile : storeFiles) {
+          if (storeFile.getModificationTime() < timestamp) {
+            LOG.info("Including store: " + family + " with: " + storeFiles.size()
+                + " files for compaction for region: "
+                + fileSystem.getRegionInfo().getEncodedName());
+            familiesToCompact.add(family);
+            includeStore = true;
+            break;
+          }
+        }
+        if (!includeStore) {
+          LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
+              .getRegionInfo().getEncodedName() + ", already compacted");
+        }
+      }
+      return familiesToCompact;
+    }
+  }
+
+  @VisibleForTesting
+  Connection getConnection(Configuration configuration) throws IOException {
+    return ConnectionFactory.createConnection(configuration);
+  }
+
+  private boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family)
+      throws IOException {
+    List<Path> referenceFiles =
+        getReferenceFilePaths(fileSystem.getFileSystem(), fileSystem.getStoreDir(family));
+    for (Path referenceFile : referenceFiles) {
+      FileStatus status = fileSystem.getFileSystem().getFileLinkStatus(referenceFile);
+      if (status.getModificationTime() < timestamp) {
+        LOG.info("Including store: " + family + " for compaction for region: " + fileSystem
+            .getRegionInfo().getEncodedName() + " (reference store files)");
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @VisibleForTesting
+  List<Path> getReferenceFilePaths(FileSystem fileSystem, Path familyDir)
+      throws IOException {
+    return FSUtils.getReferenceFilePaths(fileSystem, familyDir);
+  }
+
+  @VisibleForTesting
+  HRegionFileSystem getFileSystem(Connection connection) throws IOException {
+    Admin admin = connection.getAdmin();
+    return HRegionFileSystem.openRegionFromFileSystem(admin.getConfiguration(),
+        FSUtils.getCurrentFileSystem(admin.getConfiguration()),
+        FSUtils.getTableDir(FSUtils.getRootDir(admin.getConfiguration()), region.getTable()),
+        region, true);
+  }
+
+  @Override
+  public String toString() {
+    return "region: " + region.getEncodedName() + " store(s): " + stores;
+  }
+}
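A minimal sketch (not part of the patch) of how a request is meant to be built, mirroring
MajorCompactor.initializeWorkQueues() below; `conf`, `regionInfo`, `server` and `queues` are
assumed to exist in the caller:

  Optional<MajorCompactionRequest> request = MajorCompactionRequest.newRequest(
      conf, regionInfo, Sets.newHashSet("a"), System.currentTimeMillis());
  // an empty Optional means no requested store currently needs a major compaction
  request.ifPresent(r -> queues.addToCompactionQueue(server, r));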
http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3b627a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
new file mode 100644
index 0000000..c3372bb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.CompactionState;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
+import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class MajorCompactor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MajorCompactor.class);
+  private static final Set<MajorCompactionRequest> ERRORS = ConcurrentHashMap.newKeySet();
+
+  private final ClusterCompactionQueues clusterCompactionQueues;
+  private final long timestamp;
+  private final Set<String> storesToCompact;
+  private final ExecutorService executor;
+  private final long sleepForMs;
+  private final Connection connection;
+  private final TableName tableName;
+
+  public MajorCompactor(Configuration conf, TableName tableName, Set<String> storesToCompact,
+      int concurrency, long timestamp, long sleepForMs) throws IOException {
+    this.connection = ConnectionFactory.createConnection(conf);
+    this.tableName = tableName;
+    this.timestamp = timestamp;
+    this.storesToCompact = storesToCompact;
+    this.executor = Executors.newFixedThreadPool(concurrency);
+    this.clusterCompactionQueues = new ClusterCompactionQueues(concurrency);
+    this.sleepForMs = sleepForMs;
+  }
+
+  public void compactAllRegions() throws Exception {
+    List<Future<?>> futures = Lists.newArrayList();
+    while (clusterCompactionQueues.hasWorkItems() || !futuresComplete(futures)) {
+      while (clusterCompactionQueues.atCapacity()) {
+        LOG.debug("Waiting for servers to complete Compactions");
+        Thread.sleep(sleepForMs);
+      }
+      Optional<ServerName> serverToProcess =
+          clusterCompactionQueues.getLargestQueueFromServersNotCompacting();
+      if (serverToProcess.isPresent() && clusterCompactionQueues.hasWorkItems()) {
+        ServerName serverName = serverToProcess.get();
+        // check to see if the region has moved... if so we have to enqueue it again with
+        // the proper serverName
+        MajorCompactionRequest request = clusterCompactionQueues.reserveForCompaction(serverName);
+
+        ServerName currentServer = connection.getRegionLocator(tableName)
+            .getRegionLocation(request.getRegion().getStartKey()).getServerName();
+
+        if (!currentServer.equals(serverName)) {
+          // add it back to the queue with the correct server; it will be picked up later
+          LOG.info("Server changed for region: " + request.getRegion().getEncodedName() + " from: "
+              + serverName + " to: " + currentServer + " re-queuing request");
+          clusterCompactionQueues.addToCompactionQueue(currentServer, request);
+          clusterCompactionQueues.releaseCompaction(serverName);
+        } else {
+          LOG.info("Firing off compaction request for server: " + serverName + ", " + request
+              + " total queue size left: " + clusterCompactionQueues
+              .getCompactionRequestsLeftToFinish());
+          futures.add(executor.submit(new Compact(serverName, request)));
+        }
+      } else {
+        // haven't assigned anything, so we sleep
+        Thread.sleep(sleepForMs);
+      }
+    }
+    LOG.info("All compactions have completed");
+  }
+
+  private boolean futuresComplete(List<Future<?>> futures) {
+    futures.removeIf(Future::isDone);
+    return futures.isEmpty();
+  }
+
+  public void shutdown() throws Exception {
+    executor.shutdown();
+    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+    if (!ERRORS.isEmpty()) {
+      StringBuilder builder =
+          new StringBuilder().append("Major compaction failed, there were: ").append(ERRORS.size())
+              .append(" regions / stores that failed compacting\n")
+              .append("Failed compaction requests\n").append("--------------------------\n")
+              .append(Joiner.on("\n").join(ERRORS));
+      LOG.error(builder.toString());
+    }
+    if (connection != null) {
+      connection.close();
+    }
+    LOG.info("All regions major compacted successfully");
+  }
+
+  @VisibleForTesting void initializeWorkQueues() throws IOException {
+    if (storesToCompact.isEmpty()) {
+      connection.getTable(tableName).getDescriptor().getColumnFamilyNames()
+          .forEach(a -> storesToCompact.add(Bytes.toString(a)));
+      LOG.info("No family specified, will execute for all families");
+    }
+    LOG.info(
+        "Initializing compaction queues for table: " + tableName + " with cf: " + storesToCompact);
+    List<HRegionLocation> regionLocations =
+        connection.getRegionLocator(tableName).getAllRegionLocations();
+    for (HRegionLocation location : regionLocations) {
+      Optional<MajorCompactionRequest> request = MajorCompactionRequest
+          .newRequest(connection.getConfiguration(), location.getRegion(), storesToCompact,
+              timestamp);
+      request.ifPresent(majorCompactionRequest -> clusterCompactionQueues
+          .addToCompactionQueue(location.getServerName(), majorCompactionRequest));
+    }
+  }
+
+  class Compact implements Runnable {
+
+    private final ServerName serverName;
+    private final MajorCompactionRequest request;
+
+    Compact(ServerName serverName, MajorCompactionRequest request) {
+      this.serverName = serverName;
+      this.request = request;
+    }
+
+    @Override public void run() {
+      try {
+        compactAndWait(request);
+      } catch (NotServingRegionException e) {
+        // this region has split or merged
+        LOG.warn("Region is invalid, requesting updated regions", e);
+        // let's update the cluster compaction queues with these newly created regions
+        addNewRegions();
+      } catch (Exception e) {
+        LOG.warn("Error compacting:", e);
+      } finally {
+        clusterCompactionQueues.releaseCompaction(serverName);
+      }
+    }
+
+    void compactAndWait(MajorCompactionRequest request) throws Exception {
+      Admin admin = connection.getAdmin();
+      try {
+        // only make the request if the region is not already major compacting
+        if (!isCompacting(request)) {
+          Set<String> stores = request.getStoresRequiringCompaction(storesToCompact);
+          if (!stores.isEmpty()) {
+            request.setStores(stores);
+            for (String store : request.getStores()) {
+              admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
+                  Bytes.toBytes(store));
+            }
+          }
+        }
+        while (isCompacting(request)) {
+          Thread.sleep(sleepForMs);
+          LOG.debug("Waiting for compaction to complete for region: " + request.getRegion()
+              .getEncodedName());
+        }
+      } finally {
+        // Make sure to wait for the CompactedFileDischarger chore to do its work
+        int waitForArchive = connection.getConfiguration()
+            .getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
+        Thread.sleep(waitForArchive);
+        // check if compaction completed successfully, otherwise put that request back in the
+        // proper queue
+        Set<String> storesRequiringCompaction =
+            request.getStoresRequiringCompaction(storesToCompact);
+        if (!storesRequiringCompaction.isEmpty()) {
+          // this happens when a region server is marked as dead and flushes a store file:
+          // the new regionserver doesn't pick the file up because it is accounted for in the
+          // WAL replay, so there are more store files on the filesystem than the regionserver
+          // knows about
+          boolean regionHasNotMoved = connection.getRegionLocator(tableName)
+              .getRegionLocation(request.getRegion().getStartKey()).getServerName()
+              .equals(serverName);
+          if (regionHasNotMoved) {
+            LOG.error(
+                "Not all store files were compacted, this may be due to the regionserver not "
+                    + "being aware of all store files. Will not reattempt compacting, " + request);
+            ERRORS.add(request);
+          } else {
+            request.setStores(storesRequiringCompaction);
+            clusterCompactionQueues.addToCompactionQueue(serverName, request);
+            LOG.info("Compaction failed for the following stores: " + storesRequiringCompaction
+                + " region: " + request.getRegion().getEncodedName());
+          }
+        } else {
+          LOG.info("Compaction complete for region: " + request.getRegion().getEncodedName()
+              + " -> cf(s): " + request.getStores());
+        }
+      }
+    }
+  }
+
+  private boolean isCompacting(MajorCompactionRequest request) throws Exception {
+    CompactionState compactionState = connection.getAdmin()
+        .getCompactionStateForRegion(request.getRegion().getEncodedNameAsBytes());
+    return compactionState.equals(CompactionState.MAJOR) || compactionState
+        .equals(CompactionState.MAJOR_AND_MINOR);
+  }
+
+  private void addNewRegions() {
+    try {
+      List<HRegionLocation> locations =
+          connection.getRegionLocator(tableName).getAllRegionLocations();
+      for (HRegionLocation location : locations) {
+        if (location.getRegion().getRegionId() > timestamp) {
+          Optional<MajorCompactionRequest> compactionRequest = MajorCompactionRequest
+              .newRequest(connection.getConfiguration(), location.getRegion(), storesToCompact,
+                  timestamp);
+          compactionRequest.ifPresent(request -> clusterCompactionQueues
+              .addToCompactionQueue(location.getServerName(), request));
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Options options = new Options();
+    options.addOption(
+        Option.builder("table")
+            .required()
+            .desc("table name")
+            .hasArg()
+            .build()
+    );
+    options.addOption(
+        Option.builder("cf")
+            .optionalArg(true)
+            .desc("column families: comma separated eg: a,b,c")
+            .hasArg()
+            .build()
+    );
+    options.addOption(
+        Option.builder("servers")
+            .required()
+            .desc("Concurrent servers compacting")
+            .hasArg()
+            .build()
+    );
+ desc("Compact if store files have modification time < minModTime") + .hasArg() + .build() + ); + options.addOption( + Option.builder("zk") + .optionalArg(true) + .desc("zk quorum") + .hasArg() + .build() + ); + options.addOption( + Option.builder("rootDir") + .optionalArg(true) + .desc("hbase.rootDir") + .hasArg() + .build() + ); + options.addOption( + Option.builder("sleep") + .desc("Time to sleepForMs (ms) for checking compaction status per region and available " + + "work queues: default 30s") + .hasArg() + .build() + ); + options.addOption( + Option.builder("retries") + .desc("Max # of retries for a compaction request," + " defaults to 3") + .hasArg() + .build() + ); + options.addOption( + Option.builder("dryRun") + .desc("Dry run, will just output a list of regions that require compaction based on " + + "parameters passed") + .hasArg(false) + .build() + ); + + final CommandLineParser cmdLineParser = new DefaultParser(); + CommandLine commandLine = null; + try { + commandLine = cmdLineParser.parse(options, args); + } catch (ParseException parseException) { + System.out.println( + "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: " + + parseException); + printUsage(options); + + } + String tableName = commandLine.getOptionValue("table"); + String cf = commandLine.getOptionValue("cf", null); + Set families = Sets.newHashSet(); + if (cf != null) { + Iterables.addAll(families, Splitter.on(",").split(cf)); + } + + + Configuration configuration = HBaseConfiguration.create(); + int concurrency = Integer.parseInt(commandLine.getOptionValue("servers")); + long minModTime = Long.parseLong( + commandLine.getOptionValue("minModTime", String.valueOf(System.currentTimeMillis()))); + String quorum = + commandLine.getOptionValue("zk", configuration.get(HConstants.ZOOKEEPER_QUORUM)); + String rootDir = commandLine.getOptionValue("rootDir", configuration.get(HConstants.HBASE_DIR)); + long sleep = Long.valueOf(commandLine.getOptionValue("sleep", Long.toString(30000))); + + configuration.set(HConstants.HBASE_DIR, rootDir); + configuration.set(HConstants.ZOOKEEPER_QUORUM, quorum); + + MajorCompactor compactor = + new MajorCompactor(configuration, TableName.valueOf(tableName), families, concurrency, + minModTime, sleep); + + compactor.initializeWorkQueues(); + if (!commandLine.hasOption("dryRun")) { + compactor.compactAllRegions(); + } + compactor.shutdown(); + } + + private static void printUsage(final Options options) { + String header = "\nUsage instructions\n\n"; + String footer = "\n"; + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(MajorCompactor.class.getSimpleName(), header, options, footer, true); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3b627a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java new file mode 100644 index 0000000..c5ce4e3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3b627a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java
new file mode 100644
index 0000000..c5ce4e3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@Category({SmallTests.class})
+public class MajorCompactionRequestTest {
+
+  private static final HBaseTestingUtility UTILITY = new HBaseTestingUtility();
+  private static final String FAMILY = "a";
+  private Path rootRegionDir;
+  private Path regionStoreDir;
+
+  @Before public void setUp() throws Exception {
+    rootRegionDir = UTILITY.getDataTestDirOnTestFS("MajorCompactionRequestTest");
+    regionStoreDir = new Path(rootRegionDir, FAMILY);
+  }
+
+  @Test public void testStoresNeedingCompaction() throws Exception {
+    // store files older than timestamp
+    List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 5, 10);
+    MajorCompactionRequest request = makeMockRequest(100, storeFiles, false);
+    Optional<MajorCompactionRequest> result =
+        request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
+    assertTrue(result.isPresent());
+
+    // store files newer than timestamp
+    storeFiles = mockStoreFiles(regionStoreDir, 5, 101);
+    request = makeMockRequest(100, storeFiles, false);
+    result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
+    assertFalse(result.isPresent());
+  }
+
+  @Test public void testIfWeHaveNewReferenceFilesButOldStoreFiles() throws Exception {
+    // this tests that reference files that are new, but which point at files with older
+    // timestamps, still get compacted
+    TableName table = TableName.valueOf("MajorCompactorTest");
+    TableDescriptor htd = UTILITY.createTableDescriptor(table, Bytes.toBytes(FAMILY));
+    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
+    HRegion region =
+        HBaseTestingUtility.createRegionAndWAL(hri, rootRegionDir, UTILITY.getConfiguration(),
+            htd);
+
+    Configuration configuration = mock(Configuration.class);
+    // the reference file timestamp is newer
+    List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 4, 101);
+    List<Path> paths =
+        storeFiles.stream().map(StoreFileInfo::getPath).collect(Collectors.toList());
+    // the files that are referenced are older, thus we still compact
+    HRegionFileSystem fileSystem =
+        mockFileSystem(region.getRegionInfo(), true, storeFiles, 50);
+    MajorCompactionRequest majorCompactionRequest = spy(new MajorCompactionRequest(configuration,
+        region.getRegionInfo(), Sets.newHashSet(FAMILY), 100));
+    doReturn(mock(Connection.class)).when(majorCompactionRequest).getConnection(eq(configuration));
+    doReturn(paths).when(majorCompactionRequest).getReferenceFilePaths(any(FileSystem.class),
+        any(Path.class));
+    doReturn(fileSystem).when(majorCompactionRequest).getFileSystem(any(Connection.class));
+    Set<String> result =
+        majorCompactionRequest.getStoresRequiringCompaction(Sets.newHashSet("a"));
+    assertEquals(FAMILY, Iterables.getOnlyElement(result));
+  }
+
+  private HRegionFileSystem mockFileSystem(RegionInfo info, boolean hasReferenceFiles,
+      List<StoreFileInfo> storeFiles) throws IOException {
+    long timestamp = storeFiles.stream().findFirst().get().getModificationTime();
+    return mockFileSystem(info, hasReferenceFiles, storeFiles, timestamp);
+  }
+
+  private HRegionFileSystem mockFileSystem(RegionInfo info, boolean hasReferenceFiles,
+      List<StoreFileInfo> storeFiles, long referenceFileTimestamp) throws IOException {
+    FileSystem fileSystem = mock(FileSystem.class);
+    if (hasReferenceFiles) {
+      FileStatus fileStatus = mock(FileStatus.class);
+      doReturn(referenceFileTimestamp).when(fileStatus).getModificationTime();
+      doReturn(fileStatus).when(fileSystem).getFileLinkStatus(isA(Path.class));
+    }
+    HRegionFileSystem mockSystem = mock(HRegionFileSystem.class);
+    doReturn(info).when(mockSystem).getRegionInfo();
+    doReturn(regionStoreDir).when(mockSystem).getStoreDir(FAMILY);
+    doReturn(hasReferenceFiles).when(mockSystem).hasReferences(anyString());
+    doReturn(storeFiles).when(mockSystem).getStoreFiles(anyString());
+    doReturn(fileSystem).when(mockSystem).getFileSystem();
+    return mockSystem;
+  }
+
+  private List<StoreFileInfo> mockStoreFiles(Path regionStoreDir, int howMany, long timestamp)
+      throws IOException {
+    List<StoreFileInfo> infos = Lists.newArrayList();
+    int i = 0;
+    while (i < howMany) {
+      StoreFileInfo storeFileInfo = mock(StoreFileInfo.class);
+      doReturn(timestamp).doReturn(timestamp).when(storeFileInfo).getModificationTime();
+      doReturn(new Path(regionStoreDir, RandomStringUtils.randomAlphabetic(10)))
+          .when(storeFileInfo).getPath();
+      infos.add(storeFileInfo);
+      i++;
+    }
+    return infos;
+  }
+
+  private MajorCompactionRequest makeMockRequest(long timestamp, List<StoreFileInfo> storeFiles,
+      boolean references) throws IOException {
+    Configuration configuration = mock(Configuration.class);
+    RegionInfo regionInfo = mock(RegionInfo.class);
+    when(regionInfo.getEncodedName()).thenReturn("HBase");
+    when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
+    MajorCompactionRequest request =
+        new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a"), timestamp);
+    MajorCompactionRequest spy = spy(request);
+    HRegionFileSystem fileSystem = mockFileSystem(regionInfo, references, storeFiles);
+    doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class));
+    doReturn(mock(Connection.class)).when(spy).getConnection(eq(configuration));
+    return spy;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3b627a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTest.java
new file mode 100644
index 0000000..3fb37ec
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.compaction;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+@Category({ MiscTests.class, MediumTests.class })
+public class MajorCompactorTest {
+
+  public static final byte[] FAMILY = Bytes.toBytes("a");
+  private HBaseTestingUtility utility;
+
+  @Before public void setUp() throws Exception {
+    utility = new HBaseTestingUtility();
+    utility.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 10);
+    utility.startMiniCluster();
+  }
+
+  @After public void tearDown() throws Exception {
+    utility.shutdownMiniCluster();
+  }
+
+  @Test public void testCompactingATable() throws Exception {
+    TableName tableName = TableName.valueOf("MajorCompactorTest");
+    utility.createMultiRegionTable(tableName, FAMILY, 5);
+    utility.waitTableAvailable(tableName);
+    Connection connection = utility.getConnection();
+    Table table = connection.getTable(tableName);
+    // write data and flush multiple store files:
+    for (int i = 0; i < 5; i++) {
+      utility.loadRandomRows(table, FAMILY, 50, 100);
+      utility.flush(tableName);
+    }
+    table.close();
+    int numberOfRegions = utility.getAdmin().getRegions(tableName).size();
+    int numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    // before the major compaction the table should have more store files than regions
+    assertTrue(numberOfRegions < numHFiles);
+
+    MajorCompactor compactor =
+        new MajorCompactor(utility.getConfiguration(), tableName,
+            Sets.newHashSet(Bytes.toString(FAMILY)), 1, System.currentTimeMillis(), 200);
+    compactor.initializeWorkQueues();
+    compactor.compactAllRegions();
+    compactor.shutdown();
+
+    // verify that the store has been completely major compacted: one store file per region
+    numberOfRegions = utility.getAdmin().getRegions(tableName).size();
+    numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    assertEquals(numHFiles, numberOfRegions);
+  }
+}
\ No newline at end of file