Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B23B8174EE for ; Wed, 22 Jul 2015 19:53:02 +0000 (UTC) Received: (qmail 14894 invoked by uid 500); 22 Jul 2015 19:52:23 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 14749 invoked by uid 500); 22 Jul 2015 19:52:23 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 11953 invoked by uid 99); 22 Jul 2015 19:52:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Jul 2015 19:52:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 05191E00DB; Wed, 22 Jul 2015 19:52:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jmhsieh@apache.org To: commits@hbase.apache.org Date: Wed, 22 Jul 2015 19:53:07 -0000 Message-Id: <8ee28d2fac594ca5a6c720e6083e3e3d@git.apache.org> In-Reply-To: <050e2ac5b877409d8887a2750c63a417@git.apache.org> References: <050e2ac5b877409d8887a2750c63a417@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [49/50] [abbrv] hbase git commit: HBASE-11339 Merge remote-tracking branch 'apache/hbase-11339' (Jingcheng Du) http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java index 0000000,47a0acf..ff350bf mode 000000,100644..100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java @@@ -1,0 -1,220 +1,226 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hadoop.hbase.mob; + + import java.io.IOException; + import java.util.ArrayList; + import java.util.Date; + import java.util.List; + + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.hbase.Cell; + import org.apache.hadoop.hbase.HConstants; + import org.apache.hadoop.hbase.KeyValue; + import org.apache.hadoop.hbase.Tag; + import org.apache.hadoop.hbase.TagType; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.monitoring.MonitoredTask; -import org.apache.hadoop.hbase.regionserver.*; ++import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; ++import org.apache.hadoop.hbase.regionserver.HMobStore; ++import org.apache.hadoop.hbase.regionserver.InternalScanner; ++import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; ++import org.apache.hadoop.hbase.regionserver.ScannerContext; ++import org.apache.hadoop.hbase.regionserver.Store; ++import org.apache.hadoop.hbase.regionserver.StoreFile; + import org.apache.hadoop.hbase.util.Bytes; + import org.apache.hadoop.util.StringUtils; + + /** + * An implementation of the StoreFlusher. It extends the DefaultStoreFlusher. + * If the store is not a mob store, the flusher flushes the MemStore the same with + * DefaultStoreFlusher, + * If the store is a mob store, the flusher flushes the MemStore into two places. + * One is the store files of HBase, the other is the mob files. + *
    + *
  1. Cells that are not PUT type or have the delete mark will be directly flushed to HBase.
  2. + *
  3. If the size of a cell value is larger than the threshold, it'll be flushed + * to a mob file, and another cell with the path of this file will be flushed to HBase.
  4. + *
  5. If the size of a cell value is smaller than or equal to the threshold, it'll be flushed to + * HBase directly.
  6. + *
+ * + */ + @InterfaceAudience.Private + public class DefaultMobStoreFlusher extends DefaultStoreFlusher { + + private static final Log LOG = LogFactory.getLog(DefaultMobStoreFlusher.class); + private final Object flushLock = new Object(); + private long mobCellValueSizeThreshold = 0; + private Path targetPath; + private HMobStore mobStore; + + public DefaultMobStoreFlusher(Configuration conf, Store store) throws IOException { + super(conf, store); + mobCellValueSizeThreshold = store.getFamily().getMobThreshold(); + this.targetPath = MobUtils.getMobFamilyPath(conf, store.getTableName(), + store.getColumnFamilyName()); + if (!this.store.getFileSystem().exists(targetPath)) { + this.store.getFileSystem().mkdirs(targetPath); + } + this.mobStore = (HMobStore) store; + } + + /** + * Flushes the snapshot of the MemStore. + * If this store is not a mob store, flush the cells in the snapshot to store files of HBase. + * If the store is a mob one, the flusher flushes the MemStore into two places. + * One is the store files of HBase, the other is the mob files. + *
    + *
  1. Cells that are not PUT type or have the delete mark will be directly flushed to + * HBase.
  2. + *
  3. If the size of a cell value is larger than the threshold, it'll be + * flushed to a mob file, and another cell with the path of this file will be flushed to HBase.
  4. + *
  5. If the size of a cell value is smaller than or equal to the threshold, it'll be flushed to + * HBase directly.
  6. + *
+ */ + @Override + public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, + MonitoredTask status) throws IOException { + ArrayList result = new ArrayList(); + int cellsCount = snapshot.getCellsCount(); + if (cellsCount == 0) return result; // don't flush if there are no entries + + // Use a store scanner to find which rows to flush. + long smallestReadPoint = store.getSmallestReadPoint(); + InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint); + if (scanner == null) { + return result; // NULL scanner returned from coprocessor hooks means skip normal processing + } + StoreFile.Writer writer; + try { + // TODO: We can fail in the below block before we complete adding this flush to + // list of store files. Add cleanup of anything put on filesystem if we fail. + synchronized (flushLock) { + status.setStatus("Flushing " + store + ": creating writer"); + // Write the map out to the disk + writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(), + false, true, true); + writer.setTimeRangeTracker(snapshot.getTimeRangeTracker()); + try { + // It's a mob store, flush the cells in a mob way. This is the difference of flushing + // between a normal and a mob store. + performMobFlush(snapshot, cacheFlushId, scanner, writer, status); + } finally { + finalizeWriter(writer, cacheFlushId, status); + } + } + } finally { + scanner.close(); + } + LOG.info("Mob store is flushed, sequenceid=" + cacheFlushId + ", memsize=" + + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getSize(), "", 1) + + ", hasBloomFilter=" + writer.hasGeneralBloom() + + ", into tmp file " + writer.getPath()); + result.add(writer.getPath()); + return result; + } + + /** + * Flushes the cells in the mob store. + *
    In the mob store, the cells with PUT type might have or have no mob tags. + *
  1. If a cell does not have a mob tag, where the cell is flushed depends + * on the value length. If the length is larger than the threshold, the cell is flushed to a + * mob file and a reference cell holding the mob file name is flushed to a store file in HBase. Otherwise, directly + * flush the cell to a store file in HBase.
  2. + *
  3. If a cell has a mob tag, its value is a mob file name; directly flush it + * to a store file in HBase.
  4. + *
+ * @param snapshot Memstore snapshot. + * @param cacheFlushId Log cache flush sequence number. + * @param scanner The scanner of memstore snapshot. + * @param writer The store file writer. + * @param status Task that represents the flush operation and may be updated with status. + * @throws IOException + */ + protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId, + InternalScanner scanner, StoreFile.Writer writer, MonitoredTask status) throws IOException { + StoreFile.Writer mobFileWriter = null; + int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, + HConstants.COMPACTION_KV_MAX_DEFAULT); + long mobCount = 0; + long mobSize = 0; + long time = snapshot.getTimeRangeTracker().getMaximumTimestamp(); + mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(), + store.getFamily().getCompression(), store.getRegionInfo().getStartKey()); + // the target path is {tableName}/.mob/{cfName}/mobFiles + // the relative path is mobFiles + byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); + try { + Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName() + .getName()); + List cells = new ArrayList(); + boolean hasMore; + ScannerContext scannerContext = + ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); + + do { + hasMore = scanner.next(cells, scannerContext); + if (!cells.isEmpty()) { + for (Cell c : cells) { + // If we know that this KV is going to be included always, then let us + // set its memstoreTS to 0. This will help us save space when writing to + // disk. + if (c.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(c) + || c.getTypeByte() != KeyValue.Type.Put.getCode()) { + writer.append(c); + } else { + // append the original keyValue in the mob file. + mobFileWriter.append(c); + mobSize += c.getValueLength(); + mobCount++; + + // append the tags to the KeyValue. 
+ // The key is same, the value is the filename of the mob file + KeyValue reference = MobUtils.createMobRefKeyValue(c, fileName, tableNameTag); + writer.append(reference); + } + } + cells.clear(); + } + } while (hasMore); + } finally { + status.setStatus("Flushing mob file " + store + ": appending metadata"); + mobFileWriter.appendMetadata(cacheFlushId, false, mobCount); + status.setStatus("Flushing mob file " + store + ": closing flushed file"); + mobFileWriter.close(); + } + + if (mobCount > 0) { + // commit the mob file from temp folder to target folder. + // If the mob file is committed successfully but the store file is not, + // the committed mob file will be handled by the sweep tool as an unused + // file. + mobStore.commitFile(mobFileWriter.getPath(), targetPath); + mobStore.updateMobFlushCount(); + mobStore.updateMobFlushedCellsCount(mobCount); + mobStore.updateMobFlushedCellsSize(mobSize); + } else { + try { + // If the mob file is empty, delete it instead of committing. + store.getFileSystem().delete(mobFileWriter.getPath(), true); + } catch (IOException e) { + LOG.error("Failed to delete the temp mob file", e); + } + } + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java index 0000000,703ebd6..a43e6e7 mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java @@@ -1,0 -1,120 +1,120 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.mob; + + import java.io.IOException; + + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.conf.Configured; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.hbase.HBaseConfiguration; + import org.apache.hadoop.hbase.HColumnDescriptor; + import org.apache.hadoop.hbase.HConstants; + import org.apache.hadoop.hbase.HTableDescriptor; + import org.apache.hadoop.hbase.TableName; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.client.HBaseAdmin; + import org.apache.hadoop.hbase.io.hfile.CacheConfig; + import org.apache.hadoop.hbase.util.Bytes; + import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + import org.apache.hadoop.util.Tool; + import org.apache.hadoop.util.ToolRunner; + + import com.google.protobuf.ServiceException; + + /** + * The cleaner to delete the expired MOB files. + */ + @InterfaceAudience.Private + public class ExpiredMobFileCleaner extends Configured implements Tool { + + private static final Log LOG = LogFactory.getLog(ExpiredMobFileCleaner.class); + /** + * Cleans the MOB files when they're expired and their min versions are 0. 
+ * If the latest timestamp of Cells in a MOB file is older than the TTL in the column family, + * it's regarded as expired. This cleaner deletes them. + * At a time T0, the cells in a mob file M0 are expired. If a user starts a scan before T0, those + * mob cells are visible, this scan still runs after T0. At that time T1, this mob file M0 + * is expired, meanwhile a cleaner starts, the M0 is archived and can be read in the archive + * directory. + * @param tableName The current table name. + * @param family The current family. + * @throws ServiceException + * @throws IOException + */ + public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family) + throws ServiceException, IOException { + Configuration conf = getConf(); + TableName tn = TableName.valueOf(tableName); + FileSystem fs = FileSystem.get(conf); + LOG.info("Cleaning the expired MOB files of " + family.getNameAsString() + " in " + tableName); + // disable the block cache. + Configuration copyOfConf = new Configuration(conf); + copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); + CacheConfig cacheConfig = new CacheConfig(copyOfConf); + MobUtils.cleanExpiredMobFiles(fs, conf, tn, family, cacheConfig, + EnvironmentEdgeManager.currentTime()); + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + ToolRunner.run(conf, new ExpiredMobFileCleaner(), args); + } + + private void printUsage() { + System.err.println("Usage:\n" + "--------------------------\n" + + ExpiredMobFileCleaner.class.getName() + " tableName familyName"); + System.err.println(" tableName The table name"); + System.err.println(" familyName The column family name"); + } + + public int run(String[] args) throws Exception { + if (args.length != 2) { + printUsage(); + return 1; + } + String tableName = args[0]; + String familyName = args[1]; + TableName tn = TableName.valueOf(tableName); + HBaseAdmin.checkHBaseAvailable(getConf()); + HBaseAdmin admin = new 
HBaseAdmin(getConf()); + try { + HTableDescriptor htd = admin.getTableDescriptor(tn); + HColumnDescriptor family = htd.getFamily(Bytes.toBytes(familyName)); + if (family == null || !family.isMobEnabled()) { + throw new IOException("Column family " + familyName + " is not a MOB column family"); + } + if (family.getMinVersions() > 0) { + throw new IOException( + "The minVersions of the column family is not 0, could not be handled by this cleaner"); + } + cleanExpiredMobFiles(tableName, family); + return 0; + } finally { + try { + admin.close(); + } catch (IOException e) { + LOG.error("Failed to close the HBaseAdmin.", e); + } + } + } + + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCacheConfig.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCacheConfig.java index 0000000,35d5f92..6c80355 mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCacheConfig.java @@@ -1,0 -1,63 +1,63 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hadoop.hbase.mob; + -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.hbase.HColumnDescriptor; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.io.hfile.CacheConfig; + + /** + * The cache configuration for the mob. + */ + @InterfaceAudience.Private + public class MobCacheConfig extends CacheConfig { + + private static MobFileCache mobFileCache; + + public MobCacheConfig(Configuration conf, HColumnDescriptor family) { + super(conf, family); + instantiateMobFileCache(conf); + } + + public MobCacheConfig(Configuration conf) { + super(conf); + instantiateMobFileCache(conf); + } + + /** + * Instantiates the MobFileCache. + * @param conf The current configuration. + * @return The current instance of MobFileCache. + */ + public static synchronized MobFileCache instantiateMobFileCache(Configuration conf) { + if (mobFileCache == null) { + mobFileCache = new MobFileCache(conf); + } + return mobFileCache; + } + + /** + * Gets the MobFileCache. + * @return The MobFileCache. + */ + public MobFileCache getMobFileCache() { + return mobFileCache; + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java index 0000000,4dfb7b6..4bdfe97 mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java @@@ -1,0 -1,121 +1,121 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.mob; + -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; + import org.apache.hadoop.hbase.HConstants; + import org.apache.hadoop.hbase.Tag; + import org.apache.hadoop.hbase.TagType; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; ++import org.apache.hadoop.hbase.classification.InterfaceStability; + import org.apache.hadoop.hbase.util.Bytes; + + /** + * The constants used in mob. 
+ */ + @InterfaceAudience.Public + @InterfaceStability.Evolving -public class MobConstants { ++public final class MobConstants { + + public static final String MOB_SCAN_RAW = "hbase.mob.scan.raw"; + public static final String MOB_CACHE_BLOCKS = "hbase.mob.cache.blocks"; + public static final String MOB_SCAN_REF_ONLY = "hbase.mob.scan.ref.only"; + public static final String EMPTY_VALUE_ON_MOBCELL_MISS = "empty.value.on.mobcell.miss"; + + public static final String MOB_FILE_CACHE_SIZE_KEY = "hbase.mob.file.cache.size"; + public static final int DEFAULT_MOB_FILE_CACHE_SIZE = 1000; + + public static final String MOB_DIR_NAME = "mobdir"; + public static final String MOB_REGION_NAME = ".mob"; + public static final byte[] MOB_REGION_NAME_BYTES = Bytes.toBytes(MOB_REGION_NAME); + + public static final String MOB_CLEANER_PERIOD = "hbase.master.mob.ttl.cleaner.period"; + public static final int DEFAULT_MOB_CLEANER_PERIOD = 24 * 60 * 60; // one day + + public static final String MOB_SWEEP_TOOL_COMPACTION_START_DATE = + "hbase.mob.sweep.tool.compaction.start.date"; + public static final String MOB_SWEEP_TOOL_COMPACTION_RATIO = + "hbase.mob.sweep.tool.compaction.ratio"; + public static final String MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE = + "hbase.mob.sweep.tool.compaction.mergeable.size"; + + public static final float DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO = 0.5f; + public static final long DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE = 128 * 1024 * 1024; + + public static final String MOB_SWEEP_TOOL_COMPACTION_TEMP_DIR_NAME = "mobcompaction"; + + public static final String MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE = + "hbase.mob.sweep.tool.compaction.memstore.flush.size"; + public static final long DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE = + 1024 * 1024 * 128; // 128M + + public static final String MOB_CACHE_EVICT_PERIOD = "hbase.mob.cache.evict.period"; + public static final String MOB_CACHE_EVICT_REMAIN_RATIO = "hbase.mob.cache.evict.remain.ratio"; 
+ public static final Tag MOB_REF_TAG = new Tag(TagType.MOB_REFERENCE_TAG_TYPE, + HConstants.EMPTY_BYTE_ARRAY); + + public static final float DEFAULT_EVICT_REMAIN_RATIO = 0.5f; - public static final long DEFAULT_MOB_CACHE_EVICT_PERIOD = 3600l; ++ public static final long DEFAULT_MOB_CACHE_EVICT_PERIOD = 3600L; + + public final static String TEMP_DIR_NAME = ".tmp"; + public final static String BULKLOAD_DIR_NAME = ".bulkload"; + public final static byte[] MOB_TABLE_LOCK_SUFFIX = Bytes.toBytes(".mobLock"); + public final static String EMPTY_STRING = ""; + /** + * If the size of a mob file is less than this value, it's regarded as a small file and needs to + * be merged in mob compaction. The default value is 192MB. + */ + public static final String MOB_COMPACTION_MERGEABLE_THRESHOLD = + "hbase.mob.compaction.mergeable.threshold"; + public static final long DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD = 192 * 1024 * 1024; + /** + * The max number of del files that is allowed in the mob file compaction. In the mob + * compaction, when the number of existing del files is larger than this value, they are merged + * until number of del files is not larger this value. The default value is 3. + */ + public static final String MOB_DELFILE_MAX_COUNT = "hbase.mob.delfile.max.count"; + public static final int DEFAULT_MOB_DELFILE_MAX_COUNT = 3; + /** + * The max number of the mob files that is allowed in a batch of the mob compaction. + * The mob compaction merges the small mob files to bigger ones. If the number of the + * small files is very large, it could lead to a "too many opened file handlers" in the merge. + * And the merge has to be split into batches. This value limits the number of mob files + * that are selected in a batch of the mob compaction. The default value is 100. 
+ */ + public static final String MOB_COMPACTION_BATCH_SIZE = + "hbase.mob.compaction.batch.size"; + public static final int DEFAULT_MOB_COMPACTION_BATCH_SIZE = 100; + /** + * The period that MobCompactionChore runs. The unit is second. + * The default value is one week. + */ + public static final String MOB_COMPACTION_CHORE_PERIOD = + "hbase.mob.compaction.chore.period"; + public static final int DEFAULT_MOB_COMPACTION_CHORE_PERIOD = + 24 * 60 * 60 * 7; // a week + public static final String MOB_COMPACTOR_CLASS_KEY = "hbase.mob.compactor.class"; + /** + * The max number of threads used in MobCompactor. + */ + public static final String MOB_COMPACTION_THREADS_MAX = + "hbase.mob.compaction.threads.max"; + public static final int DEFAULT_MOB_COMPACTION_THREADS_MAX = 1; + private MobConstants() { + + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java index 0000000,09438db..68f19b6 mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java @@@ -1,0 -1,152 +1,152 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.mob; + + import java.io.IOException; + import java.util.ArrayList; + import java.util.List; + -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.hbase.Cell; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.io.hfile.CacheConfig; + import org.apache.hadoop.hbase.regionserver.BloomType; + import org.apache.hadoop.hbase.regionserver.StoreFile; + import org.apache.hadoop.hbase.regionserver.StoreFileScanner; + + /** + * The mob file. + */ + @InterfaceAudience.Private + public class MobFile { + + private StoreFile sf; + + // internal use only for sub classes + protected MobFile() { + } + + protected MobFile(StoreFile sf) { + this.sf = sf; + } + + /** + * Internal use only. This is used by the sweeper. + * + * @return The store file scanner. + * @throws IOException + */ + public StoreFileScanner getScanner() throws IOException { + List sfs = new ArrayList(); + sfs.add(sf); + List sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true, + false, null, sf.getMaxMemstoreTS()); + + return sfScanners.get(0); + } + + /** + * Reads a cell from the mob file. + * @param search The cell need to be searched in the mob file. + * @param cacheMobBlocks Should this scanner cache blocks. + * @return The cell in the mob file. 
+ * @throws IOException + */ + public Cell readCell(Cell search, boolean cacheMobBlocks) throws IOException { + return readCell(search, cacheMobBlocks, sf.getMaxMemstoreTS()); + } + + /** + * Reads a cell from the mob file. + * @param search The cell need to be searched in the mob file. + * @param cacheMobBlocks Should this scanner cache blocks. + * @param readPt the read point. + * @return The cell in the mob file. + * @throws IOException + */ + public Cell readCell(Cell search, boolean cacheMobBlocks, long readPt) throws IOException { + Cell result = null; + StoreFileScanner scanner = null; + List sfs = new ArrayList(); + sfs.add(sf); + try { + List sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs, + cacheMobBlocks, true, false, null, readPt); + if (!sfScanners.isEmpty()) { + scanner = sfScanners.get(0); + if (scanner.seek(search)) { + result = scanner.peek(); + } + } + } finally { + if (scanner != null) { + scanner.close(); + } + } + return result; + } + + /** + * Gets the file name. + * @return The file name. + */ + public String getFileName() { + return sf.getPath().getName(); + } + + /** + * Opens the underlying reader. + * It's not thread-safe. Use MobFileCache.openFile() instead. + * @throws IOException + */ + public void open() throws IOException { + if (sf.getReader() == null) { + sf.createReader(); + } + } + + /** + * Closes the underlying reader, but do no evict blocks belonging to this file. + * It's not thread-safe. Use MobFileCache.closeFile() instead. + * @throws IOException + */ + public void close() throws IOException { + if (sf != null) { + sf.closeReader(false); + sf = null; + } + } + + /** + * Creates an instance of the MobFile. + * @param fs The file system. + * @param path The path of the underlying StoreFile. + * @param conf The configuration. + * @param cacheConf The CacheConfig. + * @return An instance of the MobFile. 
+ * @throws IOException + */ + public static MobFile create(FileSystem fs, Path path, Configuration conf, CacheConfig cacheConf) + throws IOException { + StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE); + return new MobFile(sf); + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java index 0000000,0780f87..de3439b mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java @@@ -1,0 -1,325 +1,325 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hadoop.hbase.mob; + + import java.io.IOException; + import java.util.ArrayList; + import java.util.Collections; + import java.util.List; + import java.util.Map; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.Executors; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicLong; + import java.util.concurrent.locks.ReentrantLock; + + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.util.IdLock; + + import com.google.common.util.concurrent.ThreadFactoryBuilder; + + /** + * The cache for mob files. + * This cache doesn't cache the mob file blocks. It only caches the references of mob files. + * We are doing this to avoid opening and closing mob files all the time. We just keep + * references open. + */ + @InterfaceAudience.Private + public class MobFileCache { + + private static final Log LOG = LogFactory.getLog(MobFileCache.class); + + /* + * Eviction and statistics thread. Periodically run to print the statistics and + * evict the lru cached mob files when the count of the cached files is larger + * than the threshold. + * Within this class an instance is only handed to scheduleAtFixedRate as a Runnable; + * run() simply delegates to MobFileCache.evict(). + */ + static class EvictionThread extends Thread { + MobFileCache lru; + + public EvictionThread(MobFileCache lru) { + super("MobFileCache.EvictionThread"); + setDaemon(true); + this.lru = lru; + } + + @Override + public void run() { + lru.evict(); + } + } + + // Maps a mob file name to its cached file. A ConcurrentHashMap, accesses to this map are synchronized. 
+ private Map map = null; + // caches access count + private final AtomicLong count = new AtomicLong(0); + private long lastAccess = 0; + private final AtomicLong miss = new AtomicLong(0); + private long lastMiss = 0; + private final AtomicLong evictedFileCount = new AtomicLong(0); + private long lastEvictedFileCount = 0; + + // a lock to sync the evict to guarantee the eviction occurs in sequence. + // the method evictFile is not sync by this lock, the ConcurrentHashMap does the sync there. + private final ReentrantLock evictionLock = new ReentrantLock(true); + + //stripes lock on each mob file based on its hash. Sync the openFile/closeFile operations. + private final IdLock keyLock = new IdLock(); + + private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, + new ThreadFactoryBuilder().setNameFormat("MobFileCache #%d").setDaemon(true).build()); + private final Configuration conf; + + // the count of the cached references to mob files + private final int mobFileMaxCacheSize; + private final boolean isCacheEnabled; + private float evictRemainRatio; + + public MobFileCache(Configuration conf) { + this.conf = conf; + this.mobFileMaxCacheSize = conf.getInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, + MobConstants.DEFAULT_MOB_FILE_CACHE_SIZE); + isCacheEnabled = (mobFileMaxCacheSize > 0); + map = new ConcurrentHashMap(mobFileMaxCacheSize); + if (isCacheEnabled) { + long period = conf.getLong(MobConstants.MOB_CACHE_EVICT_PERIOD, + MobConstants.DEFAULT_MOB_CACHE_EVICT_PERIOD); // in seconds + evictRemainRatio = conf.getFloat(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO, + MobConstants.DEFAULT_EVICT_REMAIN_RATIO); + if (evictRemainRatio < 0.0) { + evictRemainRatio = 0.0f; + LOG.warn(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO + " is less than 0.0, 0.0 is used."); + } else if (evictRemainRatio > 1.0) { + evictRemainRatio = 1.0f; + LOG.warn(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO + " is larger than 1.0, 1.0 is used."); + } + 
this.scheduleThreadPool.scheduleAtFixedRate(new EvictionThread(this), period, period, + TimeUnit.SECONDS); + } + LOG.info("MobFileCache is initialized, and the cache size is " + mobFileMaxCacheSize); + } + + /** + * Evicts the lru cached mob files when the count of the cached files is larger + * than the threshold. + */ + public void evict() { + if (isCacheEnabled) { + // Ensure only one eviction at a time + if (!evictionLock.tryLock()) { + return; + } + printStatistics(); + List evictedFiles = new ArrayList(); + try { + if (map.size() <= mobFileMaxCacheSize) { + return; + } + List files = new ArrayList(map.values()); + Collections.sort(files); + int start = (int) (mobFileMaxCacheSize * evictRemainRatio); + if (start >= 0) { + for (int i = start; i < files.size(); i++) { + String name = files.get(i).getFileName(); + CachedMobFile evictedFile = map.remove(name); + if (evictedFile != null) { + evictedFiles.add(evictedFile); + } + } + } + } finally { + evictionLock.unlock(); + } + // EvictionLock is released. Close the evicted files one by one. + // The closes are sync in the closeFile method. + for (CachedMobFile evictedFile : evictedFiles) { + closeFile(evictedFile); + } + evictedFileCount.addAndGet(evictedFiles.size()); + } + } + + /** + * Evicts the cached file by the name. + * @param fileName The name of a cached file. + */ + public void evictFile(String fileName) { + if (isCacheEnabled) { + IdLock.Entry lockEntry = null; + try { + // obtains the lock to close the cached file. + lockEntry = keyLock.getLockEntry(fileName.hashCode()); + CachedMobFile evictedFile = map.remove(fileName); + if (evictedFile != null) { + evictedFile.close(); + evictedFileCount.incrementAndGet(); + } + } catch (IOException e) { + LOG.error("Failed to evict the file " + fileName, e); + } finally { + if (lockEntry != null) { + keyLock.releaseLockEntry(lockEntry); + } + } + } + } + + /** + * Opens a mob file. + * @param fs The current file system. + * @param path The file path. 
+ * @param cacheConf The current MobCacheConfig + * @return A opened mob file. + * @throws IOException + */ + public MobFile openFile(FileSystem fs, Path path, MobCacheConfig cacheConf) throws IOException { + if (!isCacheEnabled) { + MobFile mobFile = MobFile.create(fs, path, conf, cacheConf); + mobFile.open(); + return mobFile; + } else { + String fileName = path.getName(); + CachedMobFile cached = map.get(fileName); + IdLock.Entry lockEntry = keyLock.getLockEntry(fileName.hashCode()); + try { + if (cached == null) { + cached = map.get(fileName); + if (cached == null) { + if (map.size() > mobFileMaxCacheSize) { + evict(); + } + cached = CachedMobFile.create(fs, path, conf, cacheConf); + cached.open(); + map.put(fileName, cached); + miss.incrementAndGet(); + } + } + cached.open(); + cached.access(count.incrementAndGet()); + } finally { + keyLock.releaseLockEntry(lockEntry); + } + return cached; + } + } + + /** + * Closes a mob file. + * @param file The mob file that needs to be closed. + */ + public void closeFile(MobFile file) { + IdLock.Entry lockEntry = null; + try { + if (!isCacheEnabled) { + file.close(); + } else { + lockEntry = keyLock.getLockEntry(file.getFileName().hashCode()); + file.close(); + } + } catch (IOException e) { + LOG.error("MobFileCache, Exception happen during close " + file.getFileName(), e); + } finally { + if (lockEntry != null) { + keyLock.releaseLockEntry(lockEntry); + } + } + } + + public void shutdown() { + this.scheduleThreadPool.shutdown(); + for (int i = 0; i < 100; i++) { + if (!this.scheduleThreadPool.isShutdown()) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + LOG.warn("Interrupted while sleeping"); + Thread.currentThread().interrupt(); + break; + } + } + } + + if (!this.scheduleThreadPool.isShutdown()) { + List runnables = this.scheduleThreadPool.shutdownNow(); + LOG.debug("Still running " + runnables); + } + } + + /** + * Gets the count of cached mob files. + * @return The count of the cached mob files. 
+ */ + public int getCacheSize() { + return map == null ? 0 : map.size(); + } + + /** + * Gets the count of accesses to the mob file cache. + * @return The count of accesses to the mob file cache. + */ + public long getAccessCount() { + return count.get(); + } + + /** + * Gets the count of misses to the mob file cache. + * @return The count of misses to the mob file cache. + */ + public long getMissCount() { + return miss.get(); + } + + /** + * Gets the number of items evicted from the mob file cache. + * @return The number of items evicted from the mob file cache. + */ + public long getEvictedFileCount() { + return evictedFileCount.get(); + } + + /** + * Gets the hit ratio to the mob file cache. + * @return The hit ratio to the mob file cache, computed in float precision; 0 when there + * has been no access yet. + */ + public double getHitRatio() { + return count.get() == 0 ? 0 : ((float) (count.get() - miss.get())) / (float) count.get(); + } + + /** + * Logs the access, miss, hit, hit ratio and evicted file figures accumulated since the + * previous call, then advances the last* baselines so that the next call only reports + * new activity. + */ + public void printStatistics() { + long access = count.get() - lastAccess; + long missed = miss.get() - lastMiss; + long evicted = evictedFileCount.get() - lastEvictedFileCount; + // Whole-number percentage; reported as 0 for an interval without any access. + int hitRatio = access == 0 ? 
0 : (int) (((float) (access - missed)) / (float) access * 100); + LOG.info("MobFileCache Statistics, access: " + access + ", miss: " + missed + ", hit: " + + (access - missed) + ", hit ratio: " + hitRatio + "%, evicted files: " + evicted); + lastAccess += access; + lastMiss += missed; + lastEvictedFileCount += evicted; + } + + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java index 0000000,796fe4d..2364a47 mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java @@@ -1,0 -1,169 +1,169 @@@ + /** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.mob; + -import org.apache.hadoop.classification.InterfaceAudience; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.util.MD5Hash; + + /** + * The mob file name. + * It consists of a md5 of a start key, a date and an uuid. 
+ * It looks like md5(start) + date + uuid. + *
+ * <ol>
+ * <li>characters 0-31: md5 hex string of a start key. Since the length of the start key is not
+ * fixed, have to use the md5 instead which has a fixed length.</li>
+ * <li>characters 32-39: a string of a date with format yyyymmdd. The date is the latest timestamp
+ * of cells in this file</li>
+ * <li>the remaining characters: the uuid.</li>
+ * </ol>
+ * Using md5 hex string of start key as the prefix of file name makes files with the same start + * key unique, they're different from the ones with other start keys. + * Cells that come from different regions might be in the same mob file after a region split, + * this is allowed. + * Has the latest timestamp of cells in the file name in order to clean the expired mob files by + * TTL easily. If this timestamp is older than the TTL, it's regarded as expired. + */ + @InterfaceAudience.Private -public class MobFileName { ++public final class MobFileName { + + private final String date; + // The md5 hex string of the start key, never the raw key. + private final String startKey; + private final String uuid; + // startKey, date and uuid concatenated in that order. + private final String fileName; + + /** + * Builds a name from a raw start key; the key is stored as its md5 hex string. + * @param startKey + * The start key. + * @param date + * The string of the latest timestamp of cells in this file, the format is yyyymmdd. + * @param uuid + * The uuid + */ + private MobFileName(byte[] startKey, String date, String uuid) { + this.startKey = MD5Hash.getMD5AsHex(startKey, 0, startKey.length); + this.uuid = uuid; + this.date = date; + this.fileName = this.startKey + date + uuid; + } + + /** + * Builds a name from a start key that is already an md5 hex string; it is stored as given. + * @param startKey + * The md5 hex string of the start key. + * @param date + * The string of the latest timestamp of cells in this file, the format is yyyymmdd. + * @param uuid + * The uuid + */ + private MobFileName(String startKey, String date, String uuid) { + this.startKey = startKey; + this.uuid = uuid; + this.date = date; + this.fileName = this.startKey + date + uuid; + } + + /** + * Creates an instance of MobFileName + * + * @param startKey + * The start key. + * @param date + * The string of the latest timestamp of cells in this file, the format is yyyymmdd. + * @param uuid The uuid. + * @return An instance of a MobFileName. + */ + public static MobFileName create(byte[] startKey, String date, String uuid) { + return new MobFileName(startKey, date, uuid); + } + + /** + * Creates an instance of MobFileName + * + * @param startKey + * The md5 hex string of the start key. 
+ * @param date + * The string of the latest timestamp of cells in this file, the format is yyyymmdd. + * @param uuid The uuid. + * @return An instance of a MobFileName. + */ + public static MobFileName create(String startKey, String date, String uuid) { + return new MobFileName(startKey, date, uuid); + } + + /** + * Creates an instance of MobFileName. + * @param fileName The string format of a file name. + * @return An instance of a MobFileName. + */ + public static MobFileName create(String fileName) { + // The format of a file name is md5HexString(0-31bytes) + date(32-39bytes) + UUID + // The date format is yyyyMMdd + String startKey = fileName.substring(0, 32); + String date = fileName.substring(32, 40); + String uuid = fileName.substring(40); + return new MobFileName(startKey, date, uuid); + } + + /** + * Gets the hex string of the md5 for a start key. + * @return The hex string of the md5 for a start key. + */ + public String getStartKey() { + return startKey; + } + + /** + * Gets the date string. Its format is yyyymmdd. + * @return The date string. + */ + public String getDate() { + return this.date; + } + + @Override + public int hashCode() { + StringBuilder builder = new StringBuilder(); + builder.append(startKey); + builder.append(date); + builder.append(uuid); + return builder.toString().hashCode(); + } + + @Override + public boolean equals(Object anObject) { + if (this == anObject) { + return true; + } + if (anObject instanceof MobFileName) { + MobFileName another = (MobFileName) anObject; + if (this.startKey.equals(another.startKey) && this.date.equals(another.date) + && this.uuid.equals(another.uuid)) { + return true; + } + } + return false; + } + + /** + * Gets the file name. + * @return The file name. 
+ */ + public String getFileName() { + // Precomputed in the constructors as startKey, date and uuid joined in that order. + return this.fileName; + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java ---------------------------------------------------------------------- diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java index 0000000,a54660c..4f6963d mode 000000,100644..100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java @@@ -1,0 -1,48 +1,48 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.mob; + + import java.io.IOException; + -import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; + import org.apache.hadoop.hbase.regionserver.Store; + + /** + * MobStoreEngine creates the mob specific compactor and store flusher. 
+ */ + @InterfaceAudience.Private + public class MobStoreEngine extends DefaultStoreEngine { + + /** + * Creates the DefaultMobStoreFlusher. + */ + @Override + protected void createStoreFlusher(Configuration conf, Store store) throws IOException { + // When using MOB, we use DefaultMobStoreFlusher always. + // This class does not override the compaction policy; the compactor is replaced + // separately in createCompactor. + storeFlusher = new DefaultMobStoreFlusher(conf, store); + } + + /** + * Creates the DefaultMobStoreCompactor. + */ + @Override + protected void createCompactor(Configuration conf, Store store) throws IOException { + compactor = new DefaultMobStoreCompactor(conf, store); + } + }