hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jingchen...@apache.org
Subject hbase git commit: HBASE-16981 Expand Mob Compaction Partition policy from daily to weekly, monthly
Date Fri, 03 Feb 2017 08:05:50 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 63c819efb -> 115929654


HBASE-16981 Expand Mob Compaction Partition policy from daily to weekly, monthly

Support weekly and monthly mob compact partition policies in addition to the existing
daily partition policy.

Signed-off-by: Jingcheng Du <jingchengdu@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/11592965
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/11592965
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/11592965

Branch: refs/heads/master
Commit: 11592965413224abed4156236d7ccd4699ce7e75
Parents: 63c819e
Author: Huaxiang Sun <hsun@cloudera.com>
Authored: Wed Feb 1 08:20:52 2017 -0800
Committer: Jingcheng Du <jingchengdu@apache.org>
Committed: Fri Feb 3 15:52:43 2017 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/HColumnDescriptor.java  |  35 +-
 .../hbase/client/MobCompactPartitionPolicy.java |  42 ++
 .../hadoop/hbase/TestHColumnDescriptor.java     |   4 +
 .../org/apache/hadoop/hbase/mob/MobUtils.java   | 163 +++++++-
 .../PartitionedMobCompactionRequest.java        |  23 ++
 .../compactions/PartitionedMobCompactor.java    |  56 ++-
 .../hbase/mob/compactions/TestMobCompactor.java | 383 ++++++++++++++++++-
 .../TestPartitionedMobCompactor.java            | 186 +++++++--
 hbase-shell/src/main/ruby/hbase/admin.rb        |   9 +
 .../src/main/ruby/shell/commands/create.rb      |   1 +
 10 files changed, 843 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/11592965/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
index 0d557aa..1597a06 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
@@ -28,6 +28,7 @@ import java.util.Set;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.HBaseException;
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -41,7 +42,6 @@ import org.apache.hadoop.hbase.util.PrettyPrinter.Unit;
 
 import com.google.common.base.Preconditions;
 
-
 /**
  * An HColumnDescriptor contains information about a column family such as the
  * number of versions, compression settings, etc.
@@ -130,6 +130,11 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
   public static final String MOB_THRESHOLD = "MOB_THRESHOLD";
   public static final byte[] MOB_THRESHOLD_BYTES = Bytes.toBytes(MOB_THRESHOLD);
   public static final long DEFAULT_MOB_THRESHOLD = 100 * 1024; // 100k
+  public static final String MOB_COMPACT_PARTITION_POLICY = "MOB_COMPACT_PARTITION_POLICY";
+  public static final byte[] MOB_COMPACT_PARTITION_POLICY_BYTES =
+      Bytes.toBytes(MOB_COMPACT_PARTITION_POLICY);
+  public static final MobCompactPartitionPolicy DEFAULT_MOB_COMPACT_PARTITION_POLICY =
+      MobCompactPartitionPolicy.DAILY;
 
   public static final String DFS_REPLICATION = "DFS_REPLICATION";
   public static final short DEFAULT_DFS_REPLICATION = 0;
@@ -276,6 +281,7 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
     RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION_KEY)));
     RESERVED_KEYWORDS.add(new Bytes(IS_MOB_BYTES));
     RESERVED_KEYWORDS.add(new Bytes(MOB_THRESHOLD_BYTES));
+    RESERVED_KEYWORDS.add(new Bytes(MOB_COMPACT_PARTITION_POLICY_BYTES));
   }
 
   private static final int UNINITIALIZED = -1;
@@ -438,8 +444,7 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
     if (Bytes.compareTo(Bytes.toBytes(HConstants.VERSIONS), key) == 0) {
       cachedMaxVersions = UNINITIALIZED;
     }
-    values.put(new Bytes(key),
-        new Bytes(value));
+    values.put(new Bytes(key), new Bytes(value));
     return this;
   }
 
@@ -1020,7 +1025,7 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
 
   public static Unit getUnit(String key) {
     Unit unit;
-      /* TTL for now, we can add more as we neeed */
+      /* TTL for now, we can add more as we need */
     if (key.equals(HColumnDescriptor.TTL)) {
       unit = Unit.TIME_INTERVAL;
     } else if (key.equals(HColumnDescriptor.MOB_THRESHOLD)) {
@@ -1223,6 +1228,28 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
   }
 
   /**
+   * Get the mob compact partition policy for this family
+   * @return MobCompactPartitionPolicy
+   */
+  public MobCompactPartitionPolicy getMobCompactPartitionPolicy() {
+    String policy = getValue(MOB_COMPACT_PARTITION_POLICY);
+    if (policy == null) {
+      return DEFAULT_MOB_COMPACT_PARTITION_POLICY;
+    }
+
+    return MobCompactPartitionPolicy.valueOf(policy.toUpperCase(Locale.ROOT));
+  }
+
+  /**
+   * Set the mob compact partition policy for the family.
+   * @param policy policy type
+   * @return this (for chained invocation)
+   */
+  public HColumnDescriptor setMobCompactPartitionPolicy(MobCompactPartitionPolicy policy) {
+    return setValue(MOB_COMPACT_PARTITION_POLICY, policy.toString().toUpperCase(Locale.ROOT));
+  }
+
+  /**
    * @return replication factor set for this CF or {@link #DEFAULT_DFS_REPLICATION} if not set.
    *         <p>
    *         {@link #DEFAULT_DFS_REPLICATION} value indicates that user has explicitly not set any

http://git-wip-us.apache.org/repos/asf/hbase/blob/11592965/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MobCompactPartitionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MobCompactPartitionPolicy.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MobCompactPartitionPolicy.java
new file mode 100644
index 0000000..f550572
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MobCompactPartitionPolicy.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Enum describing the mob compact partition policy types.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public enum MobCompactPartitionPolicy {
+  /**
+   * Compact daily mob files into one file
+   */
+  DAILY,
+  /**
+   * Compact mob files within one calendar week into one file
+   */
+  WEEKLY,
+  /**
+   * Compact mob files within one calendar month into one file
+   */
+  MONTHLY
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/11592965/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
index c53fff2..cabf557 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
@@ -114,12 +114,16 @@ public class TestHColumnDescriptor {
   public void testMobValuesInHColumnDescriptorShouldReadable() {
     boolean isMob = true;
     long threshold = 1000;
+    String policy = "weekly";
     String isMobString = PrettyPrinter.format(Bytes.toStringBinary(Bytes.toBytes(isMob)),
             HColumnDescriptor.getUnit(HColumnDescriptor.IS_MOB));
     String thresholdString = PrettyPrinter.format(Bytes.toStringBinary(Bytes.toBytes(threshold)),
             HColumnDescriptor.getUnit(HColumnDescriptor.MOB_THRESHOLD));
+    String policyString = PrettyPrinter.format(Bytes.toStringBinary(Bytes.toBytes(policy)),
+        HColumnDescriptor.getUnit(HColumnDescriptor.MOB_COMPACT_PARTITION_POLICY));
     assertEquals(String.valueOf(isMob), isMobString);
     assertEquals(String.valueOf(threshold), thresholdString);
+    assertEquals(String.valueOf(policy), policyString);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/11592965/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index 678bea7..3b0b990 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
@@ -54,6 +55,7 @@ import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -63,6 +65,8 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.master.locking.LockManager;
 import org.apache.hadoop.hbase.mob.compactions.MobCompactor;
+import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest;
+import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
 import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -81,6 +85,8 @@ import org.apache.hadoop.hbase.util.Threads;
 public final class MobUtils {
 
   private static final Log LOG = LogFactory.getLog(MobUtils.class);
+  private final static long WEEKLY_THRESHOLD_MULTIPLIER = 7;
+  private final static long MONTHLY_THRESHOLD_MULTIPLIER = 4 * WEEKLY_THRESHOLD_MULTIPLIER;
 
   private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
       new ThreadLocal<SimpleDateFormat>() {
@@ -123,6 +129,45 @@ public final class MobUtils {
   }
 
   /**
+   * Get the first day of the input date's month
+   * @param calendar Calendar object
+   * @param date The date to find out its first day of that month
+   * @return The first day in the month
+   */
+  public static Date getFirstDayOfMonth(final Calendar calendar, final Date date) {
+
+    calendar.setTime(date);
+    calendar.set(Calendar.HOUR_OF_DAY, 0);
+    calendar.set(Calendar.MINUTE, 0);
+    calendar.set(Calendar.SECOND, 0);
+    calendar.set(Calendar.MILLISECOND, 0);
+    calendar.set(Calendar.DAY_OF_MONTH, 1);
+
+    Date firstDayInMonth = calendar.getTime();
+    return firstDayInMonth;
+  }
+
+  /**
+   * Get the first day of the input date's week
+   * @param calendar Calendar object
+   * @param date The date to find out its first day of that week
+   * @return The first day in the week
+   */
+  public static Date getFirstDayOfWeek(final Calendar calendar, final Date date) {
+
+    calendar.setTime(date);
+    calendar.set(Calendar.HOUR_OF_DAY, 0);
+    calendar.set(Calendar.MINUTE, 0);
+    calendar.set(Calendar.SECOND, 0);
+    calendar.set(Calendar.MILLISECOND, 0);
+    calendar.setFirstDayOfWeek(Calendar.MONDAY);
+    calendar.set(Calendar.DAY_OF_WEEK, Calendar.MONDAY);
+
+    Date firstDayInWeek = calendar.getTime();
+    return firstDayInWeek;
+  }
+
+  /**
    * Whether the current cell is a mob reference cell.
    * @param cell The current cell.
    * @return True if the cell has a mob reference tag, false if it doesn't.
@@ -247,8 +292,14 @@ public final class MobUtils {
       return;
     }
 
-    Date expireDate = new Date(current - timeToLive * 1000);
-    expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate());
+    Calendar calendar = Calendar.getInstance();
+    calendar.setTimeInMillis(current - timeToLive * 1000);
+    calendar.set(Calendar.HOUR_OF_DAY, 0);
+    calendar.set(Calendar.MINUTE, 0);
+    calendar.set(Calendar.SECOND, 0);
+
+    Date expireDate = calendar.getTime();
+
     LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!");
 
     FileStatus[] stats = null;
@@ -268,14 +319,13 @@ public final class MobUtils {
     for (FileStatus file : stats) {
       String fileName = file.getPath().getName();
       try {
-        MobFileName mobFileName = null;
-        if (!HFileLink.isHFileLink(file.getPath())) {
-          mobFileName = MobFileName.create(fileName);
-        } else {
+        if (HFileLink.isHFileLink(file.getPath())) {
           HFileLink hfileLink = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
-          mobFileName = MobFileName.create(hfileLink.getOriginPath().getName());
+          fileName = hfileLink.getOriginPath().getName();
         }
-        Date fileDate = parseDate(mobFileName.getDate());
+
+        Date fileDate = parseDate(MobFileName.getDateFromName(fileName));
+
         if (LOG.isDebugEnabled()) {
           LOG.debug("Checking file " + fileName);
         }
@@ -471,10 +521,10 @@ public final class MobUtils {
       Compression.Algorithm compression, String startKey, CacheConfig cacheConfig,
       Encryption.Context cryptoContext)
       throws IOException {
-    MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
-        .replaceAll("-", ""));
+    MobFileName mobFileName = MobFileName.create(startKey, date,
+        UUID.randomUUID().toString().replaceAll("-", ""));
     return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
-      cacheConfig, cryptoContext);
+        cacheConfig, cryptoContext);
   }
 
   /**
@@ -527,8 +577,8 @@ public final class MobUtils {
       Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig,
       Encryption.Context cryptoContext)
       throws IOException {
-    MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
-        .replaceAll("-", ""));
+    MobFileName mobFileName = MobFileName.create(startKey, date,
+        UUID.randomUUID().toString().replaceAll("-", ""));
     return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
       cacheConfig, cryptoContext);
   }
@@ -710,7 +760,7 @@ public final class MobUtils {
     HColumnDescriptor hcd, ExecutorService pool, boolean allFiles, LockManager.MasterLock lock)
       throws IOException {
     String className = conf.get(MobConstants.MOB_COMPACTOR_CLASS_KEY,
-      PartitionedMobCompactor.class.getName());
+        PartitionedMobCompactor.class.getName());
     // instantiate the mob compactor.
     MobCompactor compactor = null;
     try {
@@ -741,7 +791,7 @@ public final class MobUtils {
    */
   public static ExecutorService createMobCompactorThreadPool(Configuration conf) {
     int maxThreads = conf.getInt(MobConstants.MOB_COMPACTION_THREADS_MAX,
-      MobConstants.DEFAULT_MOB_COMPACTION_THREADS_MAX);
+        MobConstants.DEFAULT_MOB_COMPACTION_THREADS_MAX);
     if (maxThreads == 0) {
       maxThreads = 1;
     }
@@ -854,4 +904,87 @@ public final class MobUtils {
     }
     return false;
   }
+
+  /**
+   * Fill out the partition id based on the compaction policy, file date and threshold.
+   * @param id Partition id to be filled out
+   * @param firstDayOfCurrentMonth The first day in the current month
+   * @param firstDayOfCurrentWeek The first day in the current week
+   * @param dateStr Date string from the mob file
+   * @param policy Mob compaction policy
+   * @param calendar Calendar object
+   * @param threshold Mob compaction threshold configured
+   * @return true if the file needs to be excluded from compaction
+   */
+  public static boolean fillPartitionId(final CompactionPartitionId id,
+      final Date firstDayOfCurrentMonth, final Date firstDayOfCurrentWeek, final String dateStr,
+      final MobCompactPartitionPolicy policy, final Calendar calendar, final long threshold) {
+
+    boolean skipCompcation = false;
+    id.setThreshold(threshold);
+    if (threshold <= 0) {
+      id.setDate(dateStr);
+      return skipCompcation;
+    }
+
+    long finalThreshold;
+    Date date;
+    try {
+      date = MobUtils.parseDate(dateStr);
+    } catch (ParseException e)  {
+      LOG.warn("Failed to parse date " + dateStr, e);
+      id.setDate(dateStr);
+      return true;
+    }
+
+    /* The algorithm works as follows:
+     *    For monthly policy:
+     *       1). If the file's date is in past months, apply 4 * 7 * threshold
+     *       2). If the file's date is in past weeks, apply 7 * threshold
+     *       3). If the file's date is in current week, exclude it from the compaction
+     *    For weekly policy:
+     *       1). If the file's date is in past weeks, apply 7 * threshold
+     *       2). If the file's date is in the current week, apply threshold
+     *    For daily policy:
+     *       1). apply threshold
+     */
+    if (policy == MobCompactPartitionPolicy.MONTHLY) {
+      if (date.before(firstDayOfCurrentMonth)) {
+        // Check overflow
+        if (threshold < (Long.MAX_VALUE / MONTHLY_THRESHOLD_MULTIPLIER)) {
+          finalThreshold = MONTHLY_THRESHOLD_MULTIPLIER * threshold;
+        } else {
+          finalThreshold = Long.MAX_VALUE;
+        }
+        id.setThreshold(finalThreshold);
+
+        // set to the date for the first day of that month
+        id.setDate(MobUtils.formatDate(MobUtils.getFirstDayOfMonth(calendar, date)));
+        return skipCompcation;
+      }
+    }
+
+    if ((policy == MobCompactPartitionPolicy.MONTHLY) ||
+        (policy == MobCompactPartitionPolicy.WEEKLY)) {
+      // Check if it needs to apply weekly multiplier
+      if (date.before(firstDayOfCurrentWeek)) {
+        // Check overflow
+        if (threshold < (Long.MAX_VALUE / WEEKLY_THRESHOLD_MULTIPLIER)) {
+          finalThreshold = WEEKLY_THRESHOLD_MULTIPLIER * threshold;
+        } else {
+          finalThreshold = Long.MAX_VALUE;
+        }
+        id.setThreshold(finalThreshold);
+
+        id.setDate(MobUtils.formatDate(MobUtils.getFirstDayOfWeek(calendar, date)));
+        return skipCompcation;
+      } else if (policy == MobCompactPartitionPolicy.MONTHLY) {
+        skipCompcation = true;
+      }
+    }
+
+    // Rest is daily
+    id.setDate(dateStr);
+    return skipCompcation;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/11592965/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java
index 665b5e2..3335149 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
@@ -98,11 +99,15 @@ public class PartitionedMobCompactionRequest extends MobCompactionRequest {
   public static class CompactionPartitionId {
     private String startKey;
     private String date;
+    private String latestDate;
+    private long threshold;
 
     public CompactionPartitionId() {
       // initialize these fields to empty string
       this.startKey = "";
       this.date = "";
+      this.latestDate = "";
+      this.threshold = 0;
     }
 
     public CompactionPartitionId(String startKey, String date) {
@@ -111,6 +116,16 @@ public class PartitionedMobCompactionRequest extends MobCompactionRequest {
       }
       this.startKey = startKey;
       this.date = date;
+      this.latestDate = "";
+      this.threshold = 0;
+    }
+
+    public void setThreshold (final long threshold) {
+      this.threshold = threshold;
+    }
+
+    public long getThreshold () {
+      return this.threshold;
     }
 
     public String getStartKey() {
@@ -129,6 +144,14 @@ public class PartitionedMobCompactionRequest extends MobCompactionRequest {
       this.date = date;
     }
 
+    public String getLatestDate () { return this.latestDate; }
+
+    public void updateLatestDate(final String latestDate) {
+      if (this.latestDate.compareTo(latestDate) < 0) {
+        this.latestDate = latestDate;
+      }
+    }
+
     @Override
     public int hashCode() {
       int result = 17;

http://git-wip-us.apache.org/repos/asf/hbase/blob/11592965/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 6fb1107..a0823d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.mob.compactions;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
@@ -51,6 +52,7 @@ import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.HFileLink;
@@ -151,6 +153,19 @@ public class PartitionedMobCompactor extends MobCompactor {
     final CompactionPartitionId id = new CompactionPartitionId();
     int selectedFileCount = 0;
     int irrelevantFileCount = 0;
+    MobCompactPartitionPolicy policy = column.getMobCompactPartitionPolicy();
+
+    Calendar calendar =  Calendar.getInstance();
+    Date currentDate = new Date();
+    Date firstDayOfCurrentMonth = null;
+    Date firstDayOfCurrentWeek = null;
+
+    if (policy == MobCompactPartitionPolicy.MONTHLY) {
+      firstDayOfCurrentMonth = MobUtils.getFirstDayOfMonth(calendar, currentDate);
+      firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, currentDate);
+    } else if (policy == MobCompactPartitionPolicy.WEEKLY) {
+      firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, currentDate);
+    }
 
     for (FileStatus file : candidates) {
       if (!file.isFile()) {
@@ -170,23 +185,32 @@ public class PartitionedMobCompactor extends MobCompactor {
       }
       if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
         allDelFiles.add(file);
-      } else if (allFiles || (linkedFile.getLen() < mergeableSize)) {
-        // add all files if allFiles is true,
-        // otherwise add the small files to the merge pool
+      } else {
         String fileName = linkedFile.getPath().getName();
-        id.setStartKey(MobFileName.getStartKeyFromName(fileName));
-        id.setDate(MobFileName.getDateFromName(fileName));
-        CompactionPartition compactionPartition = filesToCompact.get(id);
-        if (compactionPartition == null) {
-          CompactionPartitionId newId = new CompactionPartitionId(id.getStartKey(), id.getDate());
-          compactionPartition = new CompactionPartition(newId);
-
-          compactionPartition.addFile(file);
-          filesToCompact.put(newId, compactionPartition);
-        } else {
-          compactionPartition.addFile(file);
+        String date = MobFileName.getDateFromName(fileName);
+        boolean skipCompaction = MobUtils.fillPartitionId(id, firstDayOfCurrentMonth,
+            firstDayOfCurrentWeek, date, policy, calendar, mergeableSize);
+        if (allFiles || (!skipCompaction && (linkedFile.getLen() < id.getThreshold()))) {
+          // add all files if allFiles is true,
+          // otherwise add the small files to the merge pool
+          // filter out files which are not supposed to be compacted with the
+          // current policy
+
+          id.setStartKey(MobFileName.getStartKeyFromName(fileName));
+          CompactionPartition compactionPartition = filesToCompact.get(id);
+          if (compactionPartition == null) {
+            CompactionPartitionId newId = new CompactionPartitionId(id.getStartKey(), id.getDate());
+            compactionPartition = new CompactionPartition(newId);
+            compactionPartition.addFile(file);
+            filesToCompact.put(newId, compactionPartition);
+            newId.updateLatestDate(date);
+          } else {
+            compactionPartition.addFile(file);
+            compactionPartition.getPartitionId().updateLatestDate(date);
+          }
+
+          selectedFileCount++;
         }
-        selectedFileCount++;
       }
     }
 
@@ -437,7 +461,7 @@ public class PartitionedMobCompactor extends MobCompactor {
     try {
       try {
         writer = MobUtils
-            .createWriter(conf, fs, column, partition.getPartitionId().getDate(), tempPath,
+            .createWriter(conf, fs, column, partition.getPartitionId().getLatestDate(), tempPath,
                 Long.MAX_VALUE, column.getCompactionCompressionType(),
                 partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext);
         cleanupTmpMobFile = true;

http://git-wip-us.apache.org/repos/asf/hbase/blob/11592965/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
index 0739271..a4d984d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
@@ -38,6 +38,8 @@ import java.util.concurrent.TimeUnit;
 
 import javax.crypto.spec.SecretKeySpec;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -59,6 +61,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -74,6 +77,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
 import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFileName;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -86,6 +90,7 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -95,6 +100,7 @@ import org.junit.experimental.categories.Category;
 
 @Category(LargeTests.class)
 public class TestMobCompactor {
+  private static final Log LOG = LogFactory.getLog(TestMobCompactor.class);
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static Configuration conf = null;
   private TableName tableName;
@@ -110,6 +116,43 @@ public class TestMobCompactor {
   private static final String family2 = "family2";
   private static final String qf1 = "qualifier1";
   private static final String qf2 = "qualifier2";
+
+  private static final long  tsFor20150907Monday = 1441654904000L;
+
+  private static final long  tsFor20151120Sunday = 1448051213000L;
+  private static final long  tsFor20151128Saturday = 1448734396000L;
+  private static final long  tsFor20151130Monday = 1448874000000L;
+  private static final long  tsFor20151201Tuesday = 1448960400000L;
+  private static final long  tsFor20151205Saturday = 1449306000000L;
+  private static final long  tsFor20151228Monday = 1451293200000L;
+  private static final long  tsFor20151231Thursday = 1451552400000L;
+  private static final long  tsFor20160101Friday = 1451638800000L;
+  private static final long  tsFor20160103Sunday = 1451844796000L;
+
+  private static final byte[] mobKey01 = Bytes.toBytes("r01");
+  private static final byte[] mobKey02 = Bytes.toBytes("r02");
+  private static final byte[] mobKey03 = Bytes.toBytes("r03");
+  private static final byte[] mobKey04 = Bytes.toBytes("r04");
+  private static final byte[] mobKey05 = Bytes.toBytes("r05");
+  private static final byte[] mobKey06 = Bytes.toBytes("r05");
+  private static final byte[] mobKey1 = Bytes.toBytes("r1");
+  private static final byte[] mobKey2 = Bytes.toBytes("r2");
+  private static final byte[] mobKey3 = Bytes.toBytes("r3");
+  private static final byte[] mobKey4 = Bytes.toBytes("r4");
+  private static final byte[] mobKey5 = Bytes.toBytes("r5");
+  private static final byte[] mobKey6 = Bytes.toBytes("r6");
+  private static final byte[] mobKey7 = Bytes.toBytes("r7");
+  private static final byte[] mobKey8 = Bytes.toBytes("r8");
+  private static final String mobValue0 = "mobValue00000000000000000000000000";
+  private static final String mobValue1 = "mobValue00000111111111111111111111";
+  private static final String mobValue2 = "mobValue00000222222222222222222222";
+  private static final String mobValue3 = "mobValue00000333333333333333333333";
+  private static final String mobValue4 = "mobValue00000444444444444444444444";
+  private static final String mobValue5 = "mobValue00000666666666666666666666";
+  private static final String mobValue6 = "mobValue00000777777777777777777777";
+  private static final String mobValue7 = "mobValue00000888888888888888888888";
+  private static final String mobValue8 = "mobValue00000888888888888888888899";
+
   private static byte[] KEYS = Bytes.toBytes("012");
   private static int regionNum = KEYS.length;
   private static int delRowNum = 1;
@@ -123,11 +166,12 @@ public class TestMobCompactor {
     TEST_UTIL.getConfiguration()
       .setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, 5000);
     TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY,
-      KeyProviderForTesting.class.getName());
+        KeyProviderForTesting.class.getName());
     TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
     TEST_UTIL.getConfiguration().setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
     TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 1);
     TEST_UTIL.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 100);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
     TEST_UTIL.startMiniCluster(1);
     pool = createThreadPool(TEST_UTIL.getConfiguration());
     conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), pool);
@@ -159,6 +203,37 @@ public class TestMobCompactor {
     bufMut = conn.getBufferedMutator(tableName);
   }
 
+  // Set up for mob compaction policy testing
+  private void setUpForPolicyTest(String tableNameAsString, MobCompactPartitionPolicy type)
+      throws IOException {
+    tableName = TableName.valueOf(tableNameAsString);
+    hcd1 = new HColumnDescriptor(family1);
+    hcd1.setMobEnabled(true);
+    hcd1.setMobThreshold(10);
+    hcd1.setMobCompactPartitionPolicy(type);
+    desc = new HTableDescriptor(tableName);
+    desc.addFamily(hcd1);
+    admin.createTable(desc);
+    table = conn.getTable(tableName);
+    bufMut = conn.getBufferedMutator(tableName);
+  }
+
+  // alter mob compaction policy
+  private void alterForPolicyTest(final MobCompactPartitionPolicy type)
+      throws Exception {
+
+    hcd1.setMobCompactPartitionPolicy(type);
+    desc.modifyFamily(hcd1);
+    admin.modifyTable(tableName, desc);
+    Pair<Integer, Integer> st;
+
+    while (null != (st = admin.getAlterStatus(tableName)) && st.getFirst() > 0) {
+      LOG.debug(st.getFirst() + " regions left to update");
+      Thread.sleep(40);
+    }
+    LOG.info("alter status finished");
+  }
+
   @Test(timeout = 300000)
   public void testMinorCompaction() throws Exception {
     resetConf();
@@ -219,6 +294,128 @@ public class TestMobCompactor {
       countFiles(tableName, false, family2));
   }
 
+  private void waitUntilFilesShowup(final TableName table, final String famStr, final int num)
+      throws InterruptedException, IOException  {
+
+    HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(table).get(0);
+
+    // Make sure that it is flushed.
+    FileSystem fs = r.getRegionFileSystem().getFileSystem();
+    Path path = r.getRegionFileSystem().getStoreDir(famStr);
+
+
+    FileStatus[] fileList = fs.listStatus(path);
+
+    while (fileList.length != num) {
+      Thread.sleep(50);
+      fileList = fs.listStatus(path);
+    }
+  }
+
+  private int numberOfMobFiles(final TableName table, final String famStr)
+      throws IOException  {
+
+    HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(table).get(0);
+
+    // Make sure that it is flushed.
+    FileSystem fs = r.getRegionFileSystem().getFileSystem();
+    Path path = r.getRegionFileSystem().getStoreDir(famStr);
+
+    FileStatus[] fileList = fs.listStatus(path);
+
+    return fileList.length;
+  }
+
+  @Test
+  public void testMinorCompactionWithWeeklyPolicy() throws Exception {
+    resetConf();
+    int mergeSize = 5000;
+    // change the mob compaction merge size
+    conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
+
+    commonPolicyTestLogic("testMinorCompactionWithWeeklyPolicy",
+        MobCompactPartitionPolicy.WEEKLY, false, 6,
+        new String[] { "20150907", "20151120", "20151128", "20151130", "20151205", "20160103" },
+        true);
+  }
+
+  @Test
+  public void testMajorCompactionWithWeeklyPolicy() throws Exception {
+    resetConf();
+
+    commonPolicyTestLogic("testMajorCompactionWithWeeklyPolicy",
+        MobCompactPartitionPolicy.WEEKLY, true, 5,
+        new String[] { "20150907", "20151120", "20151128", "20151205", "20160103" }, true);
+  }
+
+  @Test
+  public void testMinorCompactionWithMonthlyPolicy() throws Exception {
+    resetConf();
+    int mergeSize = 5000;
+    // change the mob compaction merge size
+    conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
+
+    commonPolicyTestLogic("testMinorCompactionWithMonthlyPolicy",
+        MobCompactPartitionPolicy.MONTHLY, false, 4,
+        new String[] { "20150907", "20151130", "20151231", "20160103" }, true);
+  }
+
+  @Test
+  public void testMajorCompactionWithMonthlyPolicy() throws Exception {
+    resetConf();
+
+    commonPolicyTestLogic("testMajorCompactionWithMonthlyPolicy",
+        MobCompactPartitionPolicy.MONTHLY, true, 4,
+        new String[] {"20150907", "20151130", "20151231", "20160103"}, true);
+  }
+
+  @Test
+  public void testMajorCompactionWithWeeklyFollowedByMonthly() throws Exception {
+    resetConf();
+
+    commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthly",
+        MobCompactPartitionPolicy.WEEKLY, true, 5,
+        new String[] { "20150907", "20151120", "20151128", "20151205", "20160103" }, true);
+
+    commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthly",
+        MobCompactPartitionPolicy.MONTHLY, true, 4,
+        new String[] {"20150907", "20151128", "20151205", "20160103" }, false);
+  }
+
+  @Test
+  public void testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByWeekly() throws Exception {
+    resetConf();
+
+    commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByWeekly",
+        MobCompactPartitionPolicy.WEEKLY, true, 5,
+        new String[] { "20150907", "20151120", "20151128", "20151205", "20160103" }, true);
+
+    commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByWeekly",
+        MobCompactPartitionPolicy.MONTHLY, true, 4,
+        new String[] { "20150907", "20151128", "20151205", "20160103" }, false);
+
+    commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByWeekly",
+        MobCompactPartitionPolicy.WEEKLY, true, 4,
+        new String[] { "20150907", "20151128", "20151205", "20160103" }, false);
+  }
+
+  @Test
+  public void testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByDaily() throws Exception {
+    resetConf();
+
+    commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByDaily",
+        MobCompactPartitionPolicy.WEEKLY, true, 5,
+        new String[] { "20150907", "20151120", "20151128", "20151205", "20160103" }, true);
+
+    commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByDaily",
+        MobCompactPartitionPolicy.MONTHLY, true, 4,
+        new String[] { "20150907", "20151128", "20151205", "20160103" }, false);
+
+    commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByDaily",
+        MobCompactPartitionPolicy.DAILY, true, 4,
+        new String[] { "20150907", "20151128", "20151205", "20160103" }, false);
+  }
+
   @Test(timeout = 300000)
   public void testCompactionWithHFileLink() throws IOException, InterruptedException {
     resetConf();
@@ -716,6 +913,65 @@ public class TestMobCompactor {
     admin.flush(tableName);
   }
 
+  private void loadDataForPartitionPolicy(Admin admin, BufferedMutator table, TableName tableName)
+      throws IOException {
+
+    Put[] pArray = new Put[1000];
+
+    for (int i = 0; i < 1000; i ++) {
+      Put put0 = new Put(Bytes.toBytes("r0" + i));
+      put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151130Monday, Bytes.toBytes(mobValue0));
+      pArray[i] = put0;
+    }
+    loadData(admin, bufMut, tableName, pArray);
+
+    Put put06 = new Put(mobKey06);
+    put06.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151128Saturday, Bytes.toBytes(mobValue0));
+
+    loadData(admin, bufMut, tableName, new Put[] { put06 });
+
+    Put put1 = new Put(mobKey1);
+    put1.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151201Tuesday,
+        Bytes.toBytes(mobValue1));
+    loadData(admin, bufMut, tableName, new Put[] { put1 });
+
+    Put put2 = new Put(mobKey2);
+    put2.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151205Saturday,
+        Bytes.toBytes(mobValue2));
+    loadData(admin, bufMut, tableName, new Put[] { put2 });
+
+    Put put3 = new Put(mobKey3);
+    put3.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151228Monday,
+        Bytes.toBytes(mobValue3));
+    loadData(admin, bufMut, tableName, new Put[] { put3 });
+
+    Put put4 = new Put(mobKey4);
+    put4.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151231Thursday,
+        Bytes.toBytes(mobValue4));
+    loadData(admin, bufMut, tableName, new Put[] { put4 });
+
+    Put put5 = new Put(mobKey5);
+    put5.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20160101Friday,
+        Bytes.toBytes(mobValue5));
+    loadData(admin, bufMut, tableName, new Put[] { put5 });
+
+    Put put6 = new Put(mobKey6);
+    put6.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20160103Sunday,
+        Bytes.toBytes(mobValue6));
+    loadData(admin, bufMut, tableName, new Put[] { put6 });
+
+    Put put7 = new Put(mobKey7);
+    put7.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20150907Monday,
+        Bytes.toBytes(mobValue7));
+    loadData(admin, bufMut, tableName, new Put[] { put7 });
+
+    Put put8 = new Put(mobKey8);
+    put8.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151120Sunday,
+        Bytes.toBytes(mobValue8));
+    loadData(admin, bufMut, tableName, new Put[] { put8 });
+  }
+
+
   /**
    * delete the row, family and cell to create the del file
    */
@@ -833,4 +1089,127 @@ public class TestMobCompactor {
     conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
       MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
   }
-}
+
+  /**
+   * Verify mob partition policy compaction values.
+   */
+  private void verifyPolicyValues() throws Exception {
+    Get get = new Get(mobKey01);
+    Result result = table.get(get);
+    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
+        Bytes.toBytes(mobValue0)));
+
+    get = new Get(mobKey02);
+    result = table.get(get);
+    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
+        Bytes.toBytes(mobValue0)));
+
+    get = new Get(mobKey03);
+    result = table.get(get);
+    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
+        Bytes.toBytes(mobValue0)));
+
+    get = new Get(mobKey04);
+    result = table.get(get);
+    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
+        Bytes.toBytes(mobValue0)));
+
+    get = new Get(mobKey05);
+    result = table.get(get);
+    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
+        Bytes.toBytes(mobValue0)));
+
+    get = new Get(mobKey06);
+    result = table.get(get);
+    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
+        Bytes.toBytes(mobValue0)));
+
+    get = new Get(mobKey1);
+    result = table.get(get);
+    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
+        Bytes.toBytes(mobValue1)));
+
+    get = new Get(mobKey2);
+    result = table.get(get);
+    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
+        Bytes.toBytes(mobValue2)));
+
+    get = new Get(mobKey3);
+    result = table.get(get);
+    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
+        Bytes.toBytes(mobValue3)));
+
+    get = new Get(mobKey4);
+    result = table.get(get);
+    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
+        Bytes.toBytes(mobValue4)));
+
+    get = new Get(mobKey5);
+    result = table.get(get);
+    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
+        Bytes.toBytes(mobValue5)));
+
+    get = new Get(mobKey6);
+    result = table.get(get);
+    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
+        Bytes.toBytes(mobValue6)));
+
+    get = new Get(mobKey7);
+    result = table.get(get);
+    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
+        Bytes.toBytes(mobValue7)));
+
+    get = new Get(mobKey8);
+    result = table.get(get);
+    assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)),
+        Bytes.toBytes(mobValue8)));
+  }
+
+  private void commonPolicyTestLogic (final String tableNameAsString,
+      final MobCompactPartitionPolicy pType, final boolean majorCompact,
+      final int expectedFileNumbers, final String[] expectedFileNames,
+      final boolean setupAndLoadData
+      ) throws Exception {
+    if (setupAndLoadData) {
+      setUpForPolicyTest(tableNameAsString, pType);
+
+      loadDataForPartitionPolicy(admin, bufMut, tableName);
+    } else {
+      alterForPolicyTest(pType);
+    }
+
+    if (majorCompact) {
+      admin.majorCompact(tableName, hcd1.getName(), CompactType.MOB);
+    } else {
+      admin.compact(tableName, hcd1.getName(), CompactType.MOB);
+    }
+
+    waitUntilMobCompactionFinished(tableName);
+
+    // Run cleaner to make sure that files in archive directory are cleaned up
+    TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
+
+    //check the number of files
+    Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, family1);
+    FileStatus[] fileList = fs.listStatus(mobDirPath);
+
+    assertTrue(fileList.length == expectedFileNumbers);
+
+    // the file names are expected
+    ArrayList<String> fileNames = new ArrayList<>(expectedFileNumbers);
+    for (FileStatus file : fileList) {
+      fileNames.add(MobFileName.getDateFromName(file.getPath().getName()));
+    }
+    int index = 0;
+    for (String fileName : expectedFileNames) {
+      index = fileNames.indexOf(fileName);
+      assertTrue(index >= 0);
+      fileNames.remove(index);
+    }
+
+    // Check daily mob files are removed from the mobdir, and only weekly mob files are there.
+    // Also check that there is no data loss.
+
+    verifyPolicyValues();
+  }
+ }

http://git-wip-us.apache.org/repos/asf/hbase/blob/11592965/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
index c112289..0aa9426 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
@@ -19,13 +19,8 @@
 package org.apache.hadoop.hbase.mob.compactions;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
+import java.text.ParseException;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
@@ -33,12 +28,15 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
 import org.apache.hadoop.hbase.regionserver.*;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.client.Scan;
@@ -63,9 +61,11 @@ import org.junit.experimental.categories.Category;
 
 @Category(LargeTests.class)
 public class TestPartitionedMobCompactor {
+  private static final Log LOG = LogFactory.getLog(TestPartitionedMobCompactor.class);
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private final static String family = "family";
   private final static String qf = "qf";
+  private final long DAY_IN_MS = 1000 * 60 * 60 * 24;
   private HColumnDescriptor hcd = new HColumnDescriptor(family);
   private Configuration conf = TEST_UTIL.getConfiguration();
   private CacheConfig cacheConf = new CacheConfig(conf);
@@ -104,6 +104,109 @@ public class TestPartitionedMobCompactor {
   }
 
   @Test
+  public void testCompactionSelectAllFilesWeeklyPolicy() throws Exception {
+    String tableName = "testCompactionSelectAllFilesWeeklyPolicy";
+    testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
+        CompactionType.ALL_FILES, false, false, new Date(), MobCompactPartitionPolicy.WEEKLY, 1);
+  }
+
+  @Test
+  public void testCompactionSelectPartFilesWeeklyPolicy() throws Exception {
+    String tableName = "testCompactionSelectPartFilesWeeklyPolicy";
+    testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, false, false,
+        new Date(), MobCompactPartitionPolicy.WEEKLY, 1);
+  }
+
+  @Test
+  public void testCompactionSelectPartFilesWeeklyPolicyWithPastWeek() throws Exception {
+    String tableName = "testCompactionSelectPartFilesWeeklyPolicyWithPastWeek";
+    Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));
+    testCompactionAtMergeSize(tableName, 700, CompactionType.PART_FILES, false, false, dateLastWeek,
+        MobCompactPartitionPolicy.WEEKLY, 7);
+  }
+
+  @Test
+  public void testCompactionSelectAllFilesWeeklyPolicyWithPastWeek() throws Exception {
+    String tableName = "testCompactionSelectAllFilesWeeklyPolicyWithPastWeek";
+    Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));
+    testCompactionAtMergeSize(tableName, 3000, CompactionType.ALL_FILES,
+        false, false, dateLastWeek, MobCompactPartitionPolicy.WEEKLY, 7);
+  }
+
+  @Test
+  public void testCompactionSelectAllFilesMonthlyPolicy() throws Exception {
+    String tableName = "testCompactionSelectAllFilesMonthlyPolicy";
+    Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));
+    testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
+        CompactionType.ALL_FILES, false, false, dateLastWeek,
+        MobCompactPartitionPolicy.MONTHLY, 7);
+  }
+
+  @Test
+  public void testCompactionSelectNoFilesWithinCurrentWeekMonthlyPolicy() throws Exception {
+    String tableName = "testCompactionSelectNoFilesWithinCurrentWeekMonthlyPolicy";
+    testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
+        CompactionType.PART_FILES, false, false, new Date(), MobCompactPartitionPolicy.MONTHLY, 1);
+  }
+
+  @Test
+  public void testCompactionSelectPartFilesMonthlyPolicy() throws Exception {
+    String tableName = "testCompactionSelectPartFilesMonthlyPolicy";
+    testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, false, false,
+        new Date(), MobCompactPartitionPolicy.MONTHLY, 1);
+  }
+
+  @Test
+  public void testCompactionSelectPartFilesMonthlyPolicyWithPastWeek() throws Exception {
+    String tableName = "testCompactionSelectPartFilesMonthlyPolicyWithPastWeek";
+    Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));
+    Calendar calendar =  Calendar.getInstance();
+    Date firstDayOfCurrentMonth = MobUtils.getFirstDayOfMonth(calendar, new Date());
+    CompactionType type = CompactionType.PART_FILES;
+    long mergeSizeMultiFactor = 7;
+
+
+    // dateLastWeek may fall in the previous month: e.g. if the test runs on 2/1/2017, the date
+    // is in last month and the monthly policy is going to be applied here.
+    if (dateLastWeek.before(firstDayOfCurrentMonth)) {
+      type = CompactionType.ALL_FILES;
+      mergeSizeMultiFactor *= 4;
+    }
+
+    testCompactionAtMergeSize(tableName, 700, type, false, false, dateLastWeek,
+        MobCompactPartitionPolicy.MONTHLY, mergeSizeMultiFactor);
+  }
+
+  @Test
+  public void testCompactionSelectAllFilesMonthlyPolicyWithPastWeek() throws Exception {
+    String tableName = "testCompactionSelectAllFilesMonthlyPolicyWithPastWeek";
+    Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));
+
+    testCompactionAtMergeSize(tableName, 3000, CompactionType.ALL_FILES,
+        false, false, dateLastWeek, MobCompactPartitionPolicy.MONTHLY, 7);
+  }
+
+  @Test
+  public void testCompactionSelectPartFilesMonthlyPolicyWithPastMonth() throws Exception {
+    String tableName = "testCompactionSelectPartFilesMonthlyPolicyWithPastMonth";
+
+    // Go back 5 weeks so that the date falls in a past month.
+    Date dateLastMonth = new Date(System.currentTimeMillis() - (7 * 5 * DAY_IN_MS));
+    testCompactionAtMergeSize(tableName, 200, CompactionType.PART_FILES, false, false, dateLastMonth,
+        MobCompactPartitionPolicy.MONTHLY, 28);
+  }
+
+  @Test
+  public void testCompactionSelectAllFilesMonthlyPolicyWithPastMonth() throws Exception {
+    String tableName = "testCompactionSelectAllFilesMonthlyPolicyWithPastMonth";
+
+    // Go back 5 weeks so that the date falls in a past month.
+    Date dateLastMonth = new Date(System.currentTimeMillis() - (7 * 5 * DAY_IN_MS));
+    testCompactionAtMergeSize(tableName, 750, CompactionType.ALL_FILES,
+        false, false, dateLastMonth, MobCompactPartitionPolicy.MONTHLY, 28);
+  }
+
+  @Test
   public void testCompactionSelectWithAllFiles() throws Exception {
     String tableName = "testCompactionSelectWithAllFiles";
     // If there is only 1 file, it will not be compacted with _del files, so
@@ -121,7 +224,6 @@ public class TestPartitionedMobCompactor {
         CompactionType.PART_FILES, false);
   }
 
-
   @Test
   public void testCompactionSelectWithPartFiles() throws Exception {
     String tableName = "testCompactionSelectWithPartFiles";
@@ -144,34 +246,76 @@ public class TestPartitionedMobCompactor {
       final long mergeSize, final CompactionType type, final boolean isForceAllFiles,
       final boolean createDelFiles)
       throws Exception {
+    Date date = new Date();
+    testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, createDelFiles, date);
+  }
+
+  private void testCompactionAtMergeSize(final String tableName,
+      final long mergeSize, final CompactionType type, final boolean isForceAllFiles,
+      final boolean createDelFiles, final Date date)
+      throws Exception {
+    testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, createDelFiles, date,
+        MobCompactPartitionPolicy.DAILY, 1);
+  }
+
+  private void testCompactionAtMergeSize(final String tableName,
+      final long mergeSize, final CompactionType type, final boolean isForceAllFiles,
+      final boolean createDelFiles, final Date date, final MobCompactPartitionPolicy policy,
+      final long mergeSizeMultiFactor)
+      throws Exception {
     resetConf();
     init(tableName);
     int count = 10;
     // create 10 mob files.
-    createStoreFiles(basePath, family, qf, count, Type.Put);
+    createStoreFiles(basePath, family, qf, count, Type.Put, date);
 
     if (createDelFiles) {
       // create 10 del files
-      createStoreFiles(basePath, family, qf, count, Type.Delete);
+      createStoreFiles(basePath, family, qf, count, Type.Delete, date);
     }
 
+    Calendar calendar =  Calendar.getInstance();
+    Date firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, new Date());
+
     listFiles();
     List<String> expectedStartKeys = new ArrayList<>();
     for(FileStatus file : mobFiles) {
-      if(file.getLen() < mergeSize) {
+      if(file.getLen() < mergeSize * mergeSizeMultiFactor) {
         String fileName = file.getPath().getName();
         String startKey = fileName.substring(0, 32);
 
+        // If the policy is monthly and files are in the current week, they will be skipped
+        // in minor compaction.
+        boolean skipCompaction = false;
+        if (policy == MobCompactPartitionPolicy.MONTHLY) {
+          String fileDateStr = MobFileName.getDateFromName(fileName);
+          Date fileDate;
+          try {
+            fileDate = MobUtils.parseDate(fileDateStr);
+          } catch (ParseException e)  {
+            LOG.warn("Failed to parse date " + fileDateStr, e);
+            fileDate = new Date();
+          }
+          if (!fileDate.before(firstDayOfCurrentWeek)) {
+            skipCompaction = true;
+          }
+        }
+
         // If it is not an major mob compaction and del files are there,
         // these mob files wont be compacted.
-        if (isForceAllFiles || !createDelFiles) {
+        if (isForceAllFiles || (!createDelFiles && !skipCompaction)) {
           expectedStartKeys.add(startKey);
         }
       }
     }
+
+    // Set the policy
+    this.hcd.setMobCompactPartitionPolicy(policy);
     // set the mob compaction mergeable threshold
     conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
     testSelectFiles(tableName, type, isForceAllFiles, expectedStartKeys);
+    // go back to the default daily policy
+    this.hcd.setMobCompactPartitionPolicy(MobCompactPartitionPolicy.DAILY);
   }
 
   @Test
@@ -205,7 +349,7 @@ public class TestPartitionedMobCompactor {
     try {
       int count = 2;
       // create 2 mob files.
-      createStoreFiles(basePath, family, qf, count, Type.Put, true);
+      createStoreFiles(basePath, family, qf, count, Type.Put, true, new Date());
       listFiles();
 
       TableName tName = TableName.valueOf(tableName);
@@ -243,9 +387,9 @@ public class TestPartitionedMobCompactor {
     resetConf();
     init(tableName);
     // create 20 mob files.
-    createStoreFiles(basePath, family, qf, 20, Type.Put);
+    createStoreFiles(basePath, family, qf, 20, Type.Put, new Date());
     // create 13 del files
-    createStoreFiles(basePath, family, qf, 13, Type.Delete);
+    createStoreFiles(basePath, family, qf, 13, Type.Delete, new Date());
     listFiles();
 
     // set the max del file count
@@ -366,12 +510,12 @@ public class TestPartitionedMobCompactor {
    * @type the key type
    */
   private void createStoreFiles(Path basePath, String family, String qualifier, int count,
-      Type type) throws IOException {
-    createStoreFiles(basePath, family, qualifier, count, type, false);
+      Type type, final Date date) throws IOException {
+    createStoreFiles(basePath, family, qualifier, count, type, false, date);
   }
 
   private void createStoreFiles(Path basePath, String family, String qualifier, int count,
-      Type type, boolean sameStartKey) throws IOException {
+      Type type, boolean sameStartKey, final Date date) throws IOException {
     HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
     String startKey = "row_";
     MobFileName mobFileName = null;
@@ -386,12 +530,10 @@ public class TestPartitionedMobCompactor {
         startRow = Bytes.toBytes(startKey + i);
       }
       if(type.equals(Type.Delete)) {
-        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
-            new Date()), delSuffix);
+        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(date), delSuffix);
       }
       if(type.equals(Type.Put)){
-        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
-            new Date()), mobSuffix);
+        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(date), mobSuffix);
       }
       StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs)
       .withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName())).build();

