Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2A1591958B for ; Mon, 11 Apr 2016 16:10:29 +0000 (UTC) Received: (qmail 80503 invoked by uid 500); 11 Apr 2016 16:10:27 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 80395 invoked by uid 500); 11 Apr 2016 16:10:27 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 79099 invoked by uid 99); 11 Apr 2016 16:10:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Apr 2016 16:10:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 77692EAB47; Mon, 11 Apr 2016 16:10:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: syuanjiang@apache.org To: commits@hbase.apache.org Date: Mon, 11 Apr 2016 16:10:39 -0000 Message-Id: <55f720a8335043c38d3f1a9fb3e0e8e6@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [14/24] hbase git commit: HBASE-15400 Use DateTieredCompactor for Date Tiered Compaction (Clara Xiong) HBASE-15400 Use DateTieredCompactor for Date Tiered Compaction (Clara Xiong) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f60fc9d1 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f60fc9d1 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f60fc9d1 Branch: refs/heads/hbase-12439 Commit: f60fc9d1a0625970aa2fd14d29e4c1266f9571b7 Parents: d393603 Author: tedyu Authored: Thu Apr 7 14:58:59 2016 -0700 Committer: tedyu Committed: Thu Apr 7 14:58:59 2016 -0700 ---------------------------------------------------------------------- .../regionserver/DateTieredStoreEngine.java | 102 ++++++ .../hadoop/hbase/regionserver/HStore.java | 2 +- .../hadoop/hbase/regionserver/StoreFile.java | 34 +- .../compactions/CompactionConfiguration.java | 10 + .../compactions/CompactionPolicy.java | 2 +- .../compactions/CompactionRequest.java | 16 +- .../compactions/DateTieredCompactionPolicy.java | 358 +++++++++++++------ .../DateTieredCompactionRequest.java | 44 +++ .../compactions/ExploringCompactionPolicy.java | 4 +- .../compactions/FIFOCompactionPolicy.java | 5 +- .../compactions/RatioBasedCompactionPolicy.java | 318 ++++------------ .../compactions/SortedCompactionPolicy.java | 239 +++++++++++++ .../compactions/StripeCompactionPolicy.java | 3 +- .../hbase/regionserver/MockStoreFile.java | 40 ++- .../TestDateTieredCompactionPolicy.java | 325 +++++++++++++++++ .../compactions/EverythingPolicy.java | 2 +- 16 files changed, 1102 insertions(+), 402 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java new file mode 100644 index 0000000..773baab --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy; +import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; +import org.apache.hadoop.hbase.security.User; + +/** + * HBASE-15400 This store engine allows us to store data in date tiered layout with exponential + * sizing so that the more recent data has more granularity. Time-range scan will perform the + * best with most recent data. When data reach maxAge, they are compacted in fixed-size time + * windows for TTL and archiving. Please refer to design spec for more details. + * https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/edit#heading=h.uk6y5pd3oqgx + */ +@InterfaceAudience.Private +public class DateTieredStoreEngine extends StoreEngine { + @Override + public boolean needsCompaction(List filesCompacting) { + return compactionPolicy.needsCompaction(storeFileManager.getStorefiles(), + filesCompacting); + } + + @Override + public CompactionContext createCompaction() throws IOException { + return new DateTieredCompactionContext(); + } + + @Override + protected void createComponents(Configuration conf, Store store, CellComparator kvComparator) + throws IOException { + this.compactionPolicy = new DateTieredCompactionPolicy(conf, store); + this.storeFileManager = new DefaultStoreFileManager(kvComparator, conf, + compactionPolicy.getConf()); + this.storeFlusher = new DefaultStoreFlusher(conf, store); + this.compactor = new DateTieredCompactor(conf, store); + } + + private final class DateTieredCompactionContext extends CompactionContext { + + @Override + public List preSelect(List filesCompacting) { + return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStorefiles(), + filesCompacting); + } + + @Override + public boolean select(List filesCompacting, boolean isUserCompaction, + boolean mayUseOffPeak, boolean forceMajor) throws IOException { + request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(), filesCompacting, + isUserCompaction, mayUseOffPeak, forceMajor); + return request != null; + } + + @Override + public void forceSelect(CompactionRequest request) { + if (!(request instanceof DateTieredCompactionRequest)) { + throw new IllegalArgumentException("DateTieredCompactionRequest is expected. Actual: " + + request.getClass().getCanonicalName()); + } + super.forceSelect(request); + } + + public List compact(ThroughputController throughputController, User user) + throws IOException { + if (request instanceof DateTieredCompactionRequest) { + return compactor.compact(request, ((DateTieredCompactionRequest) request).getBoundaries(), + throughputController, user); + } else { + throw new IllegalArgumentException("DateTieredCompactionRequest is expected. Actual: " + + request.getClass().getCanonicalName()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 7321028..9524c5b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1521,7 +1521,7 @@ public class HStore implements Store { return false; } } - return storeEngine.getCompactionPolicy().isMajorCompaction( + return storeEngine.getCompactionPolicy().shouldPerformMajorCompaction( this.storeEngine.getStoreFileManager().getStorefiles()); } http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 868bee0..21ae417 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -18,6 +18,12 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Ordering; + import java.io.DataInput; import java.io.IOException; import java.net.InetSocketAddress; @@ -62,12 +68,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.WritableUtils; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Ordering; - /** * A Store data file. Stores usually have one or more of these files. They * are produced by flushing the memstore to disk. To @@ -384,7 +384,7 @@ public class StoreFile { * is turned off, fall back to BULKLOAD_TIME_KEY. * @return true if this storefile was created by bulk load. */ - boolean isBulkLoadResult() { + public boolean isBulkLoadResult() { boolean bulkLoadedHFile = false; String fileName = this.getPath().getName(); int startPos = fileName.indexOf("SeqId_"); @@ -1777,6 +1777,19 @@ public class StoreFile { Ordering.natural().onResultOf(new GetPathName()) )); + /** + * Comparator for time-aware compaction. SeqId is still the first + * ordering criterion to maintain MVCC. + */ + public static final Comparator SEQ_ID_MAX_TIMESTAMP = + Ordering.compound(ImmutableList.of( + Ordering.natural().onResultOf(new GetSeqId()), + Ordering.natural().onResultOf(new GetMaxTimestamp()), + Ordering.natural().onResultOf(new GetFileSize()).reverse(), + Ordering.natural().onResultOf(new GetBulkTime()), + Ordering.natural().onResultOf(new GetPathName()) + )); + private static class GetSeqId implements Function { @Override public Long apply(StoreFile sf) { @@ -1811,5 +1824,12 @@ public class StoreFile { return sf.getPath().getName(); } } + + private static class GetMaxTimestamp implements Function { + @Override + public Long apply(StoreFile sf) { + return sf.getMaximumTimestamp() == null? (Long)Long.MAX_VALUE : sf.getMaximumTimestamp(); + } + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java index 9bb4c77..97cc404 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java @@ -80,6 +80,8 @@ public class CompactionConfiguration { "hbase.hstore.compaction.date.tiered.incoming.window.min"; public static final String COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY = "hbase.hstore.compaction.date.tiered.window.policy.class"; + public static final String SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY = + "hbase.hstore.compaction.date.tiered.single.output.for.minor.compaction"; private static final Class DEFAULT_TIER_COMPACTION_POLICY_CLASS = ExploringCompactionPolicy.class; @@ -105,6 +107,7 @@ public class CompactionConfiguration { private final int windowsPerTier; private final int incomingWindowMin; private final String compactionPolicyForTieredWindow; + private final boolean singleOutputForMinorCompaction; CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) { this.conf = conf; @@ -134,6 +137,9 @@ public class CompactionConfiguration { incomingWindowMin = conf.getInt(INCOMING_WINDOW_MIN_KEY, 6); compactionPolicyForTieredWindow = conf.get(COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY, DEFAULT_TIER_COMPACTION_POLICY_CLASS.getName()); + singleOutputForMinorCompaction = conf.getBoolean(SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, + true); + LOG.info(this); } @@ -274,4 +280,8 @@ public class CompactionConfiguration { public String getCompactionPolicyForTieredWindow() { return compactionPolicyForTieredWindow; } + + public boolean useSingleOutputForMinorCompaction() { + return singleOutputForMinorCompaction; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java index 3b24189..b7a788c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java @@ -45,7 +45,7 @@ public abstract class CompactionPolicy { * @param filesToCompact Files to compact. Can be null. * @return True if we should run a major compaction. */ - public abstract boolean isMajorCompaction( + public abstract boolean shouldPerformMajorCompaction( final Collection filesToCompact) throws IOException; /** http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java index 268bb09..9e7f6a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java @@ -18,6 +18,12 @@ */ package org.apache.hadoop.hbase.regionserver.compactions; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; + import java.util.ArrayList; import java.util.Collection; @@ -31,12 +37,6 @@ import org.apache.hadoop.hbase.regionserver.StoreFile.Reader; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.StringUtils; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.Collections2; - /** * This class holds all logical details necessary to run a compaction. */ @@ -76,6 +76,10 @@ public class CompactionRequest implements Comparable { recalculateSize(); } + public void updateFiles(Collection files) { + this.filesToCompact = files; + } + /** * Called before compaction is executed by CompactSplitThread; for use by coproc subclasses. */ http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java index 9f65e6e..d61af42 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java @@ -1,4 +1,5 @@ /** + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,11 +20,11 @@ package org.apache.hadoop.hbase.regionserver.compactions; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.PeekingIterator; +import com.google.common.math.LongMath; import java.io.IOException; import java.util.ArrayList; @@ -35,9 +36,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -50,14 +55,13 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; * 3. Improve TTL efficiency. * Perfect fit for the use cases that: * 1. has mostly date-based data write and scan and a focus on the most recent data. - * 2. never or rarely deletes data. Out-of-order writes are handled gracefully. Time range - * overlapping among store files is tolerated and the performance impact is minimized. Configuration - * can be set at hbase-site or overriden at per-table or per-column-famly level by hbase shell. - * Design spec is at + * Out-of-order writes are handled gracefully. Time range overlapping among store files is + * tolerated and the performance impact is minimized. Configuration can be set at hbase-site + * or overridden at per-table or per-column-family level by hbase shell. Design spec is at * https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/ */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy { +public class DateTieredCompactionPolicy extends SortedCompactionPolicy { private static final Log LOG = LogFactory.getLog(DateTieredCompactionPolicy.class); private RatioBasedCompactionPolicy compactionPolicyPerWindow; @@ -67,111 +71,112 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy { super(conf, storeConfigInfo); try { compactionPolicyPerWindow = - ReflectionUtils.instantiateWithCustomCtor(comConf.getCompactionPolicyForTieredWindow(), - new Class[] { Configuration.class, StoreConfigInformation.class }, new Object[] { conf, - storeConfigInfo }); + ReflectionUtils.instantiateWithCustomCtor(comConf.getCompactionPolicyForTieredWindow(), + new Class[] { Configuration.class, StoreConfigInformation.class }, new Object[] { conf, + storeConfigInfo }); } catch (Exception e) { throw new IOException("Unable to load configured compaction policy '" + comConf.getCompactionPolicyForTieredWindow() + "'", e); } } - @Override - public boolean isMajorCompaction(Collection filesToCompact) throws IOException { - // Never do major compaction unless forced - return false; - } - - @Override /** - * Heuristics for guessing whether we need compaction. + * Heuristics for guessing whether we need minor compaction. */ - public boolean needsCompaction(final Collection storeFiles, - final List filesCompacting) { - return needsCompaction(storeFiles, filesCompacting, EnvironmentEdgeManager.currentTime()); - } - + @Override @VisibleForTesting public boolean needsCompaction(final Collection storeFiles, - final List filesCompacting, long now) { - if (!super.needsCompaction(storeFiles, filesCompacting)) { - return false; - } - + final List filesCompacting) { ArrayList candidates = new ArrayList(storeFiles); - candidates = filterBulk(candidates); - candidates = skipLargeFiles(candidates, true); try { - candidates = applyCompactionPolicy(candidates, true, false, now); + return selectMinorCompaction(candidates, false, true) != null; } catch (Exception e) { LOG.error("Can not check for compaction: ", e); return false; } - - return candidates != null && candidates.size() >= comConf.getMinFilesToCompact(); } - /** - * Could return null if no candidates are found - */ - @Override - public ArrayList applyCompactionPolicy(ArrayList candidates, - boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { - return applyCompactionPolicy(candidates, mayUseOffPeak, mayBeStuck, - EnvironmentEdgeManager.currentTime()); - } + public boolean shouldPerformMajorCompaction(final Collection filesToCompact) + throws IOException { + long mcTime = getNextMajorCompactTime(filesToCompact); + if (filesToCompact == null || mcTime == 0) { + return false; + } - /** - * Input candidates are sorted from oldest to newest by seqId. Could return null if no candidates - * are found. - */ - @VisibleForTesting - public ArrayList applyCompactionPolicy(ArrayList candidates, - boolean mayUseOffPeak, boolean mayBeStuck, long now) throws IOException { - Iterable candidatesInWindow = - filterOldStoreFiles(Lists.newArrayList(candidates), comConf.getMaxStoreFileAgeMillis(), now); - - List> buckets = - partitionFilesToBuckets(candidatesInWindow, comConf.getBaseWindowMillis(), - comConf.getWindowsPerTier(), now); - LOG.debug("Compaction buckets are: " + buckets); - if (buckets.size() >= storeConfigInfo.getBlockingFileCount()) { - LOG.warn("Number of compaction buckets:" + buckets.size() - + ", exceeds blocking file count setting: " - + storeConfigInfo.getBlockingFileCount() - + ", either increase hbase.hstore.blockingStoreFiles or " - + "reduce the number of tiered compaction windows"); + // TODO: Use better method for determining stamp of last major (HBASE-2990) + long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); + long now = EnvironmentEdgeManager.currentTime(); + if (lowTimestamp <= 0L || lowTimestamp >= (now - mcTime)) { + return false; } - return newestBucket(buckets, comConf.getIncomingWindowMin(), now, - comConf.getBaseWindowMillis(), mayUseOffPeak); - } + long cfTTL = this.storeConfigInfo.getStoreFileTtl(); + HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); + long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now); + List boundaries = getCompactBoundariesForMajor(filesToCompact, oldestToCompact, now); + boolean[] filesInWindow = new boolean[boundaries.size()]; - /** - * @param buckets the list of buckets, sorted from newest to oldest, from which to return the - * newest bucket within thresholds. - * @param incomingWindowThreshold minimum number of storeFiles in a bucket to qualify. - * @param maxThreshold maximum number of storeFiles to compact at once (the returned bucket will - * be trimmed down to this). - * @return a bucket (a list of store files within a window to be compacted). - * @throws IOException error - */ - private ArrayList newestBucket(List> buckets, - int incomingWindowThreshold, long now, long baseWindowMillis, boolean mayUseOffPeak) - throws IOException { - Window incomingWindow = getInitialWindow(now, baseWindowMillis); - for (ArrayList bucket : buckets) { - int minThreshold = - incomingWindow.compareToTimestamp(bucket.get(0).getMaximumTimestamp()) <= 0 ? comConf - .getIncomingWindowMin() : comConf.getMinFilesToCompact(); - compactionPolicyPerWindow.setMinThreshold(minThreshold); - ArrayList candidates = - compactionPolicyPerWindow.applyCompactionPolicy(bucket, mayUseOffPeak, false); - if (candidates != null && !candidates.isEmpty()) { - return candidates; + for (StoreFile file: filesToCompact) { + Long minTimestamp = file.getMinimumTimestamp(); + long oldest = (minTimestamp == null) ? (Long)Long.MIN_VALUE : now - minTimestamp.longValue(); + if (cfTTL != HConstants.FOREVER && oldest >= cfTTL) { + LOG.debug("Major compaction triggered on store " + this + + "; for TTL maintenance"); + return true; } + if (!file.isMajorCompaction() || file.isBulkLoadResult()) { + LOG.debug("Major compaction triggered on store " + this + + ", because there are new files and time since last major compaction " + + (now - lowTimestamp) + "ms"); + return true; + } + + int lowerWindowIndex = Collections.binarySearch(boundaries, + minTimestamp == null ? (Long)Long.MAX_VALUE : minTimestamp); + int upperWindowIndex = Collections.binarySearch(boundaries, + file.getMaximumTimestamp() == null ? (Long)Long.MAX_VALUE : file.getMaximumTimestamp()); + if (lowerWindowIndex != upperWindowIndex) { + LOG.debug("Major compaction triggered on store " + this + "; because file " + + file.getPath() + " has data with timestamps cross window boundaries"); + return true; + } else if (filesInWindow[upperWindowIndex]) { + LOG.debug("Major compaction triggered on store " + this + + "; because there are more than one file in some windows"); + return true; + } else { + filesInWindow[upperWindowIndex] = true; + } + hdfsBlocksDistribution.add(file.getHDFSBlockDistribution()); } - return null; + + float blockLocalityIndex = hdfsBlocksDistribution + .getBlockLocalityIndex(RSRpcServices.getHostname(comConf.conf, false)); + if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) { + LOG.debug("Major compaction triggered on store " + this + + "; to make hdfs blocks local, current blockLocalityIndex is " + + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")"); + return true; + } + + LOG.debug("Skipping major compaction of " + this + + ", because the files are already major compacted"); + return false; + } + + @Override + protected CompactionRequest createCompactionRequest(ArrayList candidateSelection, + boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { + CompactionRequest result = tryingMajor ? selectMajorCompaction(candidateSelection) + : selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck); + LOG.debug("Generated compaction request: " + result); + return result; + } + + public CompactionRequest selectMajorCompaction(ArrayList candidateSelection) { + long now = EnvironmentEdgeManager.currentTime(); + long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now); + return new DateTieredCompactionRequest(candidateSelection, + this.getCompactBoundariesForMajor(candidateSelection, oldestToCompact, now)); } /** @@ -179,63 +184,134 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy { * current file has a maxTimestamp older than last known maximum, treat this file as it carries * the last known maximum. This way both seqId and timestamp are in the same order. If files carry * the same maxTimestamps, they are ordered by seqId. We then reverse the list so they are ordered - * by seqId and maxTimestamp in decending order and build the time windows. All the out-of-order + * by seqId and maxTimestamp in descending order and build the time windows. All the out-of-order * data into the same compaction windows, guaranteeing contiguous compaction based on sequence id. */ - private static List> partitionFilesToBuckets(Iterable storeFiles, - long baseWindowSizeMillis, int windowsPerTier, long now) { - List> buckets = Lists.newArrayList(); - Window window = getInitialWindow(now, baseWindowSizeMillis); + public CompactionRequest selectMinorCompaction(ArrayList candidateSelection, + boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { + long now = EnvironmentEdgeManager.currentTime(); + long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now); + + // Make sure the store files is sorted by SeqId then maxTimestamp + List storeFileList = Lists.newArrayList(filterOldStoreFiles(candidateSelection, + oldestToCompact)); + Collections.sort(storeFileList, StoreFile.Comparators.SEQ_ID_MAX_TIMESTAMP); List> storefileMaxTimestampPairs = - Lists.newArrayListWithCapacity(Iterables.size(storeFiles)); + Lists.newArrayListWithCapacity(Iterables.size(storeFileList)); long maxTimestampSeen = Long.MIN_VALUE; - for (StoreFile storeFile : storeFiles) { + for (StoreFile storeFile : storeFileList) { // if there is out-of-order data, // we put them in the same window as the last file in increasing order - maxTimestampSeen = Math.max(maxTimestampSeen, storeFile.getMaximumTimestamp()); + maxTimestampSeen = Math.max(maxTimestampSeen, + storeFile.getMaximumTimestamp() == null? Long.MIN_VALUE : storeFile.getMaximumTimestamp()); storefileMaxTimestampPairs.add(new Pair(storeFile, maxTimestampSeen)); } - Collections.reverse(storefileMaxTimestampPairs); + + Window window = getIncomingWindow(now, comConf.getBaseWindowMillis()); + int minThreshold = comConf.getIncomingWindowMin(); PeekingIterator> it = Iterators.peekingIterator(storefileMaxTimestampPairs.iterator()); - while (it.hasNext()) { int compResult = window.compareToTimestamp(it.peek().getSecond()); if (compResult > 0) { // If the file is too old for the window, switch to the next window - window = window.nextWindow(windowsPerTier); + window = window.nextWindow(comConf.getWindowsPerTier(), + oldestToCompact); + minThreshold = comConf.getMinFilesToCompact(); } else { // The file is within the target window - ArrayList bucket = Lists.newArrayList(); - // Add all files in the same window to current bucket. For incoming window + ArrayList fileList = Lists.newArrayList(); + // Add all files in the same window. For incoming window // we tolerate files with future data although it is sub-optimal while (it.hasNext() && window.compareToTimestamp(it.peek().getSecond()) <= 0) { - bucket.add(it.next().getFirst()); + fileList.add(it.next().getFirst()); } - if (!bucket.isEmpty()) { - buckets.add(bucket); + if (fileList.size() >= minThreshold) { + LOG.debug("Processing files: " + fileList + " for window: " + window); + DateTieredCompactionRequest request = generateCompactionRequest(fileList, window, + mayUseOffPeak, mayBeStuck, minThreshold); + if (request != null) { + return request; + } } } } + // A non-null file list is expected by HStore + return new CompactionRequest(Collections. emptyList()); + } + + private DateTieredCompactionRequest generateCompactionRequest(ArrayList storeFiles, + Window window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold) + throws IOException { + // The files has to be in ascending order for ratio-based compaction to work right + // and removeExcessFile to exclude youngest files. + Collections.reverse(storeFiles); + + // Compact everything in the window if have more files than comConf.maxBlockingFiles + compactionPolicyPerWindow.setMinThreshold(minThreshold); + ArrayList storeFileSelection = mayBeStuck ? storeFiles + : compactionPolicyPerWindow.applyCompactionPolicy(storeFiles, mayUseOffPeak, false); + if (storeFileSelection != null && !storeFileSelection.isEmpty()) { + // If there is any file in the window excluded from compaction, + // only one file will be output from compaction. + boolean singleOutput = storeFiles.size() != storeFileSelection.size() || + comConf.useSingleOutputForMinorCompaction(); + List boundaries = getCompactionBoundariesForMinor(window, singleOutput); + DateTieredCompactionRequest result = new DateTieredCompactionRequest(storeFileSelection, + boundaries); + return result; + } + return null; + } + + /** + * Return a list of boundaries for multiple compaction output + * in ascending order. + */ + private List getCompactBoundariesForMajor(Collection filesToCompact, + long oldestToCompact, long now) { + long minTimestamp = Long.MAX_VALUE; + for (StoreFile file : filesToCompact) { + minTimestamp = Math.min(minTimestamp, + file.getMinimumTimestamp() == null? Long.MAX_VALUE : file.getMinimumTimestamp()); + } - return buckets; + List boundaries = new ArrayList(); + + // Add startMillis of all windows between now and min timestamp + for (Window window = getIncomingWindow(now, comConf.getBaseWindowMillis()); + window.compareToTimestamp(minTimestamp) > 0; + window = window.nextWindow(comConf.getWindowsPerTier(), oldestToCompact)) { + boundaries.add(window.startMillis()); + } + boundaries.add(Long.MIN_VALUE); + Collections.reverse(boundaries); + return boundaries; + } + + /** + * @return a list of boundaries for multiple compaction output + * from minTimestamp to maxTimestamp. + */ + private static List getCompactionBoundariesForMinor(Window window, boolean singleOutput) { + List boundaries = new ArrayList(); + boundaries.add(Long.MIN_VALUE); + if (!singleOutput) { + boundaries.add(window.startMillis()); + } + return boundaries; } /** * Removes all store files with max timestamp older than (current - maxAge). * @param storeFiles all store files to consider * @param maxAge the age in milliseconds when a store file stops participating in compaction. - * @param now current time. store files with max timestamp less than (now - maxAge) are filtered. * @return a list of storeFiles with the store file older than maxAge excluded */ - private static Iterable filterOldStoreFiles(List storeFiles, long maxAge, - long now) { - if (maxAge == 0) { - return ImmutableList.of(); - } - final long cutoff = now - maxAge; + private static Iterable filterOldStoreFiles(List storeFiles, + final long cutoff) { return Iterables.filter(storeFiles, new Predicate() { @Override public boolean apply(StoreFile storeFile) { @@ -243,13 +319,24 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy { if (storeFile == null) { return false; } - return storeFile.getMaximumTimestamp() >= cutoff; + Long maxTimestamp = storeFile.getMaximumTimestamp(); + return maxTimestamp == null ? true : maxTimestamp >= cutoff; } }); } - private static Window getInitialWindow(long now, long timeUnit) { - return new Window(timeUnit, now / timeUnit); + private static Window getIncomingWindow(long now, long baseWindowMillis) { + return new Window(baseWindowMillis, now / baseWindowMillis); + } + + private static long getOldestToCompact(long maxAgeMillis, long now) { + try { + return LongMath.checkedSubtract(now, maxAgeMillis); + } catch (ArithmeticException ae) { + LOG.warn("Value for " + CompactionConfiguration.MAX_AGE_MILLIS_KEY + ": " + maxAgeMillis + + ". All the files will be eligible for minor compaction."); + return Long.MIN_VALUE; + } } /** @@ -268,7 +355,7 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy { private final long divPosition; private Window(long baseWindowMillis, long divPosition) { - this.windowMillis = baseWindowMillis; + windowMillis = baseWindowMillis; this.divPosition = divPosition; } @@ -279,6 +366,13 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy { * or after than the timestamp. */ public int compareToTimestamp(long timestamp) { + if (timestamp < 0) { + try { + timestamp = LongMath.checkedSubtract(timestamp, windowMillis - 1); + } catch (ArithmeticException ae) { + timestamp = Long.MIN_VALUE; + } + } long pos = timestamp / windowMillis; return divPosition == pos ? 0 : divPosition < pos ? -1 : 1; } @@ -290,12 +384,42 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy { * following those will be tierBase times as big. * @return The next window */ - public Window nextWindow(int windowsPerTier) { - if (divPosition % windowsPerTier > 0) { + public Window nextWindow(int windowsPerTier, long oldestToCompact) { + // Don't promote to the next tier if there is not even 1 window at current tier + // or if the next window crosses the max age. + if (divPosition % windowsPerTier > 0 || + startMillis() - windowMillis * windowsPerTier < oldestToCompact) { return new Window(windowMillis, divPosition - 1); } else { return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1); } } + + /** + * Inclusive lower bound + */ + public long startMillis() { + try { + return LongMath.checkedMultiply(windowMillis, divPosition); + } catch (ArithmeticException ae) { + return Long.MIN_VALUE; + } + } + + /** + * Exclusive upper bound + */ + public long endMillis() { + try { + return LongMath.checkedMultiply(windowMillis, (divPosition + 1)); + } catch (ArithmeticException ae) { + return Long.MAX_VALUE; + } + } + + @Override + public String toString() { + return "[" + startMillis() + ", " + endMillis() + ")"; + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java new file mode 100644 index 0000000..b33663f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import org.apache.hadoop.hbase.regionserver.StoreFile; + +@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_DOESNT_OVERRIDE_EQUALS", + justification="It is intended to use the same equal method as superclass") +public class DateTieredCompactionRequest extends CompactionRequest { + private List boundaries; + + public DateTieredCompactionRequest(Collection files, List boundaryList) { + super(files); + boundaries = boundaryList; + } + + public List getBoundaries() { + return boundaries; + } + + @Override + public String toString() { + return super.toString() + " boundaries=" + Arrays.toString(boundaries.toArray()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java index c9d911d..f0cb5d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java @@ -25,8 +25,8 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; import org.apache.hadoop.hbase.regionserver.StoreFile; @@ -51,7 +51,7 @@ public class ExploringCompactionPolicy extends RatioBasedCompactionPolicy { } @Override - final ArrayList applyCompactionPolicy(final ArrayList candidates, + protected final ArrayList applyCompactionPolicy(final ArrayList candidates, final boolean mayUseOffPeak, final boolean mightBeStuck) throws IOException { return new ArrayList(applyCompactionPolicy(candidates, mightBeStuck, mayUseOffPeak, comConf.getMinFilesToCompact(), comConf.getMaxFilesToCompact())); http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java index eace81f..d339898 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java @@ -76,11 +76,12 @@ public class FIFOCompactionPolicy extends ExploringCompactionPolicy { } @Override - public boolean isMajorCompaction(Collection filesToCompact) throws IOException { + public boolean shouldPerformMajorCompaction(Collection filesToCompact) + throws IOException { boolean isAfterSplit = StoreUtils.hasReferences(filesToCompact); if(isAfterSplit){ LOG.info("Split detected, delegate to the parent policy."); - return super.isMajorCompaction(filesToCompact); + return super.shouldPerformMajorCompaction(filesToCompact); } return false; } http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java index 4533a9c..5600a4e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java @@ -16,14 +16,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,17 +33,13 @@ import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreUtils; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.Collections2; - /** * The default algorithm for selecting files for compaction. * Combines the compaction configuration and the provisional file selection that * it's given to produce the list of suitable candidates for compaction. */ @InterfaceAudience.Private -public class RatioBasedCompactionPolicy extends CompactionPolicy { +public class RatioBasedCompactionPolicy extends SortedCompactionPolicy { private static final Log LOG = LogFactory.getLog(RatioBasedCompactionPolicy.class); public RatioBasedCompactionPolicy(Configuration conf, @@ -53,154 +47,73 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy { super(conf, storeConfigInfo); } - private ArrayList getCurrentEligibleFiles( - ArrayList candidateFiles, final List filesCompacting) { - // candidates = all storefiles not already in compaction queue - if (!filesCompacting.isEmpty()) { - // exclude all files older than the newest file we're currently - // compacting. this allows us to preserve contiguity (HBASE-2856) - StoreFile last = filesCompacting.get(filesCompacting.size() - 1); - int idx = candidateFiles.indexOf(last); - Preconditions.checkArgument(idx != -1); - candidateFiles.subList(0, idx + 1).clear(); - } - return candidateFiles; - } - - public List preSelectCompactionForCoprocessor( - final Collection candidates, final List filesCompacting) { - return getCurrentEligibleFiles(new ArrayList(candidates), filesCompacting); - } - - /** - * @param candidateFiles candidate files, ordered from oldest to newest by seqId. We rely on - * DefaultStoreFileManager to sort the files by seqId to guarantee contiguous compaction based - * on seqId for data consistency. - * @return subset copy of candidate list that meets compaction criteria - * @throws java.io.IOException - */ - public CompactionRequest selectCompaction(Collection candidateFiles, - final List filesCompacting, final boolean isUserCompaction, - final boolean mayUseOffPeak, final boolean forceMajor) throws IOException { - // Preliminary compaction subject to filters - ArrayList candidateSelection = new ArrayList(candidateFiles); - // Stuck and not compacting enough (estimate). It is not guaranteed that we will be - // able to compact more if stuck and compacting, because ratio policy excludes some - // non-compacting files from consideration during compaction (see getCurrentEligibleFiles). - int futureFiles = filesCompacting.isEmpty() ? 0 : 1; - boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles) - >= storeConfigInfo.getBlockingFileCount(); - candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting); - LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " + - filesCompacting.size() + " compacting, " + candidateSelection.size() + - " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking"); - - // If we can't have all files, we cannot do major anyway - boolean isAllFiles = candidateFiles.size() == candidateSelection.size(); - if (!(forceMajor && isAllFiles)) { - candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak); - isAllFiles = candidateFiles.size() == candidateSelection.size(); - } - - // Try a major compaction if this is a user-requested major compaction, - // or if we do not have too many files to compact and this was requested as a major compaction - boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction) - || (((forceMajor && isAllFiles) || isMajorCompaction(candidateSelection)) - && (candidateSelection.size() < comConf.getMaxFilesToCompact())); - // Or, if there are any references among the candidates. - boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection); - if (!isTryingMajor && !isAfterSplit) { - // We're are not compacting all files, let's see what files are applicable - candidateSelection = filterBulk(candidateSelection); - candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck); - candidateSelection = checkMinFilesCriteria(candidateSelection); - } - candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, isTryingMajor); - // Now we have the final file list, so we can determine if we can do major/all files. - isAllFiles = (candidateFiles.size() == candidateSelection.size()); - CompactionRequest result = new CompactionRequest(candidateSelection); - result.setOffPeak(!candidateSelection.isEmpty() && !isAllFiles && mayUseOffPeak); - result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles); - return result; - } - - /** - * @param candidates pre-filtrate - * @return filtered subset - * exclude all files above maxCompactSize - * Also save all references. We MUST compact them + /* + * @param filesToCompact Files to compact. Can be null. + * @return True if we should run a major compaction. */ - protected ArrayList skipLargeFiles(ArrayList candidates, - boolean mayUseOffpeak) { - int pos = 0; - while (pos < candidates.size() && !candidates.get(pos).isReference() - && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) { - ++pos; - } - if (pos > 0) { - LOG.debug("Some files are too large. Excluding " + pos - + " files from compaction candidates"); - candidates.subList(0, pos).clear(); + @Override + public boolean shouldPerformMajorCompaction(final Collection filesToCompact) + throws IOException { + boolean result = false; + long mcTime = getNextMajorCompactTime(filesToCompact); + if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) { + return result; } - return candidates; - } - - /** - * @param candidates pre-filtrate - * @return filtered subset - * exclude all bulk load files if configured - */ - protected ArrayList filterBulk(ArrayList candidates) { - candidates.removeAll(Collections2.filter(candidates, - new Predicate() { - @Override - public boolean apply(StoreFile input) { - return input.excludeFromMinorCompaction(); + // TODO: Use better method for determining stamp of last major (HBASE-2990) + long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); + long now = System.currentTimeMillis(); + if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) { + // Major compaction time has elapsed. + long cfTTL = this.storeConfigInfo.getStoreFileTtl(); + if (filesToCompact.size() == 1) { + // Single file + StoreFile sf = filesToCompact.iterator().next(); + Long minTimestamp = sf.getMinimumTimestamp(); + long oldest = (minTimestamp == null) ? Long.MIN_VALUE : now - minTimestamp.longValue(); + if (sf.isMajorCompaction() && (cfTTL == HConstants.FOREVER || oldest < cfTTL)) { + float blockLocalityIndex = + sf.getHDFSBlockDistribution().getBlockLocalityIndex( + RSRpcServices.getHostname(comConf.conf, false)); + if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) { + LOG.debug("Major compaction triggered on only store " + this + + "; to make hdfs blocks local, current blockLocalityIndex is " + + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")"); + result = true; + } else { + LOG.debug("Skipping major compaction of " + this + + " because one (major) compacted file only, oldestTime " + oldest + + "ms is < TTL=" + cfTTL + " and blockLocalityIndex is " + blockLocalityIndex + + " (min " + comConf.getMinLocalityToForceCompact() + ")"); } - })); - return candidates; - } - - /** - * @param candidates pre-filtrate - * @return filtered subset - * take upto maxFilesToCompact from the start - */ - private ArrayList removeExcessFiles(ArrayList candidates, - boolean isUserCompaction, boolean isMajorCompaction) { - int excess = candidates.size() - comConf.getMaxFilesToCompact(); - if (excess > 0) { - if (isMajorCompaction && isUserCompaction) { - LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() + - " files because of a user-requested major compaction"); + } else if (cfTTL != HConstants.FOREVER && oldest > cfTTL) { + LOG.debug("Major compaction triggered on store " + this + + ", because keyvalues outdated; time since last major compaction " + + (now - lowTimestamp) + "ms"); + result = true; + } } else { - LOG.debug("Too many admissible files. Excluding " + excess - + " files from compaction candidates"); - candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear(); + LOG.debug("Major compaction triggered on store " + this + + "; time since last major compaction " + (now - lowTimestamp) + "ms"); } + result = true; } - return candidates; + return result; } - /** - * @param candidates pre-filtrate - * @return filtered subset - * forget the compactionSelection if we don't have enough files - */ - protected ArrayList checkMinFilesCriteria(ArrayList candidates) { - int minFiles = comConf.getMinFilesToCompact(); - if (candidates.size() < minFiles) { - if(LOG.isDebugEnabled()) { - LOG.debug("Not compacting files because we only have " + candidates.size() + - " files ready for compaction. Need " + minFiles + " to initiate."); - } - candidates.clear(); + + @Override + protected CompactionRequest createCompactionRequest(ArrayList + candidateSelection, boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) + throws IOException { + if (!tryingMajor) { + candidateSelection = filterBulk(candidateSelection); + candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck); + candidateSelection = checkMinFilesCriteria(candidateSelection, + comConf.getMinFilesToCompact()); } - return candidates; + return new CompactionRequest(candidateSelection); } /** - * @param candidates pre-filtrate - * @return filtered subset * -- Default minor compaction selection algorithm: * choose CompactSelection from candidates -- * First exclude bulk-load files if indicated in configuration. @@ -227,9 +140,11 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy { * | | | | | | | | _ | | * | | | | | | | | | | | | * | | | | | | | | | | | | + * @param candidates pre-filtrate + * @return filtered subset */ - ArrayList applyCompactionPolicy(ArrayList candidates, - boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { + protected ArrayList applyCompactionPolicy(ArrayList candidates, + boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { if (candidates.isEmpty()) { return candidates; } @@ -276,114 +191,12 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy { return candidates; } - /* - * @param filesToCompact Files to compact. Can be null. - * @return True if we should run a major compaction. - */ - @Override - public boolean isMajorCompaction(final Collection filesToCompact) - throws IOException { - boolean result = false; - long mcTime = getNextMajorCompactTime(filesToCompact); - if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) { - return result; - } - // TODO: Use better method for determining stamp of last major (HBASE-2990) - long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); - long now = System.currentTimeMillis(); - if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) { - // Major compaction time has elapsed. - long cfTtl = this.storeConfigInfo.getStoreFileTtl(); - if (filesToCompact.size() == 1) { - // Single file - StoreFile sf = filesToCompact.iterator().next(); - Long minTimestamp = sf.getMinimumTimestamp(); - long oldest = (minTimestamp == null) - ? Long.MIN_VALUE - : now - minTimestamp.longValue(); - if (sf.isMajorCompaction() && - (cfTtl == HConstants.FOREVER || oldest < cfTtl)) { - float blockLocalityIndex = sf.getHDFSBlockDistribution().getBlockLocalityIndex( - RSRpcServices.getHostname(comConf.conf, false) - ); - if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Major compaction triggered on only store " + this + - "; to make hdfs blocks local, current blockLocalityIndex is " + - blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + - ")"); - } - result = true; - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping major compaction of " + this + - " because one (major) compacted file only, oldestTime " + - oldest + "ms is < ttl=" + cfTtl + " and blockLocalityIndex is " + - blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + - ")"); - } - } - } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) { - LOG.debug("Major compaction triggered on store " + this + - ", because keyvalues outdated; time since last major compaction " + - (now - lowTimestamp) + "ms"); - result = true; - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Major compaction triggered on store " + this + - "; time since last major compaction " + (now - lowTimestamp) + "ms"); - } - result = true; - } - } - return result; - } - /** - * Used calculation jitter + * A heuristic method to decide whether to schedule a compaction request + * @param storeFiles files in the store. + * @param filesCompacting files being scheduled to compact. + * @return true to schedule a request. */ - private final Random random = new Random(); - - /** - * @param filesToCompact - * @return When to run next major compaction - */ - public long getNextMajorCompactTime(final Collection filesToCompact) { - // default = 24hrs - long ret = comConf.getMajorCompactionPeriod(); - if (ret > 0) { - // default = 20% = +/- 4.8 hrs - double jitterPct = comConf.getMajorCompactionJitter(); - if (jitterPct > 0) { - long jitter = Math.round(ret * jitterPct); - // deterministic jitter avoids a major compaction storm on restart - Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact); - if (seed != null) { - // Synchronized to ensure one user of random instance at a time. - double rnd = -1; - synchronized (this) { - this.random.setSeed(seed); - rnd = this.random.nextDouble(); - } - ret += jitter - Math.round(2L * jitter * rnd); - } else { - ret = 0; // If seed is null, then no storefiles == no major compaction - } - } - } - return ret; - } - - /** - * @param compactionSize Total size of some compaction - * @return whether this should be a large or small compaction - */ - @Override - public boolean throttleCompaction(long compactionSize) { - return compactionSize > comConf.getThrottlePoint(); - } - public boolean needsCompaction(final Collection storeFiles, final List filesCompacting) { int numCandidates = storeFiles.size() - filesCompacting.size(); @@ -392,7 +205,6 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy { /** * Overwrite min threshold for compaction - * @param minThreshold min to update to */ public void setMinThreshold(int minThreshold) { comConf.setMinFilesToCompact(minThreshold); http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java new file mode 100644 index 0000000..77b0af8 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreUtils; + +/** + * An abstract compaction policy that select files on seq id order. + */ +@InterfaceAudience.Private +public abstract class SortedCompactionPolicy extends CompactionPolicy { + + private static final Log LOG = LogFactory.getLog(SortedCompactionPolicy.class); + + public SortedCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) { + super(conf, storeConfigInfo); + } + + public List preSelectCompactionForCoprocessor(final Collection candidates, + final List filesCompacting) { + return getCurrentEligibleFiles(new ArrayList(candidates), filesCompacting); + } + + /** + * @param candidateFiles candidate files, ordered from oldest to newest by seqId. We rely on + * DefaultStoreFileManager to sort the files by seqId to guarantee contiguous compaction based + * on seqId for data consistency. + * @return subset copy of candidate list that meets compaction criteria + */ + public CompactionRequest selectCompaction(Collection candidateFiles, + final List filesCompacting, final boolean isUserCompaction, + final boolean mayUseOffPeak, final boolean forceMajor) throws IOException { + // Preliminary compaction subject to filters + ArrayList candidateSelection = new ArrayList(candidateFiles); + // Stuck and not compacting enough (estimate). It is not guaranteed that we will be + // able to compact more if stuck and compacting, because ratio policy excludes some + // non-compacting files from consideration during compaction (see getCurrentEligibleFiles). + int futureFiles = filesCompacting.isEmpty() ? 0 : 1; + boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles) + >= storeConfigInfo.getBlockingFileCount(); + + candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting); + LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " + + filesCompacting.size() + " compacting, " + candidateSelection.size() + + " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking"); + + // If we can't have all files, we cannot do major anyway + boolean isAllFiles = candidateFiles.size() == candidateSelection.size(); + if (!(forceMajor && isAllFiles)) { + candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak); + isAllFiles = candidateFiles.size() == candidateSelection.size(); + } + + // Try a major compaction if this is a user-requested major compaction, + // or if we do not have too many files to compact and this was requested as a major compaction + boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction) + || (((forceMajor && isAllFiles) || shouldPerformMajorCompaction(candidateSelection)) + && (candidateSelection.size() < comConf.getMaxFilesToCompact())); + // Or, if there are any references among the candidates. + boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection); + + CompactionRequest result = createCompactionRequest(candidateSelection, + isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck); + + ArrayList filesToCompact = Lists.newArrayList(result.getFiles()); + removeExcessFiles(filesToCompact, isUserCompaction, isTryingMajor); + result.updateFiles(filesToCompact); + + isAllFiles = (candidateFiles.size() == filesToCompact.size()); + result.setOffPeak(!filesToCompact.isEmpty() && !isAllFiles && mayUseOffPeak); + result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles); + + return result; + } + + protected abstract CompactionRequest createCompactionRequest(ArrayList + candidateSelection, boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) + throws IOException; + + /* + * @param filesToCompact Files to compact. Can be null. + * @return True if we should run a major compaction. + */ + public abstract boolean shouldPerformMajorCompaction(final Collection filesToCompact) + throws IOException; + + /** + * Used calculation jitter + */ + private final Random random = new Random(); + + /** + * @param filesToCompact + * @return When to run next major compaction + */ + public long getNextMajorCompactTime(final Collection filesToCompact) { + // default = 24hrs + long ret = comConf.getMajorCompactionPeriod(); + if (ret > 0) { + // default = 20% = +/- 4.8 hrs + double jitterPct = comConf.getMajorCompactionJitter(); + if (jitterPct > 0) { + long jitter = Math.round(ret * jitterPct); + // deterministic jitter avoids a major compaction storm on restart + Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact); + if (seed != null) { + // Synchronized to ensure one user of random instance at a time. + double rnd = -1; + synchronized (this) { + this.random.setSeed(seed); + rnd = this.random.nextDouble(); + } + ret += jitter - Math.round(2L * jitter * rnd); + } else { + ret = 0; // If seed is null, then no storefiles == no major compaction + } + } + } + return ret; + } + + /** + * @param compactionSize Total size of some compaction + * @return whether this should be a large or small compaction + */ + public boolean throttleCompaction(long compactionSize) { + return compactionSize > comConf.getThrottlePoint(); + } + + public abstract boolean needsCompaction(final Collection storeFiles, + final List filesCompacting); + + protected ArrayList getCurrentEligibleFiles(ArrayList candidateFiles, + final List filesCompacting) { + // candidates = all storefiles not already in compaction queue + if (!filesCompacting.isEmpty()) { + // exclude all files older than the newest file we're currently + // compacting. this allows us to preserve contiguity (HBASE-2856) + StoreFile last = filesCompacting.get(filesCompacting.size() - 1); + int idx = candidateFiles.indexOf(last); + Preconditions.checkArgument(idx != -1); + candidateFiles.subList(0, idx + 1).clear(); + } + return candidateFiles; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset exclude all files above maxCompactSize + * Also save all references. We MUST compact them + */ + protected ArrayList skipLargeFiles(ArrayList candidates, + boolean mayUseOffpeak) { + int pos = 0; + while (pos < candidates.size() && !candidates.get(pos).isReference() + && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) { + ++pos; + } + if (pos > 0) { + LOG.debug("Some files are too large. Excluding " + pos + + " files from compaction candidates"); + candidates.subList(0, pos).clear(); + } + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset exclude all bulk load files if configured + */ + protected ArrayList filterBulk(ArrayList candidates) { + candidates.removeAll(Collections2.filter(candidates, new Predicate() { + @Override + public boolean apply(StoreFile input) { + return input.excludeFromMinorCompaction(); + } + })); + return candidates; + } + + /** + * @param candidates pre-filtrate + */ + protected void removeExcessFiles(ArrayList candidates, + boolean isUserCompaction, boolean isMajorCompaction) { + int excess = candidates.size() - comConf.getMaxFilesToCompact(); + if (excess > 0) { + if (isMajorCompaction && isUserCompaction) { + LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() + + " files because of a user-requested major compaction"); + } else { + LOG.debug("Too many admissible files. Excluding " + excess + + " files from compaction candidates"); + candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear(); + } + } + } + + /** + * @param candidates pre-filtrate + * @return filtered subset forget the compactionSelection if we don't have enough files + */ + protected ArrayList checkMinFilesCriteria(ArrayList candidates, + int minFiles) { + if (candidates.size() < minFiles) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not compacting files because we only have " + candidates.size() + + " files ready for compaction. Need " + minFiles + " to initiate."); + } + candidates.clear(); + } + return candidates; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java index e8a4340..ff1dd8e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java @@ -166,7 +166,8 @@ public class StripeCompactionPolicy extends CompactionPolicy { } @Override - public boolean isMajorCompaction(Collection filesToCompact) throws IOException { + public boolean shouldPerformMajorCompaction(Collection filesToCompact) + throws IOException { return false; // there's never a major compaction! } http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java index df039e7..32dc227 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java @@ -25,8 +25,11 @@ import java.util.TreeMap; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** A mock used so our tests don't deal with actual StoreFiles */ public class MockStoreFile extends StoreFile { @@ -38,6 +41,9 @@ public class MockStoreFile extends StoreFile { byte[] splitPoint = null; TimeRangeTracker timeRangeTracker; long entryCount; + boolean isMajor; + HDFSBlocksDistribution hdfsBlocksDistribution; + long modificationTime; MockStoreFile(HBaseTestingUtility testUtil, Path testPath, long length, long ageInDisk, boolean isRef, long sequenceid) throws IOException { @@ -47,6 +53,11 @@ public class MockStoreFile extends StoreFile { this.isRef = isRef; this.ageInDisk = ageInDisk; this.sequenceid = sequenceid; + this.isMajor = false; + hdfsBlocksDistribution = new HDFSBlocksDistribution(); + hdfsBlocksDistribution.addHostsAndBlockWeight( + new String[] { RSRpcServices.getHostname(testUtil.getConfiguration(), false) }, 1); + modificationTime = EnvironmentEdgeManager.currentTime(); } void setLength(long newLen) { @@ -65,21 +76,20 @@ public class MockStoreFile extends StoreFile { @Override public boolean isMajorCompaction() { - return false; + return isMajor; } - @Override - public boolean isReference() { - return this.isRef; + public void setIsMajor(boolean isMajor) { + this.isMajor = isMajor; } @Override - boolean isBulkLoadResult() { - return false; + public boolean isReference() { + return this.isRef; } @Override - public boolean isCompactedAway() { + public boolean isBulkLoadResult() { return false; } @@ -102,14 +112,22 @@ public class MockStoreFile extends StoreFile { public Long getMinimumTimestamp() { return (timeRangeTracker == null) ? - null : - timeRangeTracker.getMinimumTimestamp(); + null : timeRangeTracker.getMinimumTimestamp(); } public Long getMaximumTimestamp() { return (timeRangeTracker == null) ? - null : - timeRangeTracker.getMaximumTimestamp(); + null : timeRangeTracker.getMaximumTimestamp(); + } + + @Override + public long getModificationTimeStamp() { + return modificationTime; + } + + @Override + public HDFSBlocksDistribution getHDFSBlockDistribution() { + return hdfsBlocksDistribution; } @Override