hbase-commits mailing list archives

From jmhs...@apache.org
Subject [48/50] [abbrv] hbase git commit: HBASE-11339 Merge remote-tracking branch 'apache/hbase-11339' (Jingcheng Du)
Date Wed, 22 Jul 2015 19:53:06 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index 0000000,d283729..424a39b
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@@ -1,0 -1,897 +1,898 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.mob;
+ 
+ import java.io.FileNotFoundException;
+ import java.io.IOException;
+ import java.security.Key;
+ import java.security.KeyException;
+ import java.text.ParseException;
+ import java.text.SimpleDateFormat;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Date;
+ import java.util.List;
+ import java.util.UUID;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.RejectedExecutionException;
+ import java.util.concurrent.RejectedExecutionHandler;
+ import java.util.concurrent.SynchronousQueue;
+ import java.util.concurrent.ThreadPoolExecutor;
+ import java.util.concurrent.TimeUnit;
+ 
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
 -import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hbase.Cell;
+ import org.apache.hadoop.hbase.CellComparator;
+ import org.apache.hadoop.hbase.HBaseConfiguration;
+ import org.apache.hadoop.hbase.HColumnDescriptor;
+ import org.apache.hadoop.hbase.HConstants;
+ import org.apache.hadoop.hbase.HRegionInfo;
+ import org.apache.hadoop.hbase.HTableDescriptor;
+ import org.apache.hadoop.hbase.KeyValue;
+ import org.apache.hadoop.hbase.TableName;
+ import org.apache.hadoop.hbase.Tag;
+ import org.apache.hadoop.hbase.TagType;
+ import org.apache.hadoop.hbase.backup.HFileArchiver;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.client.Scan;
+ import org.apache.hadoop.hbase.io.HFileLink;
+ import org.apache.hadoop.hbase.io.compress.Compression;
+ import org.apache.hadoop.hbase.io.crypto.Cipher;
+ import org.apache.hadoop.hbase.io.crypto.Encryption;
+ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+ import org.apache.hadoop.hbase.io.hfile.HFileContext;
+ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+ import org.apache.hadoop.hbase.master.TableLockManager;
+ import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
+ import org.apache.hadoop.hbase.mob.compactions.MobCompactor;
+ import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
+ import org.apache.hadoop.hbase.regionserver.BloomType;
+ import org.apache.hadoop.hbase.regionserver.HStore;
+ import org.apache.hadoop.hbase.regionserver.StoreFile;
+ import org.apache.hadoop.hbase.security.EncryptionUtil;
+ import org.apache.hadoop.hbase.security.User;
+ import org.apache.hadoop.hbase.util.Bytes;
+ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+ import org.apache.hadoop.hbase.util.FSUtils;
+ import org.apache.hadoop.hbase.util.ReflectionUtils;
+ import org.apache.hadoop.hbase.util.Threads;
+ 
+ /**
+  * The mob utilities
+  */
+ @InterfaceAudience.Private
+ public class MobUtils {
+ 
+   private static final Log LOG = LogFactory.getLog(MobUtils.class);
+ 
+   private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
+       new ThreadLocal<SimpleDateFormat>() {
+     @Override
+     protected SimpleDateFormat initialValue() {
+       return new SimpleDateFormat("yyyyMMdd");
+     }
+   };
+ 
+   /**
+    * Formats a date to a string.
+    * @param date The date.
+    * @return The date as a string, in yyyyMMdd format.
+    */
+   public static String formatDate(Date date) {
+     return LOCAL_FORMAT.get().format(date);
+   }
+ 
+   /**
+    * Parses the string to a date.
+    * @param dateString A date string, in yyyyMMdd format.
+    * @return A date.
+    * @throws ParseException
+    */
+   public static Date parseDate(String dateString) throws ParseException {
+     return LOCAL_FORMAT.get().parse(dateString);
+   }
+ 
+   /**
+    * Whether the current cell is a mob reference cell.
+    * @param cell The current cell.
+    * @return True if the cell has a mob reference tag, false if it doesn't.
+    */
+   public static boolean isMobReferenceCell(Cell cell) {
+     if (cell.getTagsLength() > 0) {
+       Tag tag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
+           TagType.MOB_REFERENCE_TAG_TYPE);
+       return tag != null;
+     }
+     return false;
+   }
+ 
+   /**
+    * Gets the table name tag.
+    * @param cell The current cell.
+    * @return The table name tag.
+    */
+   public static Tag getTableNameTag(Cell cell) {
+     if (cell.getTagsLength() > 0) {
+       Tag tag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
+           TagType.MOB_TABLE_NAME_TAG_TYPE);
+       return tag;
+     }
+     return null;
+   }
+ 
+   /**
+    * Whether the tag list has a mob reference tag.
+    * @param tags The tag list.
+    * @return True if the list has a mob reference tag, false if it doesn't.
+    */
+   public static boolean hasMobReferenceTag(List<Tag> tags) {
+     if (!tags.isEmpty()) {
+       for (Tag tag : tags) {
+         if (tag.getType() == TagType.MOB_REFERENCE_TAG_TYPE) {
+           return true;
+         }
+       }
+     }
+     return false;
+   }
+ 
+   /**
+    * Indicates whether it's a raw scan.
+    * The information is set in the attribute "hbase.mob.scan.raw" of scan.
+    * For a mob cell, in a normal scan the scanner retrieves the mob cell from the mob file.
+    * In a raw scan, the scanner directly returns the reference cell stored in HBase without
+    * retrieving the one in the mob file.
+    * @param scan The current scan.
+    * @return True if it's a raw scan.
+    */
+   public static boolean isRawMobScan(Scan scan) {
+     byte[] raw = scan.getAttribute(MobConstants.MOB_SCAN_RAW);
+     try {
+       return raw != null && Bytes.toBoolean(raw);
+     } catch (IllegalArgumentException e) {
+       return false;
+     }
+   }
+ 
+   /**
+    * Indicates whether it's a reference only scan.
+    * The information is set in the attribute "hbase.mob.scan.ref.only" of scan.
+    * If it's a ref only scan, only the cells with the ref tag are returned.
+    * @param scan The current scan.
+    * @return True if it's a ref only scan.
+    */
+   public static boolean isRefOnlyScan(Scan scan) {
+     byte[] refOnly = scan.getAttribute(MobConstants.MOB_SCAN_REF_ONLY);
+     try {
+       return refOnly != null && Bytes.toBoolean(refOnly);
+     } catch (IllegalArgumentException e) {
+       return false;
+     }
+   }
+ 
+   /**
+    * Indicates whether the scan specifies the caching of the MOB blocks.
+    * The information is set in the attribute "hbase.mob.cache.blocks" of scan.
+    * @param scan The current scan.
+    * @return True when the Scan attribute specifies to cache the MOB blocks.
+    */
+   public static boolean isCacheMobBlocks(Scan scan) {
+     byte[] cache = scan.getAttribute(MobConstants.MOB_CACHE_BLOCKS);
+     try {
+       return cache != null && Bytes.toBoolean(cache);
+     } catch (IllegalArgumentException e) {
+       return false;
+     }
+   }
+ 
+   /**
+    * Sets the attribute of caching blocks in the scan.
+    *
+    * @param scan
+    *          The current scan.
+    * @param cacheBlocks
+    *          If true, the attribute of caching blocks is set in the scan and the scanner
+    *          for this scan caches blocks.
+    *          If false, the scanner doesn't cache blocks for this scan.
+    */
+   public static void setCacheMobBlocks(Scan scan, boolean cacheBlocks) {
+     scan.setAttribute(MobConstants.MOB_CACHE_BLOCKS, Bytes.toBytes(cacheBlocks));
+   }
+ 
+   /**
+    * Cleans the expired mob files.
+    * Cleans the files whose creation date is older than (current - columnFamily.ttl), provided
+    * the minVersions of that column family is 0.
+    * @param fs The current file system.
+    * @param conf The current configuration.
+    * @param tableName The current table name.
+    * @param columnDescriptor The descriptor of the current column family.
+    * @param cacheConfig The cacheConfig that disables the block cache.
+    * @param current The current time.
+    * @throws IOException
+    */
+   public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, TableName tableName,
+       HColumnDescriptor columnDescriptor, CacheConfig cacheConfig, long current)
+       throws IOException {
+     long timeToLive = columnDescriptor.getTimeToLive();
+     if (Integer.MAX_VALUE == timeToLive) {
+       // no need to clean, because the TTL is not set.
+       return;
+     }
+ 
+     Date expireDate = new Date(current - timeToLive * 1000);
+     expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate());
+     LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!");
+ 
+     FileStatus[] stats = null;
+     Path mobTableDir = FSUtils.getTableDir(getMobHome(conf), tableName);
+     Path path = getMobFamilyPath(conf, tableName, columnDescriptor.getNameAsString());
+     try {
+       stats = fs.listStatus(path);
+     } catch (FileNotFoundException e) {
+       LOG.warn("Failed to find the mob file " + path, e);
+     }
+     if (null == stats) {
+       // no file found
+       return;
+     }
+     List<StoreFile> filesToClean = new ArrayList<StoreFile>();
+     int deletedFileCount = 0;
+     for (FileStatus file : stats) {
+       String fileName = file.getPath().getName();
+       try {
+         MobFileName mobFileName = null;
+         if (!HFileLink.isHFileLink(file.getPath())) {
+           mobFileName = MobFileName.create(fileName);
+         } else {
+           HFileLink hfileLink = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
+           mobFileName = MobFileName.create(hfileLink.getOriginPath().getName());
+         }
+         Date fileDate = parseDate(mobFileName.getDate());
+         if (LOG.isDebugEnabled()) {
+           LOG.debug("Checking file " + fileName);
+         }
+         if (fileDate.getTime() < expireDate.getTime()) {
+           if (LOG.isDebugEnabled()) {
+             LOG.debug(fileName + " is an expired file");
+           }
+           filesToClean.add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE));
+         }
+       } catch (Exception e) {
+         LOG.error("Cannot parse the fileName " + fileName, e);
+       }
+     }
+     if (!filesToClean.isEmpty()) {
+       try {
+         removeMobFiles(conf, fs, tableName, mobTableDir, columnDescriptor.getName(),
+             filesToClean);
+         deletedFileCount = filesToClean.size();
+       } catch (IOException e) {
+         LOG.error("Failed to delete the mob files " + filesToClean, e);
+       }
+     }
+     LOG.info(deletedFileCount + " expired mob files are deleted");
+   }
+ 
+   /**
+    * Gets the root dir of the mob files.
+    * It's {HBASE_DIR}/mobdir.
+    * @param conf The current configuration.
+    * @return the root dir of the mob file.
+    */
+   public static Path getMobHome(Configuration conf) {
+     Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
+     return new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
+   }
+ 
+   /**
+    * Gets the qualified root dir of the mob files.
+    * @param conf The current configuration.
+    * @return The qualified root dir.
+    * @throws IOException
+    */
+   public static Path getQualifiedMobRootDir(Configuration conf) throws IOException {
+     Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
+     Path mobRootDir = new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
+     FileSystem fs = mobRootDir.getFileSystem(conf);
+     return mobRootDir.makeQualified(fs);
+   }
+ 
+   /**
+    * Gets the region dir of the mob files.
+    * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}.
+    * @param conf The current configuration.
+    * @param tableName The current table name.
+    * @return The region dir of the mob files.
+    */
+   public static Path getMobRegionPath(Configuration conf, TableName tableName) {
+     Path tablePath = FSUtils.getTableDir(getMobHome(conf), tableName);
+     HRegionInfo regionInfo = getMobRegionInfo(tableName);
+     return new Path(tablePath, regionInfo.getEncodedName());
+   }
+ 
+   /**
+    * Gets the family dir of the mob files.
+    * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
+    * @param conf The current configuration.
+    * @param tableName The current table name.
+    * @param familyName The current family name.
+    * @return The family dir of the mob files.
+    */
+   public static Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) {
+     return new Path(getMobRegionPath(conf, tableName), familyName);
+   }
+ 
+   /**
+    * Gets the family dir of the mob files.
+    * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
+    * @param regionPath The path of the mob region, which is a dummy one.
+    * @param familyName The current family name.
+    * @return The family dir of the mob files.
+    */
+   public static Path getMobFamilyPath(Path regionPath, String familyName) {
+     return new Path(regionPath, familyName);
+   }
+ 
+   /**
+    * Gets the HRegionInfo of the mob files.
+    * This is a dummy region. The mob files are not saved in a region in HBase.
+    * It is used only internally, for mob snapshots.
+    * @param tableName
+    * @return A dummy mob region info.
+    */
+   public static HRegionInfo getMobRegionInfo(TableName tableName) {
+     HRegionInfo info = new HRegionInfo(tableName, MobConstants.MOB_REGION_NAME_BYTES,
+         HConstants.EMPTY_END_ROW, false, 0);
+     return info;
+   }
+ 
+   /**
+    * Gets whether the current HRegionInfo is a mob one.
+    * @param regionInfo The current HRegionInfo.
+    * @return True if the current HRegionInfo is a mob one.
+    */
+   public static boolean isMobRegionInfo(HRegionInfo regionInfo) {
+     return regionInfo == null ? false : getMobRegionInfo(regionInfo.getTable()).getEncodedName()
+         .equals(regionInfo.getEncodedName());
+   }
+ 
+   /**
+    * Gets whether the current region name follows the pattern of a mob region name.
+    * @param tableName The current table name.
+    * @param regionName The current region name.
+    * @return True if the current region name follows the pattern of a mob region name.
+    */
+   public static boolean isMobRegionName(TableName tableName, byte[] regionName) {
+     return Bytes.equals(regionName, getMobRegionInfo(tableName).getRegionName());
+   }
+ 
+   /**
+    * Gets the working directory of the mob compaction.
+    * @param root The root directory of the mob compaction.
+    * @param jobName The current job name.
+    * @return The directory of the mob compaction for the current job.
+    */
+   public static Path getCompactionWorkingPath(Path root, String jobName) {
+     return new Path(root, jobName);
+   }
+ 
+   /**
+    * Archives the mob files.
+    * @param conf The current configuration.
+    * @param fs The current file system.
+    * @param tableName The table name.
+    * @param tableDir The table directory.
+    * @param family The name of the column family.
+    * @param storeFiles The files to be deleted.
+    * @throws IOException
+    */
+   public static void removeMobFiles(Configuration conf, FileSystem fs, TableName tableName,
+       Path tableDir, byte[] family, Collection<StoreFile> storeFiles) throws IOException {
+     HFileArchiver.archiveStoreFiles(conf, fs, getMobRegionInfo(tableName), tableDir, family,
+         storeFiles);
+   }
+ 
+   /**
+    * Creates a mob reference KeyValue.
+    * The value of the mob reference KeyValue is mobCellValueSize + mobFileName.
+    * @param cell The original Cell.
+    * @param fileName The mob file name where the mob reference KeyValue is written.
+    * @param tableNameTag The tag of the current table name. It's very important in
+    *                        cloning the snapshot.
+    * @return The mob reference KeyValue.
+    */
+   public static KeyValue createMobRefKeyValue(Cell cell, byte[] fileName, Tag tableNameTag) {
+     // Append the tags to the KeyValue.
+     // The key stays the same; the value is the original value length plus the mob file name.
+     List<Tag> tags = new ArrayList<Tag>();
+     // Add the ref tag as the 1st one.
+     tags.add(MobConstants.MOB_REF_TAG);
+     // Add the tag of the source table name, this table is where this mob file is flushed
+     // from.
+     // It's very useful in cloning the snapshot. When reading from the cloned table, we need to
+     // find the original mob files by this table name. For details please see cloning
+     // snapshot for mob files.
+     tags.add(tableNameTag);
+     // Add the existing tags.
+     tags.addAll(Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()));
+     int valueLength = cell.getValueLength();
+     byte[] refValue = Bytes.add(Bytes.toBytes(valueLength), fileName);
+     KeyValue reference = new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+         cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+         cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+         cell.getTimestamp(), KeyValue.Type.Put, refValue, 0, refValue.length, tags);
+     reference.setSequenceId(cell.getSequenceId());
+     return reference;
+   }
+ 
+   /**
+    * Creates a writer for the mob file in the temp directory.
+    * @param conf The current configuration.
+    * @param fs The current file system.
+    * @param family The descriptor of the current column family.
+    * @param date The date string, in yyyyMMdd format.
+    * @param basePath The base path for a temp directory.
+    * @param maxKeyCount The maximum key count.
+    * @param compression The compression algorithm.
+    * @param startKey The hex string of the start key.
+    * @param cacheConfig The current cache config.
+    * @param cryptoContext The encryption context.
+    * @return The writer for the mob file.
+    * @throws IOException
+    */
+   public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
+       HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
+       Compression.Algorithm compression, String startKey, CacheConfig cacheConfig,
+       Encryption.Context cryptoContext)
+       throws IOException {
+     MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
+         .replaceAll("-", ""));
+     return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
+       cacheConfig, cryptoContext);
+   }
+ 
+   /**
+    * Creates a writer for the ref file in the temp directory.
+    * @param conf The current configuration.
+    * @param fs The current file system.
+    * @param family The descriptor of the current column family.
+    * @param basePath The base path for a temp directory.
+    * @param maxKeyCount The maximum key count.
+    * @param cacheConfig The current cache config.
+    * @param cryptoContext The encryption context.
+    * @return The writer for the mob file.
+    * @throws IOException
+    */
+   public static StoreFile.Writer createRefFileWriter(Configuration conf, FileSystem fs,
+     HColumnDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig,
+     Encryption.Context cryptoContext)
+     throws IOException {
+     HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(true)
+       .withIncludesTags(true).withCompression(family.getCompactionCompression())
+       .withCompressTags(family.isCompressTags()).withChecksumType(HStore.getChecksumType(conf))
+       .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(family.getBlocksize())
+       .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding())
+       .withEncryptionContext(cryptoContext).withCreateTime(EnvironmentEdgeManager.currentTime())
+       .build();
+     Path tempPath = new Path(basePath, UUID.randomUUID().toString().replaceAll("-", ""));
+     StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs).withFilePath(tempPath)
+       .withComparator(CellComparator.COMPARATOR).withBloomType(family.getBloomFilterType())
+       .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
+     return w;
+   }
+ 
+   /**
+    * Creates a writer for the mob file in the temp directory.
+    * @param conf The current configuration.
+    * @param fs The current file system.
+    * @param family The descriptor of the current column family.
+    * @param date The date string, in yyyyMMdd format.
+    * @param basePath The base path for a temp directory.
+    * @param maxKeyCount The maximum key count.
+    * @param compression The compression algorithm.
+    * @param startKey The start key.
+    * @param cacheConfig The current cache config.
+    * @param cryptoContext The encryption context.
+    * @return The writer for the mob file.
+    * @throws IOException
+    */
+   public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
+       HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
+       Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig,
+       Encryption.Context cryptoContext)
+       throws IOException {
+     MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
+         .replaceAll("-", ""));
+     return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
+       cacheConfig, cryptoContext);
+   }
+ 
+   /**
+    * Creates a writer for the del file in the temp directory.
+    * @param conf The current configuration.
+    * @param fs The current file system.
+    * @param family The descriptor of the current column family.
+    * @param date The date string, in yyyyMMdd format.
+    * @param basePath The base path for a temp directory.
+    * @param maxKeyCount The maximum key count.
+    * @param compression The compression algorithm.
+    * @param startKey The start key.
+    * @param cacheConfig The current cache config.
+    * @param cryptoContext The encryption context.
+    * @return The writer for the del file.
+    * @throws IOException
+    */
+   public static StoreFile.Writer createDelFileWriter(Configuration conf, FileSystem fs,
+       HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
+       Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig,
+       Encryption.Context cryptoContext)
+       throws IOException {
+     String suffix = UUID
+       .randomUUID().toString().replaceAll("-", "") + "_del";
+     MobFileName mobFileName = MobFileName.create(startKey, date, suffix);
+     return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
+       cacheConfig, cryptoContext);
+   }
+ 
+   /**
+    * Creates a writer for the mob file in the temp directory.
+    * @param conf The current configuration.
+    * @param fs The current file system.
+    * @param family The descriptor of the current column family.
+    * @param mobFileName The mob file name.
+    * @param basePath The base path for a temp directory.
+    * @param maxKeyCount The maximum key count.
+    * @param compression The compression algorithm.
+    * @param cacheConfig The current cache config.
+    * @param cryptoContext The encryption context.
+    * @return The writer for the mob file.
+    * @throws IOException
+    */
+   private static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
+     HColumnDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount,
+     Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext)
+     throws IOException {
+     HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
+       .withIncludesMvcc(true).withIncludesTags(true)
+       .withCompressTags(family.isCompressTags())
+       .withChecksumType(HStore.getChecksumType(conf))
+       .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(family.getBlocksize())
+       .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding())
+       .withEncryptionContext(cryptoContext)
+       .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
+ 
+     StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs)
+       .withFilePath(new Path(basePath, mobFileName.getFileName()))
+       .withComparator(CellComparator.COMPARATOR).withBloomType(BloomType.NONE)
+       .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
+     return w;
+   }
+ 
+   /**
+    * Commits the mob file.
+    * @param conf The current configuration.
+    * @param fs The current file system.
+    * @param sourceFile The path where the mob file is saved.
+    * @param targetPath The directory path where the source file is renamed to.
+    * @param cacheConfig The current cache config.
+    * @return The target file path the source file is renamed to.
+    * @throws IOException
+    */
+   public static Path commitFile(Configuration conf, FileSystem fs, final Path sourceFile,
+       Path targetPath, CacheConfig cacheConfig) throws IOException {
+     if (sourceFile == null) {
+       return null;
+     }
+     Path dstPath = new Path(targetPath, sourceFile.getName());
+     validateMobFile(conf, fs, sourceFile, cacheConfig);
+     String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
+     LOG.info(msg);
+     Path parent = dstPath.getParent();
+     if (!fs.exists(parent)) {
+       fs.mkdirs(parent);
+     }
+     if (!fs.rename(sourceFile, dstPath)) {
+       throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
+     }
+     return dstPath;
+   }
+ 
+   /**
+    * Validates a mob file by opening and closing it.
+    * @param conf The current configuration.
+    * @param fs The current file system.
+    * @param path The path where the mob file is saved.
+    * @param cacheConfig The current cache config.
+    */
+   private static void validateMobFile(Configuration conf, FileSystem fs, Path path,
+       CacheConfig cacheConfig) throws IOException {
+     StoreFile storeFile = null;
+     try {
+       storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE);
+       storeFile.createReader();
+     } catch (IOException e) {
+       LOG.error("Failed to open mob file[" + path + "], keep it in temp directory.", e);
+       throw e;
+     } finally {
+       if (storeFile != null) {
+         storeFile.closeReader(false);
+       }
+     }
+   }
+ 
+   /**
+    * Indicates whether the current mob ref cell has a valid value.
+    * A mob ref cell has a mob reference tag.
+    * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
+    * The real mob value length takes 4 bytes.
+    * The remaining part is the mob file name.
+    * @param cell The mob ref cell.
+    * @return True if the cell has a valid value.
+    */
+   public static boolean hasValidMobRefCellValue(Cell cell) {
+     return cell.getValueLength() > Bytes.SIZEOF_INT;
+   }
+ 
+   /**
+    * Gets the mob value length from the mob ref cell.
+    * A mob ref cell has a mob reference tag.
+    * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
+    * The real mob value length takes 4 bytes.
+    * The remaining part is the mob file name.
+    * @param cell The mob ref cell.
+    * @return The real mob value length.
+    */
+   public static int getMobValueLength(Cell cell) {
+     return Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), Bytes.SIZEOF_INT);
+   }
+ 
+   /**
+    * Gets the mob file name from the mob ref cell.
+    * A mob ref cell has a mob reference tag.
+    * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
+    * The real mob value length takes 4 bytes.
+    * The remaining part is the mob file name.
+    * @param cell The mob ref cell.
+    * @return The mob file name.
+    */
+   public static String getMobFileName(Cell cell) {
+     return Bytes.toString(cell.getValueArray(), cell.getValueOffset() + Bytes.SIZEOF_INT,
+         cell.getValueLength() - Bytes.SIZEOF_INT);
+   }
+ 
+   /**
+    * Gets the table name used in the table lock.
+    * The table lock name is a dummy one, not a real table name. It's tableName + ".mobLock".
+    * @param tn The table name.
+    * @return The table name used in table lock.
+    */
+   public static TableName getTableLockName(TableName tn) {
+     byte[] tableName = tn.getName();
+     return TableName.valueOf(Bytes.add(tableName, MobConstants.MOB_TABLE_LOCK_SUFFIX));
+   }
+ 
+   /**
+    * Performs the mob compaction.
+    * @param conf the Configuration
+    * @param fs the file system
+    * @param tableName the table to compact
+    * @param hcd the column descriptor
+    * @param pool the thread pool
+    * @param tableLockManager the table lock manager
+    * @param allFiles Whether to include all mob files in the compaction.
+    */
+   public static void doMobCompaction(Configuration conf, FileSystem fs, TableName tableName,
+     HColumnDescriptor hcd, ExecutorService pool, TableLockManager tableLockManager,
+     boolean allFiles) throws IOException {
+     String className = conf.get(MobConstants.MOB_COMPACTOR_CLASS_KEY,
+       PartitionedMobCompactor.class.getName());
+     // instantiate the mob compactor.
+     MobCompactor compactor = null;
+     try {
+       compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
+         Configuration.class, FileSystem.class, TableName.class, HColumnDescriptor.class,
+         ExecutorService.class }, new Object[] { conf, fs, tableName, hcd, pool });
+     } catch (Exception e) {
+       throw new IOException("Unable to load configured mob file compactor '" + className + "'", e);
+     }
+     // compact only the mob-enabled column.
+     // obtain a write table lock before performing compaction to avoid a race condition
+     // with the major compaction of the mob-enabled column.
+     boolean tableLocked = false;
+     TableLock lock = null;
+     try {
+       // the tableLockManager might be null in testing. In that case, it is lock-free.
+       if (tableLockManager != null) {
+         lock = tableLockManager.writeLock(MobUtils.getTableLockName(tableName),
+           "Run MobCompactor");
+         lock.acquire();
+       }
+       tableLocked = true;
+       compactor.compact(allFiles);
+     } catch (Exception e) {
+       LOG.error("Failed to compact the mob files for the column " + hcd.getNameAsString()
+         + " in the table " + tableName.getNameAsString(), e);
+     } finally {
+       if (lock != null && tableLocked) {
+         try {
+           lock.release();
+         } catch (IOException e) {
+           LOG.error(
+             "Failed to release the write lock for the table " + tableName.getNameAsString(), e);
+         }
+       }
+     }
+   }
+ 
+   /**
+    * Creates a thread pool.
+    * @param conf the Configuration
+    * @return A thread pool.
+    */
+   public static ExecutorService createMobCompactorThreadPool(Configuration conf) {
+     int maxThreads = conf.getInt(MobConstants.MOB_COMPACTION_THREADS_MAX,
+       MobConstants.DEFAULT_MOB_COMPACTION_THREADS_MAX);
+     if (maxThreads == 0) {
+       maxThreads = 1;
+     }
+     final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
+     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, queue,
+       Threads.newDaemonThreadFactory("MobCompactor"), new RejectedExecutionHandler() {
+         @Override
+         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+           try {
+             // Wait for a thread to pick up the task instead of throwing an exception.
+             queue.put(r);
+           } catch (InterruptedException e) {
+             throw new RejectedExecutionException(e);
+           }
+         }
+       });
+     pool.allowCoreThreadTimeOut(true);
+     return pool;
+   }
+ 
+   /**
+    * Creates the encryption context.
+    * @param conf The current configuration.
+    * @param family The current column descriptor.
+    * @return The encryption context.
+    * @throws IOException
+    */
+   public static Encryption.Context createEncryptionContext(Configuration conf,
+     HColumnDescriptor family) throws IOException {
+     // TODO the code is repeated, and needs to be unified.
+     Encryption.Context cryptoContext = Encryption.Context.NONE;
+     String cipherName = family.getEncryptionType();
+     if (cipherName != null) {
+       Cipher cipher;
+       Key key;
+       byte[] keyBytes = family.getEncryptionKey();
+       if (keyBytes != null) {
+         // Family provides specific key material
+         String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User
+           .getCurrent().getShortName());
+         try {
+           // First try the master key
+           key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
+         } catch (KeyException e) {
+           // If the current master key fails to unwrap, try the alternate, if
+           // one is configured
+           if (LOG.isDebugEnabled()) {
+             LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
+           }
+           String alternateKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
+           if (alternateKeyName != null) {
+             try {
+               key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
+             } catch (KeyException ex) {
+               throw new IOException(ex);
+             }
+           } else {
+             throw new IOException(e);
+           }
+         }
+         // Use the algorithm the key wants
+         cipher = Encryption.getCipher(conf, key.getAlgorithm());
+         if (cipher == null) {
+           throw new RuntimeException("Cipher '" + key.getAlgorithm() + "' is not available");
+         }
+         // Fail if misconfigured
+         // We use the encryption type specified in the column schema as a sanity check on
+         // what the wrapped key is telling us
+         if (!cipher.getName().equalsIgnoreCase(cipherName)) {
+           throw new RuntimeException("Encryption for family '" + family.getNameAsString()
+             + "' configured with type '" + cipherName + "' but key specifies algorithm '"
+             + cipher.getName() + "'");
+         }
+       } else {
+         // Family does not provide key material, create a random key
+         cipher = Encryption.getCipher(conf, cipherName);
+         if (cipher == null) {
+           throw new RuntimeException("Cipher '" + cipherName + "' is not available");
+         }
+         key = cipher.getRandomKey();
+       }
+       cryptoContext = Encryption.newContext(conf);
+       cryptoContext.setCipher(cipher);
+       cryptoContext.setKey(key);
+     }
+     return cryptoContext;
+   }
+ 
+   /**
+    * Checks whether this table has mob-enabled columns.
+    * @param htd The current table descriptor.
+    * @return Whether this table has mob-enabled columns.
+    */
+   public static boolean hasMobColumns(HTableDescriptor htd) {
+     HColumnDescriptor[] hcds = htd.getColumnFamilies();
+     for (HColumnDescriptor hcd : hcds) {
+       if (hcd.isMobEnabled()) {
+         return true;
+       }
+     }
+     return false;
+   }
+ 
+   /**
+    * Indicates whether to return a null value when the mob file is missing or corrupt.
+    * The information is set in the attribute "empty.value.on.mobcell.miss" of scan.
+    * @param scan The current scan.
+    * @return True if the readEmptyValueOnMobCellMiss is enabled.
+    */
+   public static boolean isReadEmptyValueOnMobCellMiss(Scan scan) {
 -    byte[] readEmptyValueOnMobCellMiss = scan.getAttribute(MobConstants.EMPTY_VALUE_ON_MOBCELL_MISS);
++    byte[] readEmptyValueOnMobCellMiss =
++      scan.getAttribute(MobConstants.EMPTY_VALUE_ON_MOBCELL_MISS);
+     try {
+       return readEmptyValueOnMobCellMiss != null && Bytes.toBoolean(readEmptyValueOnMobCellMiss);
+     } catch (IllegalArgumentException e) {
+       return false;
+     }
+   }
 -  
++
+   /**
 -   * Archive mob store files
++   * Archives mob store files
+    * @param conf The current configuration.
+    * @param fs The current file system.
+    * @param mobRegionInfo The mob family region info.
+    * @param mobFamilyDir The mob family directory.
+    * @param family The name of the column family.
+    * @throws IOException
+    */
+   public static void archiveMobStoreFiles(Configuration conf, FileSystem fs,
+       HRegionInfo mobRegionInfo, Path mobFamilyDir, byte[] family) throws IOException {
+     // disable the block cache.
+     Configuration copyOfConf = HBaseConfiguration.create(conf);
+     copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
+     CacheConfig cacheConfig = new CacheConfig(copyOfConf);
+     FileStatus[] fileStatus = FSUtils.listStatus(fs, mobFamilyDir);
+     List<StoreFile> storeFileList = new ArrayList<StoreFile>();
+     for (FileStatus file : fileStatus) {
+       storeFileList.add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE));
+     }
+     HFileArchiver.archiveStoreFiles(conf, fs, mobRegionInfo, mobFamilyDir, family, storeFileList);
+   }
+ }
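
As a quick illustration of how the date helpers and the reference-value helpers in MobUtils fit together, here is a minimal sketch; the cell contents, table name, and mob file name are hypothetical, and it relies only on the methods shown in the diff above:

    import java.util.Date;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.Tag;
    import org.apache.hadoop.hbase.TagType;
    import org.apache.hadoop.hbase.mob.MobUtils;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MobRefSketch {
      public static void main(String[] args) throws Exception {
        // Round-trip the yyyyMMdd date helpers.
        String day = MobUtils.formatDate(new Date());
        System.out.println(day + " parses back to " + MobUtils.parseDate(day));

        // Wrap a hypothetical cell into a mob reference KeyValue.
        KeyValue original = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
            Bytes.toBytes("q"), Bytes.toBytes("a large mob value"));
        Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, Bytes.toBytes("t1"));
        byte[] mobFileName = Bytes.toBytes("hypothetical_mob_file_name");
        KeyValue ref = MobUtils.createMobRefKeyValue(original, mobFileName, tableNameTag);

        // The reference value is <4-byte original value length><mob file name>.
        System.out.println(MobUtils.isMobReferenceCell(ref));                             // true
        System.out.println(MobUtils.getMobValueLength(ref) == original.getValueLength()); // true
        System.out.println(MobUtils.getMobFileName(ref));
      }
    }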

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactionRequest.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactionRequest.java
index 0000000,5d162b4..08865ee
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactionRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactionRequest.java
@@@ -1,0 -1,64 +1,64 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.mob.compactions;
+ 
 -import org.apache.hadoop.classification.InterfaceAudience;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ 
+ /**
+  * The compaction request for mob files.
+  */
+ @InterfaceAudience.Private
+ public abstract class MobCompactionRequest {
+ 
+   protected long selectionTime;
+   protected CompactionType type = CompactionType.PART_FILES;
+ 
+   public void setCompactionType(CompactionType type) {
+     this.type = type;
+   }
+ 
+   /**
+    * Gets the selection time.
+    * @return The selection time.
+    */
+   public long getSelectionTime() {
+     return this.selectionTime;
+   }
+ 
+   /**
+    * Gets the compaction type.
+    * @return The compaction type.
+    */
+   public CompactionType getCompactionType() {
+     return type;
+   }
+ 
+   protected enum CompactionType {
+ 
+     /**
+      * Only part of the mob files are selected.
+      */
+     PART_FILES,
+ 
+     /**
+      * All of the mob files are selected.
+      */
+     ALL_FILES;
+   }
+ }
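
Since MobCompactionRequest is abstract but declares no abstract methods, a subclass only needs to fill in the selection time and, optionally, change the compaction type. A minimal hypothetical subclass for illustration (PartitionedMobCompactionRequest below is the real implementation shipped in this change):

    import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest;
    import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

    // Hypothetical request: records its selection time and keeps the
    // default PART_FILES compaction type.
    public class SimpleMobCompactionRequest extends MobCompactionRequest {
      public SimpleMobCompactionRequest() {
        this.selectionTime = EnvironmentEdgeManager.currentTime();
      }
    }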

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactor.java
index 0000000,156c6f6..77de0cd
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/MobCompactor.java
@@@ -1,0 -1,90 +1,90 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.mob.compactions;
+ 
+ import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.List;
+ import java.util.concurrent.ExecutorService;
+ 
 -import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hbase.HColumnDescriptor;
+ import org.apache.hadoop.hbase.TableName;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.mob.MobUtils;
+ import org.apache.hadoop.hbase.util.FSUtils;
+ 
+ /**
+  * A mob compactor to directly compact the mob files.
+  */
+ @InterfaceAudience.Private
+ public abstract class MobCompactor {
+ 
+   protected FileSystem fs;
+   protected Configuration conf;
+   protected TableName tableName;
+   protected HColumnDescriptor column;
+ 
+   protected Path mobTableDir;
+   protected Path mobFamilyDir;
+   protected ExecutorService pool;
+ 
+   public MobCompactor(Configuration conf, FileSystem fs, TableName tableName,
+     HColumnDescriptor column, ExecutorService pool) {
+     this.conf = conf;
+     this.fs = fs;
+     this.tableName = tableName;
+     this.column = column;
+     this.pool = pool;
+     mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
+     mobFamilyDir = MobUtils.getMobFamilyPath(conf, tableName, column.getNameAsString());
+   }
+ 
+   /**
+    * Compacts the mob files for the current column family.
+    * @return The paths of new mob files generated in the compaction.
+    * @throws IOException
+    */
+   public List<Path> compact() throws IOException {
+     return compact(false);
+   }
+ 
+   /**
+    * Compacts the mob files by compaction type for the current column family.
+    * @param allFiles Whether to include all mob files in the compaction.
+    * @return The paths of new mob files generated in the compaction.
+    * @throws IOException
+    */
+   public List<Path> compact(boolean allFiles) throws IOException {
+     return compact(Arrays.asList(fs.listStatus(mobFamilyDir)), allFiles);
+   }
+ 
+   /**
+    * Compacts the candidate mob files.
+    * @param files The candidate mob files.
+    * @param allFiles Whether to include all mob files in the compaction.
+    * @return The paths of new mob files generated in the compaction.
+    * @throws IOException
+    */
+   public abstract List<Path> compact(List<FileStatus> files, boolean allFiles)
+     throws IOException;
+ }
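
A hedged sketch of how a caller might drive a compactor through the MobUtils helpers from earlier in this commit. The table name and family are assumptions for illustration; passing a null TableLockManager runs lock-free, as the doMobCompaction javadoc notes for testing:

    import java.util.concurrent.ExecutorService;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.mob.MobUtils;

    public class MobCompactionDriverSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        TableName tableName = TableName.valueOf("t1");      // assumed table
        HColumnDescriptor hcd = new HColumnDescriptor("f"); // assumed mob-enabled family
        ExecutorService pool = MobUtils.createMobCompactorThreadPool(conf);
        try {
          // Null TableLockManager means lock-free; true means compact all files.
          MobUtils.doMobCompaction(conf, fs, tableName, hcd, pool, null, true);
        } finally {
          pool.shutdown();
        }
      }
    }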

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java
index 0000000,af1eb4a..227f1e4
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java
@@@ -1,0 -1,146 +1,146 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.mob.compactions;
+ 
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.List;
+ 
 -import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+ 
+ /**
+  * An implementation of {@link MobCompactionRequest} that is used in
+  * {@link PartitionedMobCompactor}.
+  * The mob files that have the same start key and date in their names belong to
+  * the same partition.
+  */
+ @InterfaceAudience.Private
+ public class PartitionedMobCompactionRequest extends MobCompactionRequest {
+ 
+   protected Collection<FileStatus> delFiles;
+   protected Collection<CompactionPartition> compactionPartitions;
+ 
+   public PartitionedMobCompactionRequest(Collection<CompactionPartition> compactionPartitions,
+     Collection<FileStatus> delFiles) {
+     this.selectionTime = EnvironmentEdgeManager.currentTime();
+     this.compactionPartitions = compactionPartitions;
+     this.delFiles = delFiles;
+   }
+ 
+   /**
+    * Gets the compaction partitions.
+    * @return The compaction partitions.
+    */
+   public Collection<CompactionPartition> getCompactionPartitions() {
+     return this.compactionPartitions;
+   }
+ 
+   /**
+    * Gets the del files.
+    * @return The del files.
+    */
+   public Collection<FileStatus> getDelFiles() {
+     return this.delFiles;
+   }
+ 
+   /**
+    * The partition in the mob compaction.
+    * The mob files that have the same start key and date in their names belong to
+    * the same partition.
+    */
+   protected static class CompactionPartition {
+     private List<FileStatus> files = new ArrayList<FileStatus>();
+     private CompactionPartitionId partitionId;
+ 
+     public CompactionPartition(CompactionPartitionId partitionId) {
+       this.partitionId = partitionId;
+     }
+ 
+     public CompactionPartitionId getPartitionId() {
+       return this.partitionId;
+     }
+ 
+     public void addFile(FileStatus file) {
+       files.add(file);
+     }
+ 
+     public List<FileStatus> listFiles() {
+       return Collections.unmodifiableList(files);
+     }
+   }
+ 
+   /**
+    * The partition id that consists of the start key and the date of the mob file name.
+    */
+   public static class CompactionPartitionId {
+ 
+     private String startKey;
+     private String date;
+ 
+     public CompactionPartitionId(String startKey, String date) {
+       if (startKey == null || date == null) {
+         throw new IllegalArgumentException("Neither the start key nor the date may be null");
+       }
+       this.startKey = startKey;
+       this.date = date;
+     }
+ 
+     public String getStartKey() {
+       return this.startKey;
+     }
+ 
+     public String getDate() {
+       return this.date;
+     }
+ 
+     @Override
+     public int hashCode() {
+       int result = 17;
+       result = 31 * result + startKey.hashCode();
+       result = 31 * result + date.hashCode();
+       return result;
+     }
+ 
+     @Override
+     public boolean equals(Object obj) {
+       if (this == obj) {
+         return true;
+       }
+       if (!(obj instanceof CompactionPartitionId)) {
+         return false;
+       }
+       CompactionPartitionId another = (CompactionPartitionId) obj;
+       if (!this.startKey.equals(another.startKey)) {
+         return false;
+       }
+       if (!this.date.equals(another.date)) {
+         return false;
+       }
+       return true;
+     }
+ 
+     @Override
+     public String toString() {
+       return new StringBuilder(startKey).append(date).toString();
+     }
+   }
+ }
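
Because CompactionPartitionId overrides equals and hashCode on (startKey, date), mob files whose names share those components collapse onto a single map entry, which is what the grouping in PartitionedMobCompactor.select below relies on. A small sketch with made-up values:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;

    public class PartitionIdSketch {
      public static void main(String[] args) {
        // Two ids built from the same (startKey, date) pair compare equal.
        CompactionPartitionId a = new CompactionPartitionId("d41d8cd9", "20150722");
        CompactionPartitionId b = new CompactionPartitionId("d41d8cd9", "20150722");
        Map<CompactionPartitionId, Integer> fileCounts =
            new HashMap<CompactionPartitionId, Integer>();
        fileCounts.put(a, 1);
        // b equals a, so this updates the same entry instead of adding a new one.
        fileCounts.put(b, fileCounts.get(b) + 1);
        System.out.println(fileCounts.size() + " partition, " + fileCounts.get(a) + " files");
      }
    }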

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 0000000,6c2ff01..19137c4
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@@ -1,0 -1,679 +1,679 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.mob.compactions;
+ 
+ import java.io.FileNotFoundException;
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Date;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Map.Entry;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Future;
+ 
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
 -import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hbase.Cell;
+ import org.apache.hadoop.hbase.CellComparator;
+ import org.apache.hadoop.hbase.HColumnDescriptor;
+ import org.apache.hadoop.hbase.HConstants;
+ import org.apache.hadoop.hbase.KeyValue;
+ import org.apache.hadoop.hbase.TableName;
+ import org.apache.hadoop.hbase.Tag;
+ import org.apache.hadoop.hbase.TagType;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.client.Connection;
+ import org.apache.hadoop.hbase.client.ConnectionFactory;
+ import org.apache.hadoop.hbase.client.HTable;
+ import org.apache.hadoop.hbase.client.Scan;
+ import org.apache.hadoop.hbase.client.Table;
+ import org.apache.hadoop.hbase.io.HFileLink;
+ import org.apache.hadoop.hbase.io.crypto.Encryption;
+ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+ import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+ import org.apache.hadoop.hbase.mob.MobConstants;
+ import org.apache.hadoop.hbase.mob.MobFileName;
+ import org.apache.hadoop.hbase.mob.MobUtils;
+ import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType;
+ import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
+ import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
+ import org.apache.hadoop.hbase.regionserver.BloomType;
+ import org.apache.hadoop.hbase.regionserver.HStore;
+ import org.apache.hadoop.hbase.regionserver.ScanInfo;
+ import org.apache.hadoop.hbase.regionserver.ScanType;
+ import org.apache.hadoop.hbase.regionserver.ScannerContext;
+ import org.apache.hadoop.hbase.regionserver.StoreFile;
+ import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+ import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+ import org.apache.hadoop.hbase.regionserver.StoreScanner;
+ import org.apache.hadoop.hbase.util.Bytes;
+ import org.apache.hadoop.hbase.util.Pair;
+ 
+ /**
+  * An implementation of {@link MobCompactor} that compacts the mob files in partitions.
+  */
+ @InterfaceAudience.Private
+ public class PartitionedMobCompactor extends MobCompactor {
+ 
+   private static final Log LOG = LogFactory.getLog(PartitionedMobCompactor.class);
+   protected long mergeableSize;
+   protected int delFileMaxCount;
+   /** The number of files compacted in a batch */
+   protected int compactionBatchSize;
+   protected int compactionKVMax;
+ 
+   private Path tempPath;
+   private Path bulkloadPath;
+   private CacheConfig compactionCacheConfig;
+   private Tag tableNameTag;
+   private Encryption.Context cryptoContext = Encryption.Context.NONE;
+ 
+   public PartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName,
+     HColumnDescriptor column, ExecutorService pool) throws IOException {
+     super(conf, fs, tableName, column, pool);
+     mergeableSize = conf.getLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD,
+       MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD);
+     delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT,
+       MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
+     // default is 100
+     compactionBatchSize = conf.getInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
+       MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
+     tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
+     bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
+       tableName.getNamespaceAsString(), tableName.getQualifierAsString())));
+     compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX,
+       HConstants.COMPACTION_KV_MAX_DEFAULT);
+     Configuration copyOfConf = new Configuration(conf);
+     copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
+     compactionCacheConfig = new CacheConfig(copyOfConf);
+     tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName());
+     cryptoContext = MobUtils.createEncryptionContext(copyOfConf, column);
+   }
+ 
+   @Override
+   public List<Path> compact(List<FileStatus> files, boolean allFiles) throws IOException {
+     if (files == null || files.isEmpty()) {
+       LOG.info("No candidate mob files");
+       return null;
+     }
+     LOG.info("is allFiles: " + allFiles);
+     // find the files to compact.
+     PartitionedMobCompactionRequest request = select(files, allFiles);
+     // compact the files.
+     return performCompaction(request);
+   }
+ 
+   /**
+    * Selects the mob files and del files to compact.
+    * Iterates over the candidates to find all the del files and the small mob files.
+    * @param candidates All the candidate files.
+    * @param allFiles Whether to add all mob files into the compaction.
+    * @return A compaction request.
+    * @throws IOException
+    */
+   protected PartitionedMobCompactionRequest select(List<FileStatus> candidates,
+     boolean allFiles) throws IOException {
+     Collection<FileStatus> allDelFiles = new ArrayList<FileStatus>();
+     Map<CompactionPartitionId, CompactionPartition> filesToCompact =
+       new HashMap<CompactionPartitionId, CompactionPartition>();
+     int selectedFileCount = 0;
+     int irrelevantFileCount = 0;
+     for (FileStatus file : candidates) {
+       if (!file.isFile()) {
+         irrelevantFileCount++;
+         continue;
+       }
+       // group the del files and small files.
+       FileStatus linkedFile = file;
+       if (HFileLink.isHFileLink(file.getPath())) {
+         HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
+         linkedFile = getLinkedFileStatus(link);
+         if (linkedFile == null) {
+           // If the linked file cannot be found, count it as an irrelevant file
+           irrelevantFileCount++;
+           continue;
+         }
+       }
+       if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
+         allDelFiles.add(file);
+       } else if (allFiles || linkedFile.getLen() < mergeableSize) {
+         // add all files if allFiles is true,
+         // otherwise add the small files to the merge pool
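+         // a mob file name encodes the start key (as an MD5 hex string) and the date
+         // of the cells, so files written for the same region and day fall into the
+         // same compaction partition.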
+         MobFileName fileName = MobFileName.create(linkedFile.getPath().getName());
+         CompactionPartitionId id = new CompactionPartitionId(fileName.getStartKey(),
+           fileName.getDate());
+         CompactionPartition compactionPartition = filesToCompact.get(id);
+         if (compactionPartition == null) {
+           compactionPartition = new CompactionPartition(id);
+           compactionPartition.addFile(file);
+           filesToCompact.put(id, compactionPartition);
+         } else {
+           compactionPartition.addFile(file);
+         }
+         selectedFileCount++;
+       }
+     }
+     PartitionedMobCompactionRequest request = new PartitionedMobCompactionRequest(
+       filesToCompact.values(), allDelFiles);
+     if (candidates.size() == (allDelFiles.size() + selectedFileCount + irrelevantFileCount)) {
+       // all the files are selected
+       request.setCompactionType(CompactionType.ALL_FILES);
+     }
+     LOG.info("The compaction type is " + request.getCompactionType() + ", the request has "
+       + allDelFiles.size() + " del files, " + selectedFileCount + " selected files, and "
+       + irrelevantFileCount + " irrelevant files");
+     return request;
+   }
+ 
+   /**
+    * Performs the compaction on the selected files.
+    * <ol>
+    * <li>Compacts the del files.</li>
+    * <li>Compacts the selected small mob files and all the del files.</li>
+    * <li>If all the candidates are selected, delete the del files.</li>
+    * </ol>
+    * @param request The compaction request.
+    * @return The paths of new mob files generated in the compaction.
+    * @throws IOException
+    */
+   protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
+     throws IOException {
+     // merge the del files
+     List<Path> delFilePaths = new ArrayList<Path>();
+     for (FileStatus delFile : request.delFiles) {
+       delFilePaths.add(delFile.getPath());
+     }
+     List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
+     List<StoreFile> newDelFiles = new ArrayList<StoreFile>();
+     List<Path> paths = null;
+     try {
+       for (Path newDelPath : newDelPaths) {
+         StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE);
+         // pre-create reader of a del file to avoid race condition when opening the reader in each
+         // partition.
+         sf.createReader();
+         newDelFiles.add(sf);
+       }
+       LOG.info("After merging, there are " + newDelFiles.size() + " del files");
+       // compact the mob files by partitions.
+       paths = compactMobFiles(request, newDelFiles);
+       LOG.info("After compaction, there are " + paths.size() + " mob files");
+     } finally {
+       closeStoreFileReaders(newDelFiles);
+     }
+     // archive the del files if all the mob files are selected.
+     if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
+       LOG.info("After a mob compaction with all files selected, archiving the del files "
+         + newDelPaths);
+       try {
+         MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
+       } catch (IOException e) {
+         LOG.error("Failed to archive the del files " + newDelPaths, e);
+       }
+     }
+     return paths;
+   }
+ 
+   /**
+    * Compacts the selected small mob files and all the del files.
+    * @param request The compaction request.
+    * @param delFiles The del files.
+    * @return The paths of new mob files after compactions.
+    * @throws IOException
+    */
+   protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest request,
+     final List<StoreFile> delFiles) throws IOException {
+     Collection<CompactionPartition> partitions = request.compactionPartitions;
+     if (partitions == null || partitions.isEmpty()) {
+       LOG.info("No partitions of mob files");
+       return Collections.emptyList();
+     }
+     List<Path> paths = new ArrayList<Path>();
+     Connection c = ConnectionFactory.createConnection(conf);
+     final Table table = c.getTable(tableName);
+     try {
+       Map<CompactionPartitionId, Future<List<Path>>> results =
+         new HashMap<CompactionPartitionId, Future<List<Path>>>();
+       // compact the mob files by partitions in parallel.
+       for (final CompactionPartition partition : partitions) {
+         results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
+           @Override
+           public List<Path> call() throws Exception {
+             LOG.info("Compacting mob files for partition " + partition.getPartitionId());
+             return compactMobFilePartition(request, partition, delFiles, table);
+           }
+         }));
+       }
+       // wait for each partition compaction to finish and collect the results.
+       List<CompactionPartitionId> failedPartitions = new ArrayList<CompactionPartitionId>();
+       for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) {
+         try {
+           paths.addAll(result.getValue().get());
+         } catch (Exception e) {
+           // log the error and remember the failed partition.
+           LOG.error("Failed to compact the partition " + result.getKey(), e);
+           failedPartitions.add(result.getKey());
+         }
+       }
+       if (!failedPartitions.isEmpty()) {
+         // if any partition fails in the compaction, directly throw an exception.
+         throw new IOException("Failed to compact the partitions " + failedPartitions);
+       }
+     } finally {
+       try {
+         table.close();
+       } catch (IOException e) {
+         LOG.error("Failed to close the table " + tableName, e);
+       }
+       try {
+         // also close the connection to avoid leaking it.
+         c.close();
+       } catch (IOException e) {
+         LOG.error("Failed to close the connection", e);
+       }
+     }
+     return paths;
+   }
+ 
+   /**
+    * Compacts a partition of selected small mob files and all the del files.
+    * @param request The compaction request.
+    * @param partition A compaction partition.
+    * @param delFiles The del files.
+    * @param table The current table.
+    * @return The paths of new mob files after compactions.
+    * @throws IOException
+    */
+   private List<Path> compactMobFilePartition(PartitionedMobCompactionRequest request,
+     CompactionPartition partition, List<StoreFile> delFiles, Table table) throws IOException {
+     List<Path> newFiles = new ArrayList<Path>();
+     List<FileStatus> files = partition.listFiles();
+     int offset = 0;
+     Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString());
+     Path bulkloadColumnPath = new Path(bulkloadPathOfPartition, column.getNameAsString());
+     while (offset < files.size()) {
+       int batch = compactionBatchSize;
+       if (files.size() - offset < compactionBatchSize) {
+         batch = files.size() - offset;
+       }
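+       // batch is now min(compactionBatchSize, files remaining in this partition).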
+       if (batch == 1 && delFiles.isEmpty()) {
+         // only one file left and no del files, do not compact it,
+         // and directly add it to the new files.
+         newFiles.add(files.get(offset).getPath());
+         offset++;
+         continue;
+       }
+       // clean the bulkload directory to avoid loading old files.
+       fs.delete(bulkloadPathOfPartition, true);
+       // add the selected mob files and del files into filesToCompact
+       List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
+       for (int i = offset; i < batch + offset; i++) {
+         StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig,
+           BloomType.NONE);
+         filesToCompact.add(sf);
+       }
+       filesToCompact.addAll(delFiles);
+       // compact the mob files in a batch.
+       compactMobFilesInBatch(request, partition, table, filesToCompact, batch,
+         bulkloadPathOfPartition, bulkloadColumnPath, newFiles);
+       // move to the next batch.
+       offset += batch;
+     }
+     LOG.info("Compaction is finished. The number of mob files is changed from " + files.size()
+       + " to " + newFiles.size());
+     return newFiles;
+   }
+ 
+   /**
+    * Closes the readers of store files.
+    * @param storeFiles The store files to be closed.
+    */
+   private void closeStoreFileReaders(List<StoreFile> storeFiles) {
+     for (StoreFile storeFile : storeFiles) {
+       try {
+         storeFile.closeReader(true);
+       } catch (IOException e) {
+         LOG.warn("Failed to close the reader on store file " + storeFile.getPath(), e);
+       }
+     }
+   }
+ 
+   /**
+    * Compacts a partition of selected small mob files and all the del files in a batch.
+    * @param request The compaction request.
+    * @param partition A compaction partition.
+    * @param table The current table.
+    * @param filesToCompact The files to be compacted.
+    * @param batch The number of mob files to be compacted in a batch.
+    * @param bulkloadPathOfPartition The bulkload directory of the current partition.
+    * @param bulkloadColumnPath The column family directory under the partition's bulkload
+    *        directory, where the new ref files are written.
+    * @param newFiles The paths of new mob files after compactions.
+    * @throws IOException
+    */
+   private void compactMobFilesInBatch(PartitionedMobCompactionRequest request,
+     CompactionPartition partition, Table table, List<StoreFile> filesToCompact, int batch,
+     Path bulkloadPathOfPartition, Path bulkloadColumnPath, List<Path> newFiles)
+     throws IOException {
+     // open scanner to the selected mob files and del files.
+     StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
+     // the mob files to be compacted; the del files were appended after the first batch
+     // entries of filesToCompact, so this sublist excludes them.
+     List<StoreFile> mobFilesToCompact = filesToCompact.subList(0, batch);
+     // Pair(maxSeqId, cellsCount)
+     Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact);
+     // open writers for the mob files and new ref store files.
+     Writer writer = null;
+     Writer refFileWriter = null;
+     Path filePath = null;
+     Path refFilePath = null;
+     long mobCells = 0;
+     try {
+       writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(),
+         tempPath, Long.MAX_VALUE, column.getCompactionCompression(), partition.getPartitionId()
+           .getStartKey(), compactionCacheConfig, cryptoContext);
+       filePath = writer.getPath();
+       byte[] fileName = Bytes.toBytes(filePath.getName());
+       // create a temp file and open a writer for it in the bulkloadPath
+       refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo
+         .getSecond().longValue(), compactionCacheConfig, cryptoContext);
+       refFilePath = refFileWriter.getPath();
+       List<Cell> cells = new ArrayList<Cell>();
+       boolean hasMore = false;
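+       // limit each scanner.next() call to at most compactionKVMax cells, which bounds
+       // the memory held per iteration.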
+       ScannerContext scannerContext =
+               ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
+       do {
+         hasMore = scanner.next(cells, scannerContext);
+         for (Cell cell : cells) {
+           // write the mob cell to the mob file.
+           writer.append(cell);
+           // write the new reference cell to the store file.
+           KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, tableNameTag);
+           refFileWriter.append(reference);
+           mobCells++;
+         }
+         cells.clear();
+       } while (hasMore);
+     } finally {
+       // close the scanner.
+       scanner.close();
+       // append metadata to the mob file, and close the mob file writer.
+       closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
+       // append metadata and bulkload info to the ref mob file, and close the writer.
+       closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
+     }
+     if (mobCells > 0) {
+       // commit mob file
+       MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
+       // bulkload the ref file
+       bulkloadRefFile(table, bulkloadPathOfPartition, filePath.getName());
+       newFiles.add(new Path(mobFamilyDir, filePath.getName()));
+     } else {
+       // no mob cells were written; delete the empty mob file and the empty ref file
+       // instead of committing them.
+       deletePath(filePath);
+       deletePath(refFilePath);
+     }
+     // archive the old mob files, do not archive the del files.
+     try {
+       closeStoreFileReaders(mobFilesToCompact);
+       MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(),
+         mobFilesToCompact);
+     } catch (IOException e) {
+       LOG.error("Failed to archive the files " + mobFilesToCompact, e);
+     }
+   }
+ 
+   /**
+    * Compacts the del files in batches, which avoids opening too many files at once.
+    * @param request The compaction request.
+    * @param delFilePaths The paths of the del files.
+    * @return The paths of new del files after merging, or the original files if no merging
+    *         is necessary.
+    * @throws IOException
+    */
+   protected List<Path> compactDelFiles(PartitionedMobCompactionRequest request,
+     List<Path> delFilePaths) throws IOException {
+     if (delFilePaths.size() <= delFileMaxCount) {
+       return delFilePaths;
+     }
+     // when there are more del files than allowed, merge them first.
+     int offset = 0;
+     List<Path> paths = new ArrayList<Path>();
+     while (offset < delFilePaths.size()) {
+       // get the batch
+       int batch = compactionBatchSize;
+       if (delFilePaths.size() - offset < compactionBatchSize) {
+         batch = delFilePaths.size() - offset;
+       }
+       List<StoreFile> batchedDelFiles = new ArrayList<StoreFile>();
+       if (batch == 1) {
+         // only one file left, do not compact it, directly add it to the new files.
+         paths.add(delFilePaths.get(offset));
+         offset++;
+         continue;
+       }
+       for (int i = offset; i < batch + offset; i++) {
+         batchedDelFiles.add(new StoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig,
+           BloomType.NONE));
+       }
+       // compact the del files in a batch.
+       paths.add(compactDelFilesInBatch(request, batchedDelFiles));
+       // move to the next batch.
+       offset += batch;
+     }
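+     // the merged outputs may still exceed delFileMaxCount, so recurse until few
+     // enough del files remain.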
+     return compactDelFiles(request, paths);
+   }
+ 
+   /**
+    * Compacts the del files in a batch.
+    * @param request The compaction request.
+    * @param delFiles The del files.
+    * @return The path of new del file after merging.
+    * @throws IOException
+    */
+   private Path compactDelFilesInBatch(PartitionedMobCompactionRequest request,
+     List<StoreFile> delFiles) throws IOException {
+     // create a scanner for the del files.
+     StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES);
+     Writer writer = null;
+     Path filePath = null;
+     try {
+       writer = MobUtils.createDelFileWriter(conf, fs, column,
+         MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
+         column.getCompactionCompression(), HConstants.EMPTY_START_ROW, compactionCacheConfig,
+         cryptoContext);
+       filePath = writer.getPath();
+       List<Cell> cells = new ArrayList<Cell>();
+       boolean hasMore = false;
+       ScannerContext scannerContext =
+               ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
+       do {
+         hasMore = scanner.next(cells, scannerContext);
+         for (Cell cell : cells) {
+           writer.append(cell);
+         }
+         cells.clear();
+       } while (hasMore);
+     } finally {
+       scanner.close();
+       if (writer != null) {
+         try {
+           writer.close();
+         } catch (IOException e) {
+           LOG.error("Failed to close the writer of the file " + filePath, e);
+         }
+       }
+     }
+     // commit the new del file
+     Path path = MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
+     // archive the old del files
+     try {
+       MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delFiles);
+     } catch (IOException e) {
+       LOG.error("Failed to archive the old del files " + delFiles, e);
+     }
+     return path;
+   }
+ 
+   /**
+    * Creates a store scanner.
+    * @param filesToCompact The files to be compacted.
+    * @param scanType The scan type.
+    * @return The store scanner.
+    * @throws IOException
+    */
+   private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
+     throws IOException {
+     List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact,
+       false, true, false, null, HConstants.LATEST_TIMESTAMP);
+     Scan scan = new Scan();
+     scan.setMaxVersions(column.getMaxVersions());
+     long ttl = HStore.determineTTLFromFamily(column);
+     ScanInfo scanInfo = new ScanInfo(column, ttl, 0, CellComparator.COMPARATOR);
+     StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners, 0L,
+       HConstants.LATEST_TIMESTAMP);
+     return scanner;
+   }
+ 
+   /**
+    * Bulkloads the current file.
+    * @param table The current table.
+    * @param bulkloadDirectory The path of the bulkload directory.
+    * @param fileName The current file name.
+    * @throws IOException
+    */
+   private void bulkloadRefFile(Table table, Path bulkloadDirectory, String fileName)
+     throws IOException {
+     // bulkload the ref file
+     try {
+       LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
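+       // doBulkLoad takes an HTable in this version of the API, hence the cast below.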
+       bulkload.doBulkLoad(bulkloadDirectory, (HTable)table);
+     } catch (Exception e) {
+       // delete the committed mob file
+       deletePath(new Path(mobFamilyDir, fileName));
+       throw new IOException(e);
+     } finally {
+       // delete the bulkload files in bulkloadPath
+       deletePath(bulkloadDirectory);
+     }
+   }
+ 
+   /**
+    * Closes the mob file writer.
+    * @param writer The mob file writer.
+    * @param maxSeqId Maximum sequence id.
+    * @param mobCellsCount The number of mob cells.
+    * @throws IOException
+    */
+   private void closeMobFileWriter(Writer writer, long maxSeqId, long mobCellsCount)
+     throws IOException {
+     if (writer != null) {
+       writer.appendMetadata(maxSeqId, false, mobCellsCount);
+       try {
+         writer.close();
+       } catch (IOException e) {
+         LOG.error("Failed to close the writer of the file " + writer.getPath(), e);
+       }
+     }
+   }
+ 
+   /**
+    * Closes the ref file writer.
+    * @param writer The ref file writer.
+    * @param maxSeqId Maximum sequence id.
+    * @param bulkloadTime The timestamp at which the bulk load file is created.
+    * @throws IOException
+    */
+   private void closeRefFileWriter(Writer writer, long maxSeqId, long bulkloadTime)
+     throws IOException {
+     if (writer != null) {
+       writer.appendMetadata(maxSeqId, false);
+       writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime));
+       writer.appendFileInfo(StoreFile.SKIP_RESET_SEQ_ID, Bytes.toBytes(true));
+       try {
+         writer.close();
+       } catch (IOException e) {
+         LOG.error("Failed to close the writer of the ref file " + writer.getPath(), e);
+       }
+     }
+   }
+ 
+   /**
+    * Gets the max seqId and number of cells of the store files.
+    * @param storeFiles The store files.
+    * @return The pair of the max seqId and number of cells of the store files.
+    * @throws IOException
+    */
+   private Pair<Long, Long> getFileInfo(List<StoreFile> storeFiles) throws IOException {
+     long maxSeqId = 0;
+     long maxKeyCount = 0;
+     for (StoreFile sf : storeFiles) {
+       // the readers will be closed later after the merge.
+       maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId());
+       byte[] count = sf.createReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT);
+       if (count != null) {
+         maxKeyCount += Bytes.toLong(count);
+       }
+     }
+     return new Pair<Long, Long>(Long.valueOf(maxSeqId), Long.valueOf(maxKeyCount));
+   }
+ 
+   /**
+    * Deletes a file.
+    * @param path The path of the file to be deleted.
+    */
+   private void deletePath(Path path) {
+     try {
+       if (path != null) {
+         fs.delete(path, true);
+       }
+     } catch (IOException e) {
+       LOG.error("Failed to delete the file " + path, e);
+     }
+   }
+ 
+   private FileStatus getLinkedFileStatus(HFileLink link) throws IOException {
+     Path[] locations = link.getLocations();
+     for (Path location : locations) {
+       FileStatus file = getFileStatus(location);
+       if (file != null) {
+         return file;
+       }
+     }
+     return null;
+   }
+ 
+   private FileStatus getFileStatus(Path path) throws IOException {
+     try {
+       if (path != null) {
+         FileStatus file = fs.getFileStatus(path);
+         return file;
+       }
+     } catch (FileNotFoundException e) {
+       LOG.warn("The file " + path + " can not be found", e);
+     }
+     return null;
+   }
+ }

http://git-wip-us.apache.org/repos/asf/hbase/blob/493f36c8/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
index 0000000,82d03cd..fdda1de
mode 000000,100644..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
@@@ -1,0 -1,185 +1,185 @@@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hbase.mob.mapreduce;
+ 
+ import java.io.IOException;
+ 
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
 -import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hbase.Cell;
+ import org.apache.hadoop.hbase.HColumnDescriptor;
+ import org.apache.hadoop.hbase.HConstants;
+ import org.apache.hadoop.hbase.KeyValue;
+ import org.apache.hadoop.hbase.KeyValueUtil;
+ import org.apache.hadoop.hbase.Tag;
+ import org.apache.hadoop.hbase.TagType;
++import org.apache.hadoop.hbase.classification.InterfaceAudience;
+ import org.apache.hadoop.hbase.client.BufferedMutator;
+ import org.apache.hadoop.hbase.client.Put;
+ import org.apache.hadoop.hbase.io.crypto.Encryption;
+ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+ import org.apache.hadoop.hbase.mob.MobConstants;
+ import org.apache.hadoop.hbase.mob.MobUtils;
+ import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
+ import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
+ import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+ import org.apache.hadoop.hbase.regionserver.MemStore;
+ import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
+ import org.apache.hadoop.hbase.regionserver.StoreFile;
+ import org.apache.hadoop.hbase.util.Bytes;
+ import org.apache.hadoop.mapreduce.Reducer.Context;
+ 
+ /**
+  * A wrapper of a DefaultMemStore.
+  * This wrapper is used in the sweep reducer to buffer and sort the cells read from
+  * the invalid and small mob files.
+  * In a reducer of the sweep tool, the mob files are grouped by prefix (start key and
+  * date). Within each group, the reducer iterates over the files and reads their cells
+  * into a new, bigger mob file. The cells within a single mob file are ordered, but
+  * cells across mob files are not, so this MemStoreWrapper sorts the cells coming from
+  * different mob files before they are flushed to disk.
+  * When the memstore is big enough, it is flushed as a new mob file, and the file name
+  * is written back to the store files of HBase as references.
+  */
+ @InterfaceAudience.Private
+ public class MemStoreWrapper {
+ 
+   private static final Log LOG = LogFactory.getLog(MemStoreWrapper.class);
+ 
+   private MemStore memstore;
+   private long flushSize;
+   private CompactionPartitionId partitionId;
+   private Context context;
+   private Configuration conf;
+   private BufferedMutator table;
+   private HColumnDescriptor hcd;
+   private Path mobFamilyDir;
+   private FileSystem fs;
+   private CacheConfig cacheConfig;
+   private Encryption.Context cryptoContext = Encryption.Context.NONE;
+ 
+   public MemStoreWrapper(Context context, FileSystem fs, BufferedMutator table,
+     HColumnDescriptor hcd, MemStore memstore, CacheConfig cacheConfig) throws IOException {
+     this.memstore = memstore;
+     this.context = context;
+     this.fs = fs;
+     this.table = table;
+     this.hcd = hcd;
+     this.conf = context.getConfiguration();
+     this.cacheConfig = cacheConfig;
+     flushSize = this.conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE,
+         MobConstants.DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE);
+     mobFamilyDir = MobUtils.getMobFamilyPath(conf, table.getName(), hcd.getNameAsString());
+     cryptoContext = MobUtils.createEncryptionContext(conf, hcd);
+   }
+ 
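+   // A minimal usage sketch (hypothetical; the sweep reducer supplies the context,
+   // the BufferedMutator and a memstore instance such as a DefaultMemStore):
+   //   MemStoreWrapper wrapper = new MemStoreWrapper(context, fs, table, hcd,
+   //     new DefaultMemStore(), cacheConfig);
+   //   wrapper.setPartitionId(partitionId);
+   //   for (KeyValue kv : cellsOfOnePartition) {
+   //     wrapper.addToMemstore(kv); // flushes automatically once heapSize >= flushSize
+   //   }
+   //   wrapper.flushMemStore(); // flush whatever remains
+ 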
+   public void setPartitionId(CompactionPartitionId partitionId) {
+     this.partitionId = partitionId;
+   }
+ 
+   /**
+    * Flushes the memstore if the size is large enough.
+    * @throws IOException
+    */
+   private void flushMemStoreIfNecessary() throws IOException {
+     if (memstore.heapSize() >= flushSize) {
+       flushMemStore();
+     }
+   }
+ 
+   /**
+    * Flushes the memstore unconditionally.
+    * @throws IOException
+    */
+   public void flushMemStore() throws IOException {
+     MemStoreSnapshot snapshot = memstore.snapshot();
+     internalFlushCache(snapshot);
+     memstore.clearSnapshot(snapshot.getId());
+   }
+ 
+   /**
+    * Flushes the snapshot of the memstore.
+    * Writes the mob data to a new mob file, and writes the name of this mob file back
+    * to the store files of HBase as references.
+    * @param snapshot The snapshot of the memstore.
+    * @throws IOException
+    */
+   private void internalFlushCache(final MemStoreSnapshot snapshot)
+       throws IOException {
+     if (snapshot.getCellsCount() == 0) {
+       return;
+     }
+     // generate the files into a temp directory.
+     String tempPathString = context.getConfiguration().get(SweepJob.WORKING_FILES_DIR_KEY);
+     StoreFile.Writer mobFileWriter = MobUtils.createWriter(conf, fs, hcd, partitionId.getDate(),
+       new Path(tempPathString), snapshot.getCellsCount(), hcd.getCompactionCompression(),
+       partitionId.getStartKey(), cacheConfig, cryptoContext);
+ 
+     String relativePath = mobFileWriter.getPath().getName();
+     LOG.info("Create files under a temp directory " + mobFileWriter.getPath().toString());
+ 
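+     // the value of each reference cell is the name of the new mob file; readers
+     // follow it to locate the actual mob data.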
+     byte[] referenceValue = Bytes.toBytes(relativePath);
+     KeyValueScanner scanner = snapshot.getScanner();
+     Cell cell = null;
+     while (null != (cell = scanner.next())) {
+       mobFileWriter.append(cell);
+     }
+     scanner.close();
+     // Append the max sequence id and the cells count as metadata to this output
+     // hfile. Long.MAX_VALUE is used because the real sequence id is not tracked here.
+     mobFileWriter.appendMetadata(Long.MAX_VALUE, false, snapshot.getCellsCount());
+     mobFileWriter.close();
+ 
+     MobUtils.commitFile(conf, fs, mobFileWriter.getPath(), mobFamilyDir, cacheConfig);
+     context.getCounter(SweepCounter.FILE_AFTER_MERGE_OR_CLEAN).increment(1);
+     // write reference/fileName back to the store files of HBase.
+     scanner = snapshot.getScanner();
+     scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
+     cell = null;
+     Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, Bytes.toBytes(this.table.getName()
+       .toString()));
+     long updatedCount = 0;
+     while (null != (cell = scanner.next())) {
+       KeyValue reference = MobUtils.createMobRefKeyValue(cell, referenceValue, tableNameTag);
+       Put put =
+           new Put(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength());
+       put.add(reference);
+       table.mutate(put);
+       updatedCount++;
+     }
+     table.flush();
+     context.getCounter(SweepCounter.RECORDS_UPDATED).increment(updatedCount);
+     scanner.close();
+   }
+ 
+   /**
+    * Adds a KeyValue into the memstore.
+    * @param kv The KeyValue to be added.
+    * @throws IOException
+    */
+   public void addToMemstore(KeyValue kv) throws IOException {
+     memstore.add(kv);
+     // flush the memstore if it's full.
+     flushMemStoreIfNecessary();
+   }
+ 
+ }