http://git-wip-us.apache.org/repos/asf/hbase/blob/11592965/hbase-shell/src/main/ruby/hbase/admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index 94b9d3e..a1a9336 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -878,6 +878,15 @@ module Hbase
           storage_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY).upcase
           family.setStoragePolicy(storage_policy)
       end
+      if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MOB_COMPACT_PARTITION_POLICY)
+        mob_partition_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::MOB_COMPACT_PARTITION_POLICY).upcase
+        unless org.apache.hadoop.hbase.client.MobCompactPartitionPolicy.constants.include?(mob_partition_policy)
+          raise(ArgumentError, "MOB_COMPACT_PARTITION_POLICY #{mob_partition_policy} is not supported. Use one of " + org.apache.hadoop.hbase.client.MobCompactPartitionPolicy.constants.join(" "))
+        else
+          family.setMobCompactPartitionPolicy(org.apache.hadoop.hbase.client.MobCompactPartitionPolicy.valueOf(mob_partition_policy))
+        end
+      end
+
 
       set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA]
       set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]

http://git-wip-us.apache.org/repos/asf/hbase/blob/11592965/hbase-shell/src/main/ruby/shell/commands/create.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/create.rb b/hbase-shell/src/main/ruby/shell/commands/create.rb
index ee14455..4812048 100644
--- a/hbase-shell/src/main/ruby/shell/commands/create.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/create.rb
@@ -38,6 +38,7 @@ Create a table with namespace=default and table qualifier=t1
   hbase> create 't1', 'f1', 'f2', 'f3'
   hbase> create 't1', {NAME => 'f1', VERSIONS => 1, TTL => 2592000, BLOCKCACHE => true}
   hbase> create 't1', {NAME => 'f1', CONFIGURATION => {'hbase.hstore.blockingStoreFiles' => '10'}}
+  hbase> create 't1', {NAME => 'f1', IS_MOB => true, MOB_THRESHOLD => 1000000, MOB_COMPACT_PARTITION_POLICY => 'weekly'}
 
 Table configuration options can be put at the end.
 Examples:


Mime
View raw message