hbase-commits mailing list archives

From jmhs...@apache.org
Subject [48/50] [abbrv] hbase git commit: Merge branch 'master' (2/11/15) into hbase-11339
Date Sun, 22 Feb 2015 20:56:21 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index d8b1376,0000000..4e8ccc1
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@@ -1,648 -1,0 +1,648 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.text.ParseException;
 +import java.text.SimpleDateFormat;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Date;
 +import java.util.List;
 +import java.util.UUID;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.classification.InterfaceAudience;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HConstants;
 +import org.apache.hadoop.hbase.HRegionInfo;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.Tag;
 +import org.apache.hadoop.hbase.TagType;
 +import org.apache.hadoop.hbase.backup.HFileArchiver;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.io.HFileLink;
 +import org.apache.hadoop.hbase.io.compress.Compression;
 +import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 +import org.apache.hadoop.hbase.io.hfile.HFile;
 +import org.apache.hadoop.hbase.io.hfile.HFileContext;
 +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 +import org.apache.hadoop.hbase.regionserver.BloomType;
 +import org.apache.hadoop.hbase.regionserver.HStore;
 +import org.apache.hadoop.hbase.regionserver.StoreFile;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.FSUtils;
 +
 +/**
 + * The mob utilities
 + */
 +@InterfaceAudience.Private
 +public class MobUtils {
 +
 +  private static final Log LOG = LogFactory.getLog(MobUtils.class);
 +
 +  private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
 +      new ThreadLocal<SimpleDateFormat>() {
 +    @Override
 +    protected SimpleDateFormat initialValue() {
 +      return new SimpleDateFormat("yyyyMMdd");
 +    }
 +  };
 +
 +  /**
 +   * Formats a date to a string.
 +   * @param date The date.
 +   * @return The date formatted as yyyyMMdd.
 +   */
 +  public static String formatDate(Date date) {
 +    return LOCAL_FORMAT.get().format(date);
 +  }
 +
 +  /**
 +   * Parses the string to a date.
 +   * @param dateString The date string in yyyyMMdd format.
 +   * @return A date.
 +   * @throws ParseException
 +   */
 +  public static Date parseDate(String dateString) throws ParseException {
 +    return LOCAL_FORMAT.get().parse(dateString);
 +  }
 +
 +  /**
 +   * Whether the current cell is a mob reference cell.
 +   * @param cell The current cell.
 +   * @return True if the cell has a mob reference tag, false if it doesn't.
 +   */
 +  public static boolean isMobReferenceCell(Cell cell) {
 +    if (cell.getTagsLength() > 0) {
 +      Tag tag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
 +          TagType.MOB_REFERENCE_TAG_TYPE);
 +      return tag != null;
 +    }
 +    return false;
 +  }
 +
 +  /**
 +   * Gets the table name tag.
 +   * @param cell The current cell.
 +   * @return The table name tag.
 +   */
 +  public static Tag getTableNameTag(Cell cell) {
 +    if (cell.getTagsLength() > 0) {
 +      Tag tag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
 +          TagType.MOB_TABLE_NAME_TAG_TYPE);
 +      return tag;
 +    }
 +    return null;
 +  }
 +
 +  /**
 +   * Whether the tag list has a mob reference tag.
 +   * @param tags The tag list.
 +   * @return True if the list has a mob reference tag, false if it doesn't.
 +   */
 +  public static boolean hasMobReferenceTag(List<Tag> tags) {
 +    if (!tags.isEmpty()) {
 +      for (Tag tag : tags) {
 +        if (tag.getType() == TagType.MOB_REFERENCE_TAG_TYPE) {
 +          return true;
 +        }
 +      }
 +    }
 +    return false;
 +  }
 +
 +  /**
 +   * Indicates whether it's a raw scan.
 +   * The information is set in the attribute "hbase.mob.scan.raw" of the scan.
 +   * For a mob cell, a normal scan retrieves the mob value from the mob file.
 +   * In a raw scan, the scanner directly returns the reference cell kept in HBase without
 +   * retrieving the value from the mob file.
 +   * @param scan The current scan.
 +   * @return True if it's a raw scan.
 +   */
 +  public static boolean isRawMobScan(Scan scan) {
 +    byte[] raw = scan.getAttribute(MobConstants.MOB_SCAN_RAW);
 +    try {
 +      return raw != null && Bytes.toBoolean(raw);
 +    } catch (IllegalArgumentException e) {
 +      return false;
 +    }
 +  }
 +
 +  /**
 +   * Indicates whether it's a reference only scan.
 +   * The information is set in the attribute "hbase.mob.scan.ref.only" of the scan.
 +   * If it's a ref-only scan, only the cells with a ref tag are returned.
 +   * @param scan The current scan.
 +   * @return True if it's a ref only scan.
 +   */
 +  public static boolean isRefOnlyScan(Scan scan) {
 +    byte[] refOnly = scan.getAttribute(MobConstants.MOB_SCAN_REF_ONLY);
 +    try {
 +      return refOnly != null && Bytes.toBoolean(refOnly);
 +    } catch (IllegalArgumentException e) {
 +      return false;
 +    }
 +  }
 +
 +  /**
 +   * Indicates whether the scan contains the information of caching blocks.
 +   * The information is set in the attribute "hbase.mob.cache.blocks" of the scan.
 +   * @param scan The current scan.
 +   * @return True when the Scan attribute specifies to cache the MOB blocks.
 +   */
 +  public static boolean isCacheMobBlocks(Scan scan) {
 +    byte[] cache = scan.getAttribute(MobConstants.MOB_CACHE_BLOCKS);
 +    try {
 +      return cache != null && Bytes.toBoolean(cache);
 +    } catch (IllegalArgumentException e) {
 +      return false;
 +    }
 +  }
 +
 +  /**
 +   * Sets the attribute of caching blocks in the scan.
 +   *
 +   * @param scan
 +   *          The current scan.
 +   * @param cacheBlocks
 +   *          True to set the attribute on the scan so that the scanner for this scan
 +   *          caches blocks.
 +   *          False if the scanner should not cache blocks for this scan.
 +   */
 +  public static void setCacheMobBlocks(Scan scan, boolean cacheBlocks) {
 +    scan.setAttribute(MobConstants.MOB_CACHE_BLOCKS, Bytes.toBytes(cacheBlocks));
 +  }
 +
 +  /**
 +   * Cleans the expired mob files.
 +   * Cleans the files whose creation date is older than (current - columnFamily.ttl), when
 +   * the minVersions of that column family is 0.
 +   * @param fs The current file system.
 +   * @param conf The current configuration.
 +   * @param tableName The current table name.
 +   * @param columnDescriptor The descriptor of the current column family.
 +   * @param cacheConfig The cacheConfig that disables the block cache.
 +   * @param current The current time.
 +   * @throws IOException
 +   */
 +  public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, TableName tableName,
 +      HColumnDescriptor columnDescriptor, CacheConfig cacheConfig, long current)
 +      throws IOException {
 +    long timeToLive = columnDescriptor.getTimeToLive();
 +    if (Integer.MAX_VALUE == timeToLive) {
 +      // no need to clean, because the TTL is not set.
 +      return;
 +    }
 +
 +    Date expireDate = new Date(current - timeToLive * 1000);
 +    expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate());
 +    LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!");
 +
 +    FileStatus[] stats = null;
 +    Path mobTableDir = FSUtils.getTableDir(getMobHome(conf), tableName);
 +    Path path = getMobFamilyPath(conf, tableName, columnDescriptor.getNameAsString());
 +    try {
 +      stats = fs.listStatus(path);
 +    } catch (FileNotFoundException e) {
 +      LOG.warn("Failed to find the mob file " + path, e);
 +    }
 +    if (null == stats) {
 +      // no file found
 +      return;
 +    }
 +    List<StoreFile> filesToClean = new ArrayList<StoreFile>();
 +    int deletedFileCount = 0;
 +    for (FileStatus file : stats) {
 +      String fileName = file.getPath().getName();
 +      try {
 +        MobFileName mobFileName = null;
 +        if (!HFileLink.isHFileLink(file.getPath())) {
 +          mobFileName = MobFileName.create(fileName);
 +        } else {
-           HFileLink hfileLink = new HFileLink(conf, file.getPath());
++          HFileLink hfileLink = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
 +          mobFileName = MobFileName.create(hfileLink.getOriginPath().getName());
 +        }
 +        Date fileDate = parseDate(mobFileName.getDate());
 +        if (LOG.isDebugEnabled()) {
 +          LOG.debug("Checking file " + fileName);
 +        }
 +        if (fileDate.getTime() < expireDate.getTime()) {
 +          if (LOG.isDebugEnabled()) {
 +            LOG.debug(fileName + " is an expired file");
 +          }
 +          filesToClean.add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE));
 +        }
 +      } catch (Exception e) {
 +        LOG.error("Cannot parse the fileName " + fileName, e);
 +      }
 +    }
 +    if (!filesToClean.isEmpty()) {
 +      try {
 +        removeMobFiles(conf, fs, tableName, mobTableDir, columnDescriptor.getName(),
 +            filesToClean);
 +        deletedFileCount = filesToClean.size();
 +      } catch (IOException e) {
 +        LOG.error("Failed to delete the mob files " + filesToClean, e);
 +      }
 +    }
 +    LOG.info(deletedFileCount + " expired mob files are deleted");
 +  }
 +
 +  /**
 +   * Gets the root dir of the mob files.
 +   * It's {HBASE_DIR}/mobdir.
 +   * @param conf The current configuration.
 +   * @return the root dir of the mob file.
 +   */
 +  public static Path getMobHome(Configuration conf) {
 +    Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
 +    return new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
 +  }
 +
 +  /**
 +   * Gets the qualified root dir of the mob files.
 +   * @param conf The current configuration.
 +   * @return The qualified root dir.
 +   * @throws IOException
 +   */
 +  public static Path getQualifiedMobRootDir(Configuration conf) throws IOException {
 +    Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
 +    Path mobRootDir = new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
 +    FileSystem fs = mobRootDir.getFileSystem(conf);
 +    return mobRootDir.makeQualified(fs);
 +  }
 +
 +  /**
 +   * Gets the region dir of the mob files.
 +   * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}.
 +   * @param conf The current configuration.
 +   * @param tableName The current table name.
 +   * @return The region dir of the mob files.
 +   */
 +  public static Path getMobRegionPath(Configuration conf, TableName tableName) {
 +    Path tablePath = FSUtils.getTableDir(getMobHome(conf), tableName);
 +    HRegionInfo regionInfo = getMobRegionInfo(tableName);
 +    return new Path(tablePath, regionInfo.getEncodedName());
 +  }
 +
 +  /**
 +   * Gets the family dir of the mob files.
 +   * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
 +   * @param conf The current configuration.
 +   * @param tableName The current table name.
 +   * @param familyName The current family name.
 +   * @return The family dir of the mob files.
 +   */
 +  public static Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) {
 +    return new Path(getMobRegionPath(conf, tableName), familyName);
 +  }
 +
 +  /**
 +   * Gets the family dir of the mob files.
 +   * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
 +   * @param regionPath The path of mob region which is a dummy one.
 +   * @param familyName The current family name.
 +   * @return The family dir of the mob files.
 +   */
 +  public static Path getMobFamilyPath(Path regionPath, String familyName) {
 +    return new Path(regionPath, familyName);
 +  }
 +
 +  /**
 +   * Gets the HRegionInfo of the mob files.
 +   * This is a dummy region. The mob files are not saved in a region in HBase.
 +   * This is only used in mob snapshot. It's internally used only.
 +   * @param tableName The current table name.
 +   * @return A dummy mob region info.
 +   */
 +  public static HRegionInfo getMobRegionInfo(TableName tableName) {
 +    HRegionInfo info = new HRegionInfo(tableName, MobConstants.MOB_REGION_NAME_BYTES,
 +        HConstants.EMPTY_END_ROW, false, 0);
 +    return info;
 +  }
 +
 +  /**
 +   * Gets whether the current HRegionInfo is a mob one.
 +   * @param regionInfo The current HRegionInfo.
 +   * @return If true, the current HRegionInfo is a mob one.
 +   */
 +  public static boolean isMobRegionInfo(HRegionInfo regionInfo) {
 +    return regionInfo == null ? false : getMobRegionInfo(regionInfo.getTable()).getEncodedName()
 +        .equals(regionInfo.getEncodedName());
 +  }
 +
 +  /**
 +   * Gets the working directory of the mob compaction.
 +   * @param root The root directory of the mob compaction.
 +   * @param jobName The current job name.
 +   * @return The directory of the mob compaction for the current job.
 +   */
 +  public static Path getCompactionWorkingPath(Path root, String jobName) {
 +    return new Path(root, jobName);
 +  }
 +
 +  /**
 +   * Archives the mob files.
 +   * @param conf The current configuration.
 +   * @param fs The current file system.
 +   * @param tableName The table name.
 +   * @param tableDir The table directory.
 +   * @param family The name of the column family.
 +   * @param storeFiles The files to be deleted.
 +   * @throws IOException
 +   */
 +  public static void removeMobFiles(Configuration conf, FileSystem fs, TableName tableName,
 +      Path tableDir, byte[] family, Collection<StoreFile> storeFiles) throws IOException {
 +    HFileArchiver.archiveStoreFiles(conf, fs, getMobRegionInfo(tableName), tableDir, family,
 +        storeFiles);
 +  }
 +
 +  /**
 +   * Creates a mob reference KeyValue.
 +   * The value of the mob reference KeyValue is mobCellValueSize + mobFileName.
 +   * @param cell The original Cell.
 +   * @param fileName The mob file name where the mob reference KeyValue is written.
 +   * @param tableNameTag The tag of the current table name. It's very important in
 +   *                        cloning the snapshot.
 +   * @return The mob reference KeyValue.
 +   */
 +  public static KeyValue createMobRefKeyValue(Cell cell, byte[] fileName, Tag tableNameTag) {
 +    // Append the tags to the KeyValue.
 +    // The key is same, the value is the filename of the mob file
 +    List<Tag> tags = new ArrayList<Tag>();
 +    // Add the ref tag as the 1st one.
 +    tags.add(MobConstants.MOB_REF_TAG);
 +    // Add the tag of the source table name, this table is where this mob file is flushed
 +    // from.
 +    // It's very useful in cloning the snapshot. When reading from the cloning table, we need to
 +    // find the original mob files by this table name. For details please see cloning
 +    // snapshot for mob files.
 +    tags.add(tableNameTag);
 +    // Add the existing tags.
 +    tags.addAll(Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()));
 +    int valueLength = cell.getValueLength();
 +    byte[] refValue = Bytes.add(Bytes.toBytes(valueLength), fileName);
 +    KeyValue reference = new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
 +        cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
 +        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
 +        cell.getTimestamp(), KeyValue.Type.Put, refValue, 0, refValue.length, tags);
 +    reference.setSequenceId(cell.getSequenceId());
 +    return reference;
 +  }
 +
 +  /**
 +   * Creates a writer for the mob file in temp directory.
 +   * @param conf The current configuration.
 +   * @param fs The current file system.
 +   * @param family The descriptor of the current column family.
 +   * @param date The date string in yyyyMMdd format.
 +   * @param basePath The base path for the temp directory.
 +   * @param maxKeyCount The maximum key count.
 +   * @param compression The compression algorithm.
 +   * @param startKey The hex string of the start key.
 +   * @param cacheConfig The current cache config.
 +   * @return The writer for the mob file.
 +   * @throws IOException
 +   */
 +  public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
 +      HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
 +      Compression.Algorithm compression, String startKey, CacheConfig cacheConfig)
 +      throws IOException {
 +    MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
 +        .replaceAll("-", ""));
 +    return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
 +      cacheConfig);
 +  }
 +
 +  /**
 +   * Creates a writer for the ref file in temp directory.
 +   * @param conf The current configuration.
 +   * @param fs The current file system.
 +   * @param family The descriptor of the current column family.
 +   * @param basePath The base path for the temp directory.
 +   * @param maxKeyCount The maximum key count.
 +   * @param cacheConfig The current cache config.
 +   * @return The writer for the mob file.
 +   * @throws IOException
 +   */
 +  public static StoreFile.Writer createRefFileWriter(Configuration conf, FileSystem fs,
 +    HColumnDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig)
 +    throws IOException {
 +    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(true)
 +      .withIncludesTags(true).withCompression(family.getCompactionCompression())
 +      .withCompressTags(family.shouldCompressTags()).withChecksumType(HStore.getChecksumType(conf))
 +      .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(family.getBlocksize())
 +      .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()).build();
 +    Path tempPath = new Path(basePath, UUID.randomUUID().toString().replaceAll("-", ""));
 +    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs).withFilePath(tempPath)
 +      .withComparator(KeyValue.COMPARATOR).withBloomType(family.getBloomFilterType())
 +      .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
 +    return w;
 +  }
 +
 +  /**
 +   * Creates a writer for the mob file in temp directory.
 +   * @param conf The current configuration.
 +   * @param fs The current file system.
 +   * @param family The descriptor of the current column family.
 +   * @param date The date string in yyyyMMdd format.
 +   * @param basePath The base path for the temp directory.
 +   * @param maxKeyCount The maximum key count.
 +   * @param compression The compression algorithm.
 +   * @param startKey The start key.
 +   * @param cacheConfig The current cache config.
 +   * @return The writer for the mob file.
 +   * @throws IOException
 +   */
 +  public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
 +      HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
 +      Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig)
 +      throws IOException {
 +    MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
 +        .replaceAll("-", ""));
 +    return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
 +      cacheConfig);
 +  }
 +
 +  /**
 +   * Creates a writer for the del file in temp directory.
 +   * @param conf The current configuration.
 +   * @param fs The current file system.
 +   * @param family The descriptor of the current column family.
 +   * @param date The date string in yyyyMMdd format.
 +   * @param basePath The base path for the temp directory.
 +   * @param maxKeyCount The maximum key count.
 +   * @param compression The compression algorithm.
 +   * @param startKey The start key.
 +   * @param cacheConfig The current cache config.
 +   * @return The writer for the del file.
 +   * @throws IOException
 +   */
 +  public static StoreFile.Writer createDelFileWriter(Configuration conf, FileSystem fs,
 +      HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
 +      Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig)
 +      throws IOException {
 +    String suffix = UUID
 +      .randomUUID().toString().replaceAll("-", "") + "_del";
 +    MobFileName mobFileName = MobFileName.create(startKey, date, suffix);
 +    return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
 +      cacheConfig);
 +  }
 +
 +  /**
 +   * Creates a writer for the mob/del file in the temp directory.
 +   * @param conf The current configuration.
 +   * @param fs The current file system.
 +   * @param family The descriptor of the current column family.
 +   * @param mobFileName The mob file name.
 +   * @param basePath The base path for the temp directory.
 +   * @param maxKeyCount The maximum key count.
 +   * @param compression The compression algorithm.
 +   * @param cacheConfig The current cache config.
 +   * @return The writer for the mob file.
 +   * @throws IOException
 +   */
 +  private static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
 +    HColumnDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount,
 +    Compression.Algorithm compression, CacheConfig cacheConfig) throws IOException {
 +    HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
 +      .withIncludesMvcc(false).withIncludesTags(true).withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
 +      .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).withBlockSize(family.getBlocksize())
 +      .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()).build();
 +
 +    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs)
 +      .withFilePath(new Path(basePath, mobFileName.getFileName()))
 +      .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
 +      .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
 +    return w;
 +  }
 +
 +  /**
 +   * Commits the mob file.
 +   * @param conf The current configuration.
 +   * @param fs The current file system.
 +   * @param sourceFile The path of the mob file to be committed.
 +   * @param targetPath The directory path where the source file is renamed to.
 +   * @param cacheConfig The current cache config.
 +   * @return The target file path the source file is renamed to.
 +   * @throws IOException
 +   */
 +  public static Path commitFile(Configuration conf, FileSystem fs, final Path sourceFile,
 +      Path targetPath, CacheConfig cacheConfig) throws IOException {
 +    if (sourceFile == null) {
 +      return null;
 +    }
 +    Path dstPath = new Path(targetPath, sourceFile.getName());
 +    validateMobFile(conf, fs, sourceFile, cacheConfig);
 +    String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
 +    LOG.info(msg);
 +    Path parent = dstPath.getParent();
 +    if (!fs.exists(parent)) {
 +      fs.mkdirs(parent);
 +    }
 +    if (!fs.rename(sourceFile, dstPath)) {
 +      throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
 +    }
 +    return dstPath;
 +  }
 +
 +  /**
 +   * Validates a mob file by opening and closing it.
 +   * @param conf The current configuration.
 +   * @param fs The current file system.
 +   * @param path The path where the mob file is saved.
 +   * @param cacheConfig The current cache config.
 +   */
 +  private static void validateMobFile(Configuration conf, FileSystem fs, Path path,
 +      CacheConfig cacheConfig) throws IOException {
 +    StoreFile storeFile = null;
 +    try {
 +      storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE);
 +      storeFile.createReader();
 +    } catch (IOException e) {
 +      LOG.error("Failed to open mob file [" + path + "], keeping it in the temp directory.", e);
 +      throw e;
 +    } finally {
 +      if (storeFile != null) {
 +        storeFile.closeReader(false);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Indicates whether the current mob ref cell has a valid value.
 +   * A mob ref cell has a mob reference tag.
 +   * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
 +   * The real mob value length takes 4 bytes.
 +   * The remaining part is the mob file name.
 +   * @param cell The mob ref cell.
 +   * @return True if the cell has a valid value.
 +   */
 +  public static boolean hasValidMobRefCellValue(Cell cell) {
 +    return cell.getValueLength() > Bytes.SIZEOF_INT;
 +  }
 +
 +  /**
 +   * Gets the mob value length from the mob ref cell.
 +   * A mob ref cell has a mob reference tag.
 +   * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
 +   * The real mob value length takes 4 bytes.
 +   * The remaining part is the mob file name.
 +   * @param cell The mob ref cell.
 +   * @return The real mob value length.
 +   */
 +  public static int getMobValueLength(Cell cell) {
 +    return Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), Bytes.SIZEOF_INT);
 +  }
 +
 +  /**
 +   * Gets the mob file name from the mob ref cell.
 +   * A mob ref cell has a mob reference tag.
 +   * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
 +   * The real mob value length takes 4 bytes.
 +   * The remaining part is the mob file name.
 +   * @param cell The mob ref cell.
 +   * @return The mob file name.
 +   */
 +  public static String getMobFileName(Cell cell) {
 +    return Bytes.toString(cell.getValueArray(), cell.getValueOffset() + Bytes.SIZEOF_INT,
 +        cell.getValueLength() - Bytes.SIZEOF_INT);
 +  }
 +
 +  /**
 +   * Gets the table name used in the table lock.
 +   * The table lock name is a dummy one; it's not a real table name. It is tableName + ".mobLock".
 +   * @param tn The table name.
 +   * @return The table name used in table lock.
 +   */
 +  public static TableName getTableLockName(TableName tn) {
 +    byte[] tableName = tn.getName();
 +    return TableName.valueOf(Bytes.add(tableName, MobConstants.MOB_TABLE_LOCK_SUFFIX));
 +  }
 +}
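
For context on the MobUtils code above: createMobRefKeyValue, getMobValueLength and getMobFileName all rely on the same reference-cell value layout, a 4-byte big-endian length of the real mob value followed by the mob file name. Below is a minimal standalone sketch of that encoding; the class and method names are hypothetical and it uses only the plain JDK, no HBase dependency.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class MobRefValueSketch {
  // Builds a reference value: a 4-byte big-endian length of the real mob value,
  // followed by the mob file name (the same layout that
  // Bytes.add(Bytes.toBytes(valueLength), fileName) produces in createMobRefKeyValue).
  static byte[] encode(int mobValueLength, String mobFileName) {
    byte[] name = mobFileName.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(4 + name.length).putInt(mobValueLength).put(name).array();
  }

  // Mirrors getMobValueLength(): the first 4 bytes hold the real value length.
  static int valueLength(byte[] refValue) {
    return ByteBuffer.wrap(refValue).getInt();
  }

  // Mirrors getMobFileName(): everything after the 4-byte prefix is the file name.
  static String fileName(byte[] refValue) {
    return new String(refValue, 4, refValue.length - 4, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    byte[] ref = encode(1048576, "0123456789abcdef2015021100000000000000000000");
    System.out.println(valueLength(ref)); // 1048576
    System.out.println(fileName(ref));    // the mob file name
  }
}

This layout also explains hasValidMobRefCellValue(): a reference value of 4 bytes or fewer cannot hold both the length prefix and a file name.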

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
index 6cd3172,0000000..d6ad143
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
@@@ -1,631 -1,0 +1,631 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob.filecompactions;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Date;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Future;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.classification.InterfaceAudience;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.HColumnDescriptor;
 +import org.apache.hadoop.hbase.HConstants;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.KeyValueUtil;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.Tag;
 +import org.apache.hadoop.hbase.TagType;
 +import org.apache.hadoop.hbase.client.HTable;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.io.HFileLink;
 +import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 +import org.apache.hadoop.hbase.mob.MobConstants;
 +import org.apache.hadoop.hbase.mob.MobFileName;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType;
 +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
 +import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartitionId;
 +import org.apache.hadoop.hbase.regionserver.BloomType;
 +import org.apache.hadoop.hbase.regionserver.HStore;
 +import org.apache.hadoop.hbase.regionserver.ScanInfo;
 +import org.apache.hadoop.hbase.regionserver.ScanType;
 +import org.apache.hadoop.hbase.regionserver.StoreFile;
 +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
 +import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 +import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 +import org.apache.hadoop.hbase.regionserver.StoreScanner;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.Pair;
 +
 +/**
 + * An implementation of {@link MobFileCompactor} that compacts the mob files in partitions.
 + */
 +@InterfaceAudience.Private
 +public class PartitionedMobFileCompactor extends MobFileCompactor {
 +
 +  private static final Log LOG = LogFactory.getLog(PartitionedMobFileCompactor.class);
 +  protected long mergeableSize;
 +  protected int delFileMaxCount;
 +  /** The number of files compacted in a batch */
 +  protected int compactionBatchSize;
 +  protected int compactionKVMax;
 +
 +  private Path tempPath;
 +  private Path bulkloadPath;
 +  private CacheConfig compactionCacheConfig;
 +  private Tag tableNameTag;
 +
 +  public PartitionedMobFileCompactor(Configuration conf, FileSystem fs, TableName tableName,
 +    HColumnDescriptor column, ExecutorService pool) {
 +    super(conf, fs, tableName, column, pool);
 +    mergeableSize = conf.getLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
 +      MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
 +    delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT,
 +      MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
 +    // default is 100
 +    compactionBatchSize = conf.getInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
 +      MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
 +    tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
 +    bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME,
 +      tableName.getNameAsString()));
 +    compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX,
 +      HConstants.COMPACTION_KV_MAX_DEFAULT);
 +    Configuration copyOfConf = new Configuration(conf);
 +    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
 +    compactionCacheConfig = new CacheConfig(copyOfConf);
 +    tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName());
 +  }
 +
 +  @Override
 +  public List<Path> compact(List<FileStatus> files) throws IOException {
 +    if (files == null || files.isEmpty()) {
 +      return null;
 +    }
 +    // find the files to compact.
 +    PartitionedMobFileCompactionRequest request = select(files);
 +    // compact the files.
 +    return performCompaction(request);
 +  }
 +
 +  /**
 +   * Selects the mob/del files to be compacted.
 +   * Iterates the candidates to find all the del files and small mob files.
 +   * @param candidates All the candidates.
 +   * @return A compaction request.
 +   * @throws IOException
 +   */
 +  protected PartitionedMobFileCompactionRequest select(List<FileStatus> candidates)
 +    throws IOException {
 +    Collection<FileStatus> allDelFiles = new ArrayList<FileStatus>();
 +    Map<CompactionPartitionId, CompactionPartition> filesToCompact =
 +      new HashMap<CompactionPartitionId, CompactionPartition>();
 +    int selectedFileCount = 0;
 +    int irrelevantFileCount = 0;
 +    for (FileStatus file : candidates) {
 +      if (!file.isFile()) {
 +        irrelevantFileCount++;
 +        continue;
 +      }
 +      // group the del files and small files.
 +      FileStatus linkedFile = file;
 +      if (HFileLink.isHFileLink(file.getPath())) {
-         HFileLink link = new HFileLink(conf, file.getPath());
++        HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
 +        linkedFile = getLinkedFileStatus(link);
 +        if (linkedFile == null) {
 +          // If the linked file cannot be found, regard it as an irrelevant file
 +          irrelevantFileCount++;
 +          continue;
 +        }
 +      }
 +      if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
 +        allDelFiles.add(file);
 +      } else if (linkedFile.getLen() < mergeableSize) {
 +        // add the small files to the merge pool
 +        MobFileName fileName = MobFileName.create(linkedFile.getPath().getName());
 +        CompactionPartitionId id = new CompactionPartitionId(fileName.getStartKey(),
 +          fileName.getDate());
 +        CompactionPartition compactionPartition = filesToCompact.get(id);
 +        if (compactionPartition == null) {
 +          compactionPartition = new CompactionPartition(id);
 +          compactionPartition.addFile(file);
 +          filesToCompact.put(id, compactionPartition);
 +        } else {
 +          compactionPartition.addFile(file);
 +        }
 +        selectedFileCount++;
 +      }
 +    }
 +    PartitionedMobFileCompactionRequest request = new PartitionedMobFileCompactionRequest(
 +      filesToCompact.values(), allDelFiles);
 +    if (candidates.size() == (allDelFiles.size() + selectedFileCount + irrelevantFileCount)) {
 +      // all the files are selected
 +      request.setCompactionType(CompactionType.ALL_FILES);
 +    }
 +    return request;
 +  }
 +
 +  /**
 +   * Performs the compaction on the selected files.
 +   * <ol>
 +   * <li>Compacts the del files.</li>
 +   * <li>Compacts the selected small mob files and all the del files.</li>
 +   * <li>If all the candidates are selected, delete the del files.</li>
 +   * </ol>
 +   * @param request The compaction request.
 +   * @return The paths of new mob files generated in the compaction.
 +   * @throws IOException
 +   */
 +  protected List<Path> performCompaction(PartitionedMobFileCompactionRequest request)
 +    throws IOException {
 +    // merge the del files
 +    List<Path> delFilePaths = new ArrayList<Path>();
 +    for (FileStatus delFile : request.delFiles) {
 +      delFilePaths.add(delFile.getPath());
 +    }
 +    List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
 +    List<StoreFile> newDelFiles = new ArrayList<StoreFile>();
 +    for (Path newDelPath : newDelPaths) {
 +      StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE);
 +      newDelFiles.add(sf);
 +    }
 +    // compact the mob files by partitions.
 +    List<Path> paths = compactMobFiles(request, newDelFiles);
 +    // archive the del files if all the mob files are selected.
 +    if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
 +      try {
 +        MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
 +      } catch (IOException e) {
 +        LOG.error("Failed to archive the del files " + newDelFiles, e);
 +      }
 +    }
 +    return paths;
 +  }
 +
 +  /**
 +   * Compacts the selected small mob files and all the del files.
 +   * @param request The compaction request.
 +   * @param delFiles The del files.
 +   * @return The paths of new mob files after compactions.
 +   * @throws IOException
 +   */
 +  protected List<Path> compactMobFiles(final PartitionedMobFileCompactionRequest request,
 +    final List<StoreFile> delFiles) throws IOException {
 +    Collection<CompactionPartition> partitions = request.compactionPartitions;
 +    if (partitions == null || partitions.isEmpty()) {
 +      return Collections.emptyList();
 +    }
 +    List<Path> paths = new ArrayList<Path>();
 +    final HTable table = new HTable(conf, tableName);
 +    try {
 +      Map<CompactionPartitionId, Future<List<Path>>> results =
 +        new HashMap<CompactionPartitionId, Future<List<Path>>>();
 +      // compact the mob files by partitions in parallel.
 +      for (final CompactionPartition partition : partitions) {
 +        results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
 +          @Override
 +          public List<Path> call() throws Exception {
 +            return compactMobFilePartition(request, partition, delFiles, table);
 +          }
 +        }));
 +      }
 +      // wait for the parallel compactions and collect the results.
 +      boolean hasFailure = false;
 +      for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) {
 +        try {
 +          paths.addAll(result.getValue().get());
 +        } catch (Exception e) {
 +          // just log the error
 +          LOG.error("Failed to compact the partition " + result.getKey(), e);
 +          hasFailure = true;
 +        }
 +      }
 +      if (hasFailure) {
 +        // if any partition fails in the compaction, directly throw an exception.
 +        throw new IOException("Failed to compact the partitions");
 +      }
 +    } finally {
 +      try {
 +        table.close();
 +      } catch (IOException e) {
 +        LOG.error("Failed to close the HTable", e);
 +      }
 +    }
 +    return paths;
 +  }
 +
 +  /**
 +   * Compacts a partition of selected small mob files and all the del files.
 +   * @param request The compaction request.
 +   * @param partition A compaction partition.
 +   * @param delFiles The del files.
 +   * @param table The current table.
 +   * @return The paths of new mob files after compactions.
 +   * @throws IOException
 +   */
 +  private List<Path> compactMobFilePartition(PartitionedMobFileCompactionRequest request,
 +    CompactionPartition partition, List<StoreFile> delFiles, HTable table) throws IOException {
 +    List<Path> newFiles = new ArrayList<Path>();
 +    List<FileStatus> files = partition.listFiles();
 +    int offset = 0;
 +    Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString());
 +    Path bulkloadColumnPath = new Path(bulkloadPathOfPartition, column.getNameAsString());
 +    while (offset < files.size()) {
 +      int batch = compactionBatchSize;
 +      if (files.size() - offset < compactionBatchSize) {
 +        batch = files.size() - offset;
 +      }
 +      if (batch == 1 && delFiles.isEmpty()) {
 +        // only one file left and no del files, do not compact it,
 +        // and directly add it to the new files.
 +        newFiles.add(files.get(offset).getPath());
 +        offset++;
 +        continue;
 +      }
 +      // clean the bulkload directory to avoid loading old files.
 +      fs.delete(bulkloadPathOfPartition, true);
 +      // add the selected mob files and del files into filesToCompact
 +      List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
 +      for (int i = offset; i < batch + offset; i++) {
 +        StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig,
 +          BloomType.NONE);
 +        filesToCompact.add(sf);
 +      }
 +      filesToCompact.addAll(delFiles);
 +      // compact the mob files in a batch.
 +      compactMobFilesInBatch(request, partition, table, filesToCompact, batch,
 +        bulkloadPathOfPartition, bulkloadColumnPath, newFiles);
 +      // move to the next batch.
 +      offset += batch;
 +    }
 +    return newFiles;
 +  }
 +
 +  /**
 +   * Compacts a partition of selected small mob files and all the del files in a batch.
 +   * @param request The compaction request.
 +   * @param partition A compaction partition.
 +   * @param table The current table.
 +   * @param filesToCompact The files to be compacted.
 +   * @param batch The number of mob files to be compacted in a batch.
 +   * @param bulkloadPathOfPartition The directory where the bulkload column of the current
 +   *        partition is saved.
 +   * @param bulkloadColumnPath The directory where the bulkload files of current partition
 +   *        are saved.
 +   * @param newFiles The paths of new mob files after compactions.
 +   * @throws IOException
 +   */
 +  private void compactMobFilesInBatch(PartitionedMobFileCompactionRequest request,
 +    CompactionPartition partition, HTable table, List<StoreFile> filesToCompact, int batch,
 +    Path bulkloadPathOfPartition, Path bulkloadColumnPath, List<Path> newFiles)
 +    throws IOException {
 +    // open scanner to the selected mob files and del files.
 +    StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
 +    // the mob files to be compacted, not including the del files.
 +    List<StoreFile> mobFilesToCompact = filesToCompact.subList(0, batch);
 +    // Pair(maxSeqId, cellsCount)
 +    Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact);
 +    // open writers for the mob files and new ref store files.
 +    Writer writer = null;
 +    Writer refFileWriter = null;
 +    Path filePath = null;
 +    Path refFilePath = null;
 +    long mobCells = 0;
 +    try {
 +      writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(),
 +        tempPath, Long.MAX_VALUE, column.getCompactionCompression(), partition.getPartitionId()
 +          .getStartKey(), compactionCacheConfig);
 +      filePath = writer.getPath();
 +      byte[] fileName = Bytes.toBytes(filePath.getName());
 +      // create a temp file and open a writer for it in the bulkloadPath
 +      refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo
 +        .getSecond().longValue(), compactionCacheConfig);
 +      refFilePath = refFileWriter.getPath();
 +      List<Cell> cells = new ArrayList<Cell>();
 +      boolean hasMore = false;
 +      do {
 +        hasMore = scanner.next(cells, compactionKVMax);
 +        for (Cell cell : cells) {
 +          // TODO remove this after the new code is introduced.
 +          KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
 +          // write the mob cell to the mob file.
 +          writer.append(kv);
 +          // write the new reference cell to the store file.
 +          KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag);
 +          refFileWriter.append(reference);
 +          mobCells++;
 +        }
 +        cells.clear();
 +      } while (hasMore);
 +    } finally {
 +      // close the scanner.
 +      scanner.close();
 +      // append metadata to the mob file, and close the mob file writer.
 +      closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
 +      // append metadata and bulkload info to the ref mob file, and close the writer.
 +      closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
 +    }
 +    if (mobCells > 0) {
 +      // commit mob file
 +      MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
 +      // bulkload the ref file
 +      bulkloadRefFile(table, bulkloadPathOfPartition, filePath.getName());
 +      newFiles.add(new Path(mobFamilyDir, filePath.getName()));
 +    } else {
 +      // remove the new files
 +      // the mob file is empty, delete it instead of committing.
 +      deletePath(filePath);
 +      // the ref file is empty, delete it instead of committing.
 +      deletePath(refFilePath);
 +    }
 +    // archive the old mob files, do not archive the del files.
 +    try {
 +      MobUtils
 +        .removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
 +    } catch (IOException e) {
 +      LOG.error("Failed to archive the files " + mobFilesToCompact, e);
 +    }
 +  }
 +
 +  /**
 +   * Compacts the del files in batches which avoids opening too many files.
 +   * @param request The compaction request.
 +   * @param delFilePaths The paths of the del files.
 +   * @return The paths of new del files after merging or the original files if no merging
 +   *         is necessary.
 +   * @throws IOException
 +   */
 +  protected List<Path> compactDelFiles(PartitionedMobFileCompactionRequest request,
 +    List<Path> delFilePaths) throws IOException {
 +    if (delFilePaths.size() <= delFileMaxCount) {
 +      return delFilePaths;
 +    }
 +    // when there are more del files than allowed, merge them first.
 +    int offset = 0;
 +    List<Path> paths = new ArrayList<Path>();
 +    while (offset < delFilePaths.size()) {
 +      // get the batch
 +      int batch = compactionBatchSize;
 +      if (delFilePaths.size() - offset < compactionBatchSize) {
 +        batch = delFilePaths.size() - offset;
 +      }
 +      List<StoreFile> batchedDelFiles = new ArrayList<StoreFile>();
 +      if (batch == 1) {
 +        // only one file left, do not compact it, directly add it to the new files.
 +        paths.add(delFilePaths.get(offset));
 +        offset++;
 +        continue;
 +      }
 +      for (int i = offset; i < batch + offset; i++) {
 +        batchedDelFiles.add(new StoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig,
 +          BloomType.NONE));
 +      }
 +      // compact the del files in a batch.
 +      paths.add(compactDelFilesInBatch(request, batchedDelFiles));
 +      // move to the next batch.
 +      offset += batch;
 +    }
 +    return compactDelFiles(request, paths);
 +  }
 +
 +  /**
 +   * Compacts the del file in a batch.
 +   * @param request The compaction request.
 +   * @param delFiles The del files.
 +   * @return The path of new del file after merging.
 +   * @throws IOException
 +   */
 +  private Path compactDelFilesInBatch(PartitionedMobFileCompactionRequest request,
 +    List<StoreFile> delFiles) throws IOException {
 +    // create a scanner for the del files.
 +    StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES);
 +    Writer writer = null;
 +    Path filePath = null;
 +    try {
 +      writer = MobUtils.createDelFileWriter(conf, fs, column,
 +        MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
 +        column.getCompactionCompression(), HConstants.EMPTY_START_ROW, compactionCacheConfig);
 +      filePath = writer.getPath();
 +      List<Cell> cells = new ArrayList<Cell>();
 +      boolean hasMore = false;
 +      do {
 +        hasMore = scanner.next(cells, compactionKVMax);
 +        for (Cell cell : cells) {
 +          // TODO remove this after the new code is introduced.
 +          KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
 +          writer.append(kv);
 +        }
 +        cells.clear();
 +      } while (hasMore);
 +    } finally {
 +      scanner.close();
 +      if (writer != null) {
 +        try {
 +          writer.close();
 +        } catch (IOException e) {
 +          LOG.error("Failed to close the writer of the file " + filePath, e);
 +        }
 +      }
 +    }
 +    // commit the new del file
 +    Path path = MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
 +    // archive the old del files
 +    try {
 +      MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delFiles);
 +    } catch (IOException e) {
 +      LOG.error("Failed to archive the old del files " + delFiles, e);
 +    }
 +    return path;
 +  }
 +
 +  /**
 +   * Creates a store scanner.
 +   * @param filesToCompact The files to be compacted.
 +   * @param scanType The scan type.
 +   * @return The store scanner.
 +   * @throws IOException
 +   */
 +  private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
 +    throws IOException {
 +    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact,
 +      false, true, false, null, HConstants.LATEST_TIMESTAMP);
 +    Scan scan = new Scan();
 +    scan.setMaxVersions(column.getMaxVersions());
 +    long ttl = HStore.determineTTLFromFamily(column);
 +    ScanInfo scanInfo = new ScanInfo(column, ttl, 0, KeyValue.COMPARATOR);
 +    StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners, 0L,
 +      HConstants.LATEST_TIMESTAMP);
 +    return scanner;
 +  }
 +
 +  /**
 +   * Bulkloads the current file.
 +   * @param table The current table.
 +   * @param bulkloadDirectory The path of bulkload directory.
 +   * @param fileName The current file name.
 +   * @throws IOException
 +   */
 +  private void bulkloadRefFile(HTable table, Path bulkloadDirectory, String fileName)
 +    throws IOException {
 +    // bulkload the ref file
 +    try {
 +      LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
 +      bulkload.doBulkLoad(bulkloadDirectory, table);
 +    } catch (Exception e) {
 +      // delete the committed mob file
 +      deletePath(new Path(mobFamilyDir, fileName));
 +      throw new IOException(e);
 +    } finally {
 +      // delete the bulkload files in bulkloadPath
 +      deletePath(bulkloadDirectory);
 +    }
 +  }
 +
 +  /**
 +   * Closes the mob file writer.
 +   * @param writer The mob file writer.
 +   * @param maxSeqId Maximum sequence id.
 +   * @param mobCellsCount The number of mob cells.
 +   * @throws IOException
 +   */
 +  private void closeMobFileWriter(Writer writer, long maxSeqId, long mobCellsCount)
 +    throws IOException {
 +    if (writer != null) {
 +      writer.appendMetadata(maxSeqId, false, mobCellsCount);
 +      try {
 +        writer.close();
 +      } catch (IOException e) {
 +        LOG.error("Failed to close the writer of the file " + writer.getPath(), e);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Closes the ref file writer.
 +   * @param writer The ref file writer.
 +   * @param maxSeqId Maximum sequence id.
 +   * @param bulkloadTime The timestamp at which the bulk load file is created.
 +   * @throws IOException
 +   */
 +  private void closeRefFileWriter(Writer writer, long maxSeqId, long bulkloadTime)
 +    throws IOException {
 +    if (writer != null) {
 +      writer.appendMetadata(maxSeqId, false);
 +      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime));
 +      try {
 +        writer.close();
 +      } catch (IOException e) {
 +        LOG.error("Failed to close the writer of the ref file " + writer.getPath(), e);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Gets the max seqId and number of cells of the store files.
 +   * @param storeFiles The store files.
 +   * @return The pair of the max seqId and number of cells of the store files.
 +   * @throws IOException
 +   */
 +  private Pair<Long, Long> getFileInfo(List<StoreFile> storeFiles) throws IOException {
 +    long maxSeqId = 0;
 +    long maxKeyCount = 0;
 +    for (StoreFile sf : storeFiles) {
 +      // the readers will be closed later after the merge.
 +      maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId());
 +      byte[] count = sf.createReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT);
 +      if (count != null) {
 +        maxKeyCount += Bytes.toLong(count);
 +      }
 +    }
 +    return new Pair<Long, Long>(Long.valueOf(maxSeqId), Long.valueOf(maxKeyCount));
 +  }
 +
 +  /**
 +   * Deletes a file.
 +   * @param path The path of the file to be deleted.
 +   */
 +  private void deletePath(Path path) {
 +    try {
 +      if (path != null) {
 +        fs.delete(path, true);
 +      }
 +    } catch (IOException e) {
 +      LOG.error("Failed to delete the file " + path, e);
 +    }
 +  }
 +
 +  private FileStatus getLinkedFileStatus(HFileLink link) throws IOException {
 +    Path[] locations = link.getLocations();
 +    for (Path location : locations) {
 +      FileStatus file = getFileStatus(location);
 +      if (file != null) {
 +        return file;
 +      }
 +    }
 +    return null;
 +  }
 +
 +  private FileStatus getFileStatus(Path path) throws IOException {
 +    try {
 +      if (path != null) {
 +        FileStatus file = fs.getFileStatus(path);
 +        return file;
 +      }
 +    } catch (FileNotFoundException e) {
 +      LOG.warn("The file " + path + " cannot be found", e);
 +    }
 +    return null;
 +  }
 +}
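
The compactor above walks each partition in fixed-size batches (compactionBatchSize, 100 by default per the constructor comment), and compactDelFiles() recurses until the merged del files fit under delFileMaxCount. The hypothetical sketch below illustrates only the offset/batch arithmetic of compactMobFilePartition(), using the plain JDK; the real method additionally passes a final single file through untouched when no del files are present.

import java.util.ArrayList;
import java.util.List;

public class CompactionBatchSketch {
  // Splits the files of one partition into batches of at most batchSize entries,
  // mimicking the offset/batch loop in compactMobFilePartition().
  static List<List<String>> toBatches(List<String> files, int batchSize) {
    List<List<String>> batches = new ArrayList<>();
    int offset = 0;
    while (offset < files.size()) {
      int batch = Math.min(batchSize, files.size() - offset);
      batches.add(new ArrayList<>(files.subList(offset, offset + batch)));
      offset += batch;
    }
    return batches;
  }

  public static void main(String[] args) {
    List<String> files = new ArrayList<>();
    for (int i = 0; i < 205; i++) {
      files.add("mobfile-" + i);
    }
    // With the default batch size of 100 this yields batches of 100, 100 and 5 files.
    for (List<String> batch : toBatches(files, 100)) {
      System.out.println(batch.size());
    }
  }
}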

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
index 56e5726,0000000..559d6db
mode 100644,000000..100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
@@@ -1,85 -1,0 +1,87 @@@
 +/**
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hbase.mob.mapreduce;
 +
 +import java.io.IOException;
 +
 +import org.apache.hadoop.classification.InterfaceAudience;
++import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.KeyValue;
++import org.apache.hadoop.hbase.KeyValueUtil;
 +import org.apache.hadoop.hbase.client.Result;
 +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 +import org.apache.hadoop.hbase.mapreduce.TableMapper;
 +import org.apache.hadoop.hbase.mob.MobUtils;
 +import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
 +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 +import org.apache.hadoop.io.Text;
 +import org.apache.zookeeper.KeeperException;
 +
 +/**
 + * The mapper of a sweep job.
 + * Takes the rows from the table and their results and maps them to <filename:Text, mobValue:KeyValue>,
 + * where mobValue is the actual cell in HBase.
 + */
 +@InterfaceAudience.Private
 +public class SweepMapper extends TableMapper<Text, KeyValue> {
 +
 +  private ZooKeeperWatcher zkw = null;
 +
 +  @Override
 +  protected void setup(Context context) throws IOException,
 +      InterruptedException {
 +    String id = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
 +    String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME);
 +    String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE);
 +    zkw = new ZooKeeperWatcher(context.getConfiguration(), id,
 +        new DummyMobAbortable());
 +    try {
 +      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner);
 +      tracker.start();
 +    } catch (KeeperException e) {
 +      throw new IOException(e);
 +    }
 +  }
 +
 +  @Override
 +  protected void cleanup(Context context) throws IOException,
 +      InterruptedException {
 +    if (zkw != null) {
 +      zkw.close();
 +    }
 +  }
 +
 +  @Override
 +  public void map(ImmutableBytesWritable r, Result columns, Context context) throws IOException,
 +      InterruptedException {
 +    if (columns == null) {
 +      return;
 +    }
-     KeyValue[] kvList = columns.raw();
-     if (kvList == null || kvList.length == 0) {
++    Cell[] cells = columns.rawCells();
++    if (cells == null || cells.length == 0) {
 +      return;
 +    }
-     for (KeyValue kv : kvList) {
-       if (MobUtils.hasValidMobRefCellValue(kv)) {
-         String fileName = MobUtils.getMobFileName(kv);
-         context.write(new Text(fileName), kv);
++    for (Cell c : cells) {
++      if (MobUtils.hasValidMobRefCellValue(c)) {
++        String fileName = MobUtils.getMobFileName(c);
++        context.write(new Text(fileName), KeyValueUtil.ensureKeyValue(c));
 +      }
 +    }
 +  }
 +}
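
Editor's note: the hunk above replaces the deprecated Result.raw() with Result.rawCells() and converts each mob-reference Cell back to a KeyValue via KeyValueUtil.ensureKeyValue(), since the mapper's declared output value type is still KeyValue. A hedged sketch of that conversion pattern outside the MapReduce context (the helper class and method names are hypothetical):

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Result;

public class CellConversion {

  /**
   * Collects the cells of a Result as KeyValues, for callers that still
   * expect the concrete KeyValue type rather than the Cell interface.
   */
  public static List<KeyValue> toKeyValues(Result result) {
    List<KeyValue> kvs = new ArrayList<KeyValue>();
    Cell[] cells = result.rawCells();
    if (cells == null) {
      return kvs;
    }
    for (Cell cell : cells) {
      // ensureKeyValue returns the cell unchanged when it already is a
      // KeyValue, and copies it into a new KeyValue otherwise.
      kvs.add(KeyValueUtil.ensureKeyValue(cell));
    }
    return kvs;
  }
}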

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionStateListener.java~HEAD
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionStateListener.java~HEAD
index 0000000,0000000..6b954ac
new file mode 100644
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionStateListener.java~HEAD
@@@ -1,0 -1,0 +1,54 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hbase.quotas;
++
++import java.io.IOException;
++
++import org.apache.hadoop.classification.InterfaceAudience;
++import org.apache.hadoop.hbase.HRegionInfo;
++
++/**
++ * The listener interface for receiving region state events.
++ */
++@InterfaceAudience.Private
++public interface RegionStateListener {
++
++  /**
++   * Process region split event.
++   *
++   * @param hri An instance of HRegionInfo
++   * @throws IOException
++   */
++  void onRegionSplit(HRegionInfo hri) throws IOException;
++
++  /**
++   * Process region split reverted event.
++   *
++   * @param hri An instance of HRegionInfo
++   * @throws IOException Signals that an I/O exception has occurred.
++   */
++  void onRegionSplitReverted(HRegionInfo hri) throws IOException;
++
++  /**
++   * Process region merge event.
++   *
++   * @param hri An instance of HRegionInfo
++   * @throws IOException
++   */
++  void onRegionMerged(HRegionInfo hri) throws IOException;
++}
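
Editor's note: RegionStateListener only declares the three callbacks; a concrete listener in the quotas package supplies the actual bookkeeping. A purely illustrative implementation that just logs the transitions (not part of this patch) might look like:

import java.io.IOException;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.quotas.RegionStateListener;

/** Illustrative listener that only logs the region state transitions it receives. */
public class LoggingRegionStateListener implements RegionStateListener {

  @Override
  public void onRegionSplit(HRegionInfo hri) throws IOException {
    System.out.println("Region split: " + hri.getRegionNameAsString());
  }

  @Override
  public void onRegionSplitReverted(HRegionInfo hri) throws IOException {
    System.out.println("Region split reverted: " + hri.getRegionNameAsString());
  }

  @Override
  public void onRegionMerged(HRegionInfo hri) throws IOException {
    System.out.println("Region merged: " + hri.getRegionNameAsString());
  }
}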

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionStateListener.java~HEAD_0
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionStateListener.java~HEAD_0
index 0000000,0000000..6b954ac
new file mode 100644
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionStateListener.java~HEAD_0
@@@ -1,0 -1,0 +1,54 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hbase.quotas;
++
++import java.io.IOException;
++
++import org.apache.hadoop.classification.InterfaceAudience;
++import org.apache.hadoop.hbase.HRegionInfo;
++
++/**
++ * The listener interface for receiving region state events.
++ */
++@InterfaceAudience.Private
++public interface RegionStateListener {
++
++  /**
++   * Process region split event.
++   *
++   * @param hri An instance of HRegionInfo
++   * @throws IOException
++   */
++  void onRegionSplit(HRegionInfo hri) throws IOException;
++
++  /**
++   * Process region split reverted event.
++   *
++   * @param hri An instance of HRegionInfo
++   * @throws IOException Signals that an I/O exception has occurred.
++   */
++  void onRegionSplitReverted(HRegionInfo hri) throws IOException;
++
++  /**
++   * Process region merge event.
++   *
++   * @param hri An instance of HRegionInfo
++   * @throws IOException
++   */
++  void onRegionMerged(HRegionInfo hri) throws IOException;
++}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionStateListener.java~jon_master
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionStateListener.java~jon_master
index 0000000,0000000..6b954ac
new file mode 100644
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionStateListener.java~jon_master
@@@ -1,0 -1,0 +1,54 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hbase.quotas;
++
++import java.io.IOException;
++
++import org.apache.hadoop.classification.InterfaceAudience;
++import org.apache.hadoop.hbase.HRegionInfo;
++
++/**
++ * The listener interface for receiving region state events.
++ */
++@InterfaceAudience.Private
++public interface RegionStateListener {
++
++  /**
++   * Process region split event.
++   *
++   * @param hri An instance of HRegionInfo
++   * @throws IOException
++   */
++  void onRegionSplit(HRegionInfo hri) throws IOException;
++
++  /**
++   * Process region split reverted event.
++   *
++   * @param hri An instance of HRegionInfo
++   * @throws IOException Signals that an I/O exception has occurred.
++   */
++  void onRegionSplitReverted(HRegionInfo hri) throws IOException;
++
++  /**
++   * Process region merge event.
++   *
++   * @param hri An instance of HRegionInfo
++   * @throws IOException
++   */
++  void onRegionMerged(HRegionInfo hri) throws IOException;
++}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionStateListener.java~master
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionStateListener.java~master
index 0000000,0000000..6b954ac
new file mode 100644
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionStateListener.java~master
@@@ -1,0 -1,0 +1,54 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hbase.quotas;
++
++import java.io.IOException;
++
++import org.apache.hadoop.classification.InterfaceAudience;
++import org.apache.hadoop.hbase.HRegionInfo;
++
++/**
++ * The listener interface for receiving region state events.
++ */
++@InterfaceAudience.Private
++public interface RegionStateListener {
++
++  /**
++   * Process region split event.
++   *
++   * @param hri An instance of HRegionInfo
++   * @throws IOException
++   */
++  void onRegionSplit(HRegionInfo hri) throws IOException;
++
++  /**
++   * Process region split reverted event.
++   *
++   * @param hri An instance of HRegionInfo
++   * @throws IOException Signals that an I/O exception has occurred.
++   */
++  void onRegionSplitReverted(HRegionInfo hri) throws IOException;
++
++  /**
++   * Process region merge event.
++   *
++   * @param hri An instance of HRegionInfo
++   * @throws IOException
++   */
++  void onRegionMerged(HRegionInfo hri) throws IOException;
++}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe335b68/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
----------------------------------------------------------------------
diff --cc hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
index 4afa80c,3c1345d..d55822d
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
@@@ -62,29 -64,19 +64,30 @@@ public class DefaultStoreEngine extend
    @Override
    protected void createComponents(
        Configuration conf, Store store, KVComparator kvComparator) throws IOException {
-     storeFileManager = new DefaultStoreFileManager(kvComparator, conf);
 +    createCompactor(conf, store);
 +    createCompactionPolicy(conf, store);
 +    createStoreFlusher(conf, store);
++    storeFileManager = new DefaultStoreFileManager(kvComparator, conf, compactionPolicy.getConf());
++
 +  }
 +
 +  protected void createCompactor(Configuration conf, Store store) throws IOException {
      String className = conf.get(DEFAULT_COMPACTOR_CLASS_KEY, DEFAULT_COMPACTOR_CLASS.getName());
      try {
        compactor = ReflectionUtils.instantiateWithCustomCtor(className,
--          new Class[] { Configuration.class, Store.class }, new Object[] { conf, store });
++              new Class[]{Configuration.class, Store.class}, new Object[]{conf, store});
      } catch (Exception e) {
        throw new IOException("Unable to load configured compactor '" + className + "'", e);
      }
 -    className = conf.get(
 -        DEFAULT_COMPACTION_POLICY_CLASS_KEY, DEFAULT_COMPACTION_POLICY_CLASS.getName());
 +  }
 +
 +  protected void createCompactionPolicy(Configuration conf, Store store) throws IOException {
 +    String className = conf.get(
-         DEFAULT_COMPACTION_POLICY_CLASS_KEY, DEFAULT_COMPACTION_POLICY_CLASS.getName());
++            DEFAULT_COMPACTION_POLICY_CLASS_KEY, DEFAULT_COMPACTION_POLICY_CLASS.getName());
      try {
        compactionPolicy = ReflectionUtils.instantiateWithCustomCtor(className,
--          new Class[] { Configuration.class, StoreConfigInformation.class },
--          new Object[] { conf, store });
++              new Class[]{Configuration.class, StoreConfigInformation.class},
++              new Object[]{conf, store});
      } catch (Exception e) {
        throw new IOException("Unable to load configured compaction policy '" + className + "'", e);
      }
@@@ -101,7 -91,7 +104,6 @@@
      }
    }
  
--
    @Override
    public CompactionContext createCompaction() {
      return new DefaultCompactionContext();
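
Editor's note: the refactoring above splits createComponents() into separate protected factory methods (createCompactor, createCompactionPolicy, createStoreFlusher) so that a subclass such as a mob-aware store engine can override one piece without re-implementing the rest. Each piece is still loaded reflectively from a configuration key, so a custom implementation can also be plugged in via configuration alone. A hedged sketch, assuming DEFAULT_COMPACTOR_CLASS_KEY is publicly accessible on DefaultStoreEngine and where com.example.MyCompactor is a hypothetical class providing the (Configuration, Store) constructor that instantiateWithCustomCtor expects:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;

public class CustomCompactorConfig {

  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // DEFAULT_COMPACTOR_CLASS_KEY is the key read by createCompactor() above;
    // com.example.MyCompactor is a placeholder for a compactor class with a
    // (Configuration, Store) constructor.
    conf.set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY, "com.example.MyCompactor");
    System.out.println("Compactor class: "
        + conf.get(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY));
  }
}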

