falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject git commit: FALCON-284 Hcatalog based feed retention doesn't work when partition filter spans across multiple partition keys. Contributed by Satish Mittal
Date Thu, 24 Apr 2014 10:07:22 GMT
Repository: incubator-falcon
Updated Branches:
  refs/heads/master 7fa6ca57e -> 173ebec19


FALCON-284 Hcatalog based feed retention doesn't work when partition filter spans across multiple
partition keys. Contributed by Satish Mittal


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/173ebec1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/173ebec1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/173ebec1

Branch: refs/heads/master
Commit: 173ebec197358e1881d3e58732881e27149d05a6
Parents: 7fa6ca5
Author: Shwetha GS <shwethags@gmail.com>
Authored: Thu Apr 24 15:37:15 2014 +0530
Committer: Shwetha GS <shwethags@gmail.com>
Committed: Thu Apr 24 15:37:15 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../falcon/catalog/AbstractCatalogService.java  |  10 +
 .../falcon/catalog/HiveCatalogService.java      |  20 ++
 .../falcon/entity/common/FeedDataPath.java      |   9 +
 .../apache/falcon/retention/FeedEvictor.java    | 194 ++++++++++++----
 .../falcon/catalog/HiveCatalogServiceIT.java    |  19 ++
 .../lifecycle/TableStorageFeedEvictorIT.java    | 228 ++++++++++++++++++-
 7 files changed, 433 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/173ebec1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 27c2056..153aebe 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -113,6 +113,9 @@ Trunk (Unreleased)
     FALCON-123 Improve build speeds in falcon. (Srikanth Sundarrajan via Shwetha GS)
 
   BUG FIXES
+    FALCON-284 Hcatalog based feed retention doesn't work when partition filter spans across
+    multiple partition keys. (Satish Mittal via Shwetha GS)
+
     FALCON-409 Not able to create a package. (Raju Bairishetti via Shwetha GS)
 
     FALCON-396 minor logging typo in FalconTopicSubscriber. (Raghav Kumar Gautam via Shwetha GS)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/173ebec1/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
index fc9c3b1..df55b88 100644
--- a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
@@ -107,4 +107,14 @@ public abstract class AbstractCatalogService {
      */
     public abstract CatalogPartition getPartition(String catalogUrl, String database, String tableName,
                                                   Map<String, String> partitionSpec) throws FalconException;
+
+    /**
+     * @param catalogUrl url for the catalog service
+     * @param database database the table belongs to
+     * @param tableName table name
+     * @return list of partition column names of the table
+     * @throws FalconException
+     */
+    public abstract List<String> getTablePartitionCols(String catalogUrl, String database,
+                                                     String tableName) throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/173ebec1/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
index 3c3660e..30736f3 100644
--- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
@@ -173,6 +173,7 @@ public class HiveCatalogService extends AbstractCatalogService {
             HCatClient client = get(catalogUrl);
             List<HCatPartition> hCatPartitions = client.listPartitionsByFilter(database, tableName, filter);
             for (HCatPartition hCatPartition : hCatPartitions) {
+                LOG.info("Partition: " + hCatPartition.getValues());
                 CatalogPartition partition = createCatalogPartition(hCatPartition);
                 catalogPartitionList.add(partition);
             }
@@ -233,4 +234,23 @@ public class HiveCatalogService extends AbstractCatalogService {
             throw new FalconException("Exception fetching partition:" + e.getMessage(), e);
         }
     }
+
+    @Override
+    public List<String> getTablePartitionCols(String catalogUrl, String database,
+                                            String tableName) throws FalconException {
+        LOG.info("Fetching partition columns of table: " + tableName);
+
+        try {
+            HCatClient client = get(catalogUrl);
+            HCatTable table = client.getTable(database, tableName);
+            List<HCatFieldSchema> partSchema = table.getPartCols();
+            List<String> partCols = new ArrayList<String>();
+            for (HCatFieldSchema part : partSchema) {
+                partCols.add(part.getName());
+            }
+            return partCols;
+        } catch (HCatException e) {
+            throw new FalconException("Exception fetching partition columns: " + e.getMessage(), e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/173ebec1/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java b/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
index 4031e14..39e636b 100644
--- a/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
+++ b/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
@@ -52,6 +52,15 @@ public final class FeedDataPath {
             }
             return null;
         }
+
+        public static VARS presentIn(String str) {
+            for (VARS var : VARS.values()) {
+                if (str.contains(var.datePattern)) {
+                    return var;
+                }
+            }
+            return null;
+        }
     }
 
     public static final Pattern PATTERN = Pattern.compile(VARS.YEAR.regex()

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/173ebec1/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
----------------------------------------------------------------------
diff --git a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
index a8db52e..138a769 100644
--- a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
+++ b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
@@ -33,6 +33,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.TimeZone;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicReference;
@@ -84,6 +85,15 @@ public class FeedEvictor extends Configured implements Tool {
 
     private static final String FORMAT = "yyyyMMddHHmm";
 
+    // constants to be used while preparing HCatalog partition filter query
+    private static final String FILTER_ST_BRACKET = "(";
+    private static final String FILTER_END_BRACKET = ")";
+    private static final String FILTER_QUOTE = "'";
+    private static final String FILTER_AND = " and ";
+    private static final String FILTER_OR = " or ";
+    private static final String FILTER_LESS_THAN = " < ";
+    private static final String FILTER_EQUALS = " = ";
+
     public static void main(String[] args) throws Exception {
         Configuration conf = new Configuration();
         Path confPath = new Path("file:///" + System.getProperty("oozie.action.conf.xml"));
@@ -375,16 +385,13 @@ public class FeedEvictor extends Configured implements Tool {
         LOG.info("Applying retention on " + storage.getTable()
                 + ", Limit: " + retentionLimit + ", timezone: " + timeZone);
 
-        String datedPartitionKey = storage.getDatedPartitionKey();
-        String datePattern = storage.getPartitionValue(datedPartitionKey);
-        String dateMask = datePattern.replaceAll(VARS.YEAR.regex(), "yyyy")
-                .replaceAll(VARS.MONTH.regex(), "MM")
-                .replaceAll(VARS.DAY.regex(), "dd")
-                .replaceAll(VARS.HOUR.regex(), "HH")
-                .replaceAll(VARS.MINUTE.regex(), "mm");
+        // get sorted date partition keys and values
+        List<String> datedPartKeys = new ArrayList<String>();
+        List<String> datedPartValues = new ArrayList<String>();
+        fillSortedDatedPartitionKVs(storage, datedPartKeys, datedPartValues, retentionLimit, timeZone);
 
         List<CatalogPartition> toBeDeleted = discoverPartitionsToDelete(
-                storage, retentionLimit, timeZone, dateMask);
+                storage, datedPartKeys, datedPartValues);
         if (toBeDeleted.isEmpty()) {
             LOG.info("No partitions to delete.");
             return;
@@ -393,69 +400,164 @@ public class FeedEvictor extends Configured implements Tool {
         final boolean isTableExternal = CatalogServiceFactory.getCatalogService().isTableExternal(
                 storage.getCatalogUrl(), storage.getDatabase(), storage.getTable());
 
-        dropPartitions(storage, toBeDeleted, isTableExternal);
+        dropPartitions(storage, toBeDeleted, datedPartKeys, isTableExternal);
     }
 
-    private List<CatalogPartition> discoverPartitionsToDelete(CatalogStorage storage,
String retentionLimit,
-                                                           String timeZone, String dateMask)
-        throws FalconException, ELException {
+    private List<CatalogPartition> discoverPartitionsToDelete(CatalogStorage storage,
+        List<String> datedPartKeys, List<String> datedPartValues) throws FalconException,
ELException {
 
-        final String filter = createFilter(storage, retentionLimit, timeZone, dateMask);
+        final String filter = createFilter(datedPartKeys, datedPartValues);
         return CatalogServiceFactory.getCatalogService().listPartitionsByFilter(
                 storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(), filter);
     }
 
-    private String createFilter(CatalogStorage storage, String retentionLimit,
-                                String timeZone, String dateMask) throws ELException {
-
+    private void fillSortedDatedPartitionKVs(CatalogStorage storage, List<String> sortedPartKeys,
+        List<String> sortedPartValues, String retentionLimit, String timeZone) throws
ELException {
         Pair<Date, Date> range = getDateRange(retentionLimit);
-        DateFormat dateFormat = new SimpleDateFormat(dateMask);
-        dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
-        String beforeDate = dateFormat.format(range.first);
 
-        String datedPartitionKey = storage.getDatedPartitionKey();
+        // sort partition keys and values by the date pattern present in value
+        Map<VARS, String> sortedPartKeyMap = new TreeMap<VARS, String>();
+        Map<VARS, String> sortedPartValueMap = new TreeMap<VARS, String>();
+        for (Entry<String, String> entry : storage.getPartitions().entrySet()) {
+            String datePattern = entry.getValue();
+            String mask = datePattern.replaceAll(VARS.YEAR.regex(), "yyyy")
+                .replaceAll(VARS.MONTH.regex(), "MM")
+                .replaceAll(VARS.DAY.regex(), "dd")
+                .replaceAll(VARS.HOUR.regex(), "HH")
+                .replaceAll(VARS.MINUTE.regex(), "mm");
 
+            // find the first date pattern present in date mask
+            VARS vars = VARS.presentIn(mask);
+            // skip this partition if date mask doesn't contain any date format
+            if (vars == null) {
+                continue;
+            }
+
+            // construct dated partition value as per format
+            DateFormat dateFormat = new SimpleDateFormat(mask);
+            dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+            String partitionValue = dateFormat.format(range.first);
+
+            // add partition key and value in their sorted maps
+            if (!sortedPartKeyMap.containsKey(vars)) {
+                sortedPartKeyMap.put(vars, entry.getKey());
+            }
+
+            if (!sortedPartValueMap.containsKey(vars)) {
+                sortedPartValueMap.put(vars, partitionValue);
+            }
+        }
+
+        // add map entries to lists of partition keys and values
+        sortedPartKeys.addAll(sortedPartKeyMap.values());
+        sortedPartValues.addAll(sortedPartValueMap.values());
+    }
+
+    private String createFilter(List<String> datedPartKeys, List<String> datedPartValues)
+        throws ELException {
+
+        int numPartitions = datedPartKeys.size();
+
+        /* Construct filter query string. As an example, suppose the dated partition keys
+         * are: [year, month, day, hour] and dated partition values are [2014, 02, 24, 10].
+         * Then the filter query generated is of the format:
+         * "(year < '2014') or (year = '2014' and month < '02') or
+         * (year = '2014' and month = '02' and day < '24') or
+         * (year = '2014' and month = '02' and day = '24' and hour < '10')"
+         */
         StringBuilder filterBuffer = new StringBuilder();
-        filterBuffer.append(datedPartitionKey)
-                .append(" < ")
-                .append("'")
-                .append(beforeDate)
-                .append("'");
+        for (int curr = 0; curr < numPartitions; curr++) {
+            if (curr > 0) {
+                filterBuffer.append(FILTER_OR);
+            }
+            filterBuffer.append(FILTER_ST_BRACKET);
+            for (int prev = 0; prev < curr; prev++) {
+                filterBuffer.append(datedPartKeys.get(prev))
+                    .append(FILTER_EQUALS)
+                    .append(FILTER_QUOTE)
+                    .append(datedPartValues.get(prev))
+                    .append(FILTER_QUOTE)
+                    .append(FILTER_AND);
+            }
+            filterBuffer.append(datedPartKeys.get(curr))
+                  .append(FILTER_LESS_THAN)
+                  .append(FILTER_QUOTE)
+                  .append(datedPartValues.get(curr))
+                  .append(FILTER_QUOTE)
+                  .append(FILTER_END_BRACKET);
+        }
 
         return filterBuffer.toString();
     }
 
     private void dropPartitions(CatalogStorage storage, List<CatalogPartition> partitionsToDelete,
-                                boolean isTableExternal) throws FalconException, IOException
{
-
+        List<String> datedPartKeys, boolean isTableExternal) throws FalconException,
IOException {
+
+        // get table partition columns
+        List<String> partColumns = CatalogServiceFactory.getCatalogService().getTablePartitionCols(
+            storage.getCatalogUrl(), storage.getDatabase(), storage.getTable());
+
+        /* In case partition columns are a super-set of dated partitions, there can be multiple
+         * partitions that share the same set of date-partition values. All such partitions can
+         * be deleted by issuing a single HCatalog dropPartition call per date-partition values.
+         * Arrange the partitions grouped by each set of date-partition values.
+         */
+        Map<Map<String, String>, List<CatalogPartition>> dateToPartitionsMap
= new HashMap<
+            Map<String, String>, List<CatalogPartition>>();
         for (CatalogPartition partitionToDrop : partitionsToDelete) {
-            if (dropPartition(storage, partitionToDrop, isTableExternal)) {
-                LOG.info("Deleted partition: " + partitionToDrop.getValues());
-                buffer.append(partitionToDrop.getValues().get(0)).append(',');
-                instancePaths.append(partitionToDrop.getValues()).append(",");
+            // create a map of name-values of all columns of this partition
+            Map<String, String> partitions = new HashMap<String, String>();
+            for (int i = 0; i < partColumns.size(); i++) {
+                partitions.put(partColumns.get(i), partitionToDrop.getValues().get(i));
+            }
+
+            // create a map of name-values of dated sub-set of this partition
+            Map<String, String> datedPartitions = new HashMap<String, String>();
+            for (String datedPart : datedPartKeys) {
+                datedPartitions.put(datedPart, partitions.get(datedPart));
             }
-        }
-    }
 
-    private boolean dropPartition(CatalogStorage storage, CatalogPartition partitionToDrop,
-                                  boolean isTableExternal) throws FalconException, IOException
{
+            // add a map entry of this catalog partition corresponding to its date-partition
values
+            List<CatalogPartition> catalogPartitions;
+            if (dateToPartitionsMap.containsKey(datedPartitions)) {
+                catalogPartitions = dateToPartitionsMap.get(datedPartitions);
+            } else {
+                catalogPartitions = new ArrayList<CatalogPartition>();
+            }
+            catalogPartitions.add(partitionToDrop);
+            dateToPartitionsMap.put(datedPartitions, catalogPartitions);
+        }
 
-        String datedPartitionKey = storage.getDatedPartitionKey();
+        // delete each entry within dateToPartitions Map
+        for (Entry<Map<String, String>, List<CatalogPartition>> entry : dateToPartitionsMap.entrySet()) {
+            dropPartitionInstances(storage, entry.getValue(), entry.getKey(), isTableExternal);
+        }
+    }
 
-        Map<String, String> partitions = new HashMap<String, String>();
-        partitions.put(datedPartitionKey, partitionToDrop.getValues().get(0));
+    private void dropPartitionInstances(CatalogStorage storage, List<CatalogPartition>
partitionsToDrop,
+        Map<String, String> partSpec, boolean isTableExternal) throws FalconException,
IOException {
 
-        boolean dropped = CatalogServiceFactory.getCatalogService().dropPartitions(
-                storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(), partitions);
+        boolean deleted = CatalogServiceFactory.getCatalogService().dropPartitions(
+                storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(), partSpec);
 
-        boolean deleted = true;
-        if (isTableExternal) { // nuke the dirs if an external table
-            final String location = partitionToDrop.getLocation();
-            final Path path = new Path(location);
-            deleted = path.getFileSystem(new Configuration()).delete(path, true);
+        if (!deleted) {
+            return;
         }
 
-        return dropped && deleted;
+        for (CatalogPartition partitionToDrop : partitionsToDrop) {
+            if (isTableExternal) { // nuke the dirs if an external table
+                final String location = partitionToDrop.getLocation();
+                final Path path = new Path(location);
+                deleted = path.getFileSystem(new Configuration()).delete(path, true);
+            }
+            if (!isTableExternal || deleted) {
+                // replace ',' with ';' since message producer splits instancePaths string
by ','
+                String partitionInfo = partitionToDrop.getValues().toString().replace(","
, ";");
+                LOG.info("Deleted partition: " + partitionInfo);
+                buffer.append(partSpec).append(',');
+                instancePaths.append(partitionInfo).append(",");
+            }
+        }
     }
 
     private void deleteParentIfEmpty(FileSystem fs, Path parent, Path feedBasePath) throws
IOException {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/173ebec1/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
index fd004a1..6966a8d 100644
--- a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
+++ b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
@@ -37,6 +37,7 @@ import org.testng.annotations.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -334,4 +335,22 @@ public class HiveCatalogServiceIT {
 
         Assert.assertTrue(reInstatedCreateTime > originalCreateTime);
     }
+
+    @DataProvider (name = "tableName")
+    public Object[][] createTableName() {
+        return new Object[][] {
+            {TABLE_NAME},
+            {EXTERNAL_TABLE_NAME},
+        };
+    }
+
+    @Test  (dataProvider = "tableName")
+    public void testGetTablePartitionCols(String tableName) throws Exception {
+        List<String> partCols = CatalogServiceFactory.getCatalogService().getTablePartitionCols(
+                METASTORE_URL, DATABASE_NAME, tableName);
+        Assert.assertEquals(partCols.size(), 2);
+        Collections.sort(partCols);
+        Assert.assertEquals(partCols.get(0), "ds");
+        Assert.assertEquals(partCols.get(1), "region");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/173ebec1/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
index 770780e..894a194 100644
--- a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
@@ -58,6 +58,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
+import java.util.TreeMap;
 
 /**
  * Test for FeedEvictor for table.
@@ -71,8 +72,12 @@ public class TableStorageFeedEvictorIT {
     private static final String DATABASE_NAME = "falcon_db";
     private static final String TABLE_NAME = "clicks";
     private static final String EXTERNAL_TABLE_NAME = "clicks_external";
+    private static final String MULTI_COL_DATED_TABLE_NAME = "downloads";
+    private static final String MULTI_COL_DATED_EXTERNAL_TABLE_NAME = "downloads_external";
     private static final String STORAGE_URL = "jail://global:00";
     private static final String EXTERNAL_TABLE_LOCATION = STORAGE_URL + "/falcon/staging/clicks_external/";
+    private static final String MULTI_COL_DATED_EXTERNAL_TABLE_LOCATION = STORAGE_URL
+        + "/falcon/staging/downloads_external/";
 
     private final InMemoryWriter stream = new InMemoryWriter(System.out);
 
@@ -89,12 +94,19 @@ public class TableStorageFeedEvictorIT {
         HiveTestUtils.createTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionKeys);
         HiveTestUtils.createExternalTable(METASTORE_URL, DATABASE_NAME, EXTERNAL_TABLE_NAME,
                 partitionKeys, EXTERNAL_TABLE_LOCATION);
+
+        final List<String> multiColDatedPartitionKeys = Arrays.asList("year", "month",
"day", "region");
+        HiveTestUtils.createTable(METASTORE_URL, DATABASE_NAME, MULTI_COL_DATED_TABLE_NAME,
multiColDatedPartitionKeys);
+        HiveTestUtils.createExternalTable(METASTORE_URL, DATABASE_NAME, MULTI_COL_DATED_EXTERNAL_TABLE_NAME,
+                multiColDatedPartitionKeys, MULTI_COL_DATED_EXTERNAL_TABLE_LOCATION);
     }
 
     @AfterClass
     public void close() throws Exception {
         HiveTestUtils.dropTable(METASTORE_URL, DATABASE_NAME, EXTERNAL_TABLE_NAME);
         HiveTestUtils.dropTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME);
+        HiveTestUtils.dropTable(METASTORE_URL, DATABASE_NAME, MULTI_COL_DATED_EXTERNAL_TABLE_NAME);
+        HiveTestUtils.dropTable(METASTORE_URL, DATABASE_NAME, MULTI_COL_DATED_TABLE_NAME);
         HiveTestUtils.dropDatabase(METASTORE_URL, DATABASE_NAME);
     }
 
@@ -162,8 +174,7 @@ public class TableStorageFeedEvictorIT {
                     "Unexpected number of evicted partitions");
 
             final String actualInstancesEvicted = readLogFile(new Path(logFile));
-            Assert.assertEquals(actualInstancesEvicted, expectedInstancePaths.toString(),
-                    "Unexpected number of Logged partitions");
+            validateInstancePaths(actualInstancesEvicted, expectedInstancePaths.toString());
 
             if (isExternal) {
                 verifyFSPartitionsAreDeleted(candidatePartitions, range.first, dateMask,
timeZone);
@@ -216,6 +227,74 @@ public class TableStorageFeedEvictorIT {
         }
     }
 
+    @DataProvider (name = "multiColDatedEvictorTestDataProvider")
+    private Object[][] createMultiColDatedEvictorTestData() {
+        return new Object[][] {
+            {"days(10)", false},
+            {"days(10)", true},
+            {"days(15)", false},
+            {"days(15)", true},
+            {"days(100)", false},
+            {"days(100)", true},
+        };
+    }
+
+    @Test (dataProvider = "multiColDatedEvictorTestDataProvider")
+    public void testFeedEvictorForMultiColDatedTableStorage(String retentionLimit, boolean
isExternal)
+        throws Exception {
+        final String tableName = isExternal ? MULTI_COL_DATED_EXTERNAL_TABLE_NAME : MULTI_COL_DATED_TABLE_NAME;
+        final String timeZone = "UTC";
+
+        List<Map<String, String>> candidatePartitions = getMultiColDatedCandidatePartitions("days(10)",
timeZone, 3);
+        addMultiColDatedPartitions(tableName, candidatePartitions, isExternal);
+
+        List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, tableName);
+        Assert.assertEquals(partitions.size(), candidatePartitions.size());
+        Pair<Date, Date> range = getDateRange(retentionLimit);
+        List<HCatPartition> filteredPartitions = getMultiColDatedFilteredPartitions(tableName,
timeZone, range);
+
+        try {
+            stream.clear();
+
+            final String tableUri = DATABASE_NAME + "/" + tableName
+                + "/year=${YEAR};month=${MONTH};day=${DAY};region=us";
+            String feedBasePath = METASTORE_URL + tableUri;
+            String logFile = STORAGE_URL + "/falcon/staging/feed/instancePaths-2013-09-13-01-00.csv";
+
+            FeedEvictor.main(new String[]{
+                "-feedBasePath", feedBasePath,
+                "-retentionType", "instance",
+                "-retentionLimit", retentionLimit,
+                "-timeZone", timeZone,
+                "-frequency", "daily",
+                "-logFile", logFile,
+                "-falconFeedStorageType", Storage.TYPE.TABLE.name(),
+            });
+
+            StringBuilder expectedInstancePaths = new StringBuilder();
+            List<Map<String, String>> expectedInstancesEvicted = getMultiColDatedExpectedEvictedInstances(
+                candidatePartitions, range.first, timeZone, expectedInstancePaths);
+            int expectedSurvivorSize = candidatePartitions.size() - expectedInstancesEvicted.size();
+
+            List<HCatPartition> survivingPartitions = client.getPartitions(DATABASE_NAME,
tableName);
+            Assert.assertEquals(survivingPartitions.size(), expectedSurvivorSize,
+                "Unexpected number of surviving partitions");
+
+            Assert.assertEquals(expectedInstancesEvicted.size(), filteredPartitions.size(),
+                "Unexpected number of evicted partitions");
+
+            final String actualInstancesEvicted = readLogFile(new Path(logFile));
+            validateInstancePaths(actualInstancesEvicted, expectedInstancePaths.toString());
+
+            if (isExternal) {
+                verifyMultiColDatedFSPartitionsAreDeleted(candidatePartitions, range.first,
timeZone);
+            }
+        } finally {
+            dropMultiColDatedPartitions(tableName, candidatePartitions);
+            Assert.assertEquals(client.getPartitions(DATABASE_NAME, tableName).size(), 0);
+        }
+    }
+
     public List<String> getCandidatePartitions(String retentionLimit, String dateMask,
                                                String timeZone, int limit) throws Exception
{
         List<String> partitions = new ArrayList<String>();
@@ -244,6 +323,41 @@ public class TableStorageFeedEvictorIT {
         return partitions;
     }
 
+    public List<Map<String, String>> getMultiColDatedCandidatePartitions(String
retentionLimit,
+        String timeZone, int limit) throws Exception {
+        List<Map<String, String>> partitions = new ArrayList<Map<String,
String>>();
+
+        Pair<Date, Date> range = getDateRange(retentionLimit);
+
+        DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+        dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTime(range.first);
+        for (int i = 1; i <= limit; i++) {
+            calendar.add(Calendar.DAY_OF_MONTH, -(i + 1));
+            String[] dateCols = dateFormat.format(calendar.getTime()).split("-");
+            Map<String, String> dateParts = new TreeMap<String, String>();
+            dateParts.put("year", dateCols[0]);
+            dateParts.put("month", dateCols[1]);
+            dateParts.put("day", dateCols[2]);
+            partitions.add(dateParts);
+        }
+
+        calendar.setTime(range.second);
+        for (int i = 1; i <= limit; i++) {
+            calendar.add(Calendar.DAY_OF_MONTH, -(i + 1));
+            String[] dateCols = dateFormat.format(calendar.getTime()).split("-");
+            Map<String, String> dateParts = new TreeMap<String, String>();
+            dateParts.put("year", dateCols[0]);
+            dateParts.put("month", dateCols[1]);
+            dateParts.put("day", dateCols[2]);
+            partitions.add(dateParts);
+        }
+
+        return partitions;
+    }
+
     private Pair<Date, Date> getDateRange(String period) throws ELException {
         Long duration = (Long) EVALUATOR.evaluate("${" + period + "}",
                 Long.class, RESOLVER, RESOLVER);
@@ -271,6 +385,28 @@ public class TableStorageFeedEvictorIT {
         }
     }
 
+    private void addMultiColDatedPartitions(String tableName, List<Map<String, String>>
candidatePartitions,
+        boolean isTableExternal) throws Exception {
+        Path path = new Path(MULTI_COL_DATED_EXTERNAL_TABLE_LOCATION);
+        FileSystem fs = path.getFileSystem(new Configuration());
+
+        for (Map<String, String> candidatePartition : candidatePartitions) {
+            if (isTableExternal) {
+                StringBuilder pathStr = new StringBuilder(MULTI_COL_DATED_EXTERNAL_TABLE_LOCATION);
+                for (Map.Entry<String, String> entry : candidatePartition.entrySet())
{
+                    pathStr.append(entry.getKey()).append("=").append(entry.getValue()).append("/");
+                }
+                pathStr.append("region=in");
+                touch(fs, pathStr.toString());
+            }
+
+            candidatePartition.put("region", "in");
+            HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create(
+                DATABASE_NAME, tableName, null, candidatePartition).build();
+            client.addPartition(addPtn);
+        }
+    }
+
     private void touch(FileSystem fs, String path) throws Exception {
         fs.create(new Path(path)).close();
     }
@@ -285,6 +421,14 @@ public class TableStorageFeedEvictorIT {
         }
     }
 
+    /**
+     * Drops, from the given table in DATABASE_NAME, the partitions matching each candidate spec.
+     *
+     * @param tableName           table to drop the partitions from
+     * @param candidatePartitions partition specs keyed by partition column name
+     * @throws Exception if the catalog client fails
+     */
+    private void dropMultiColDatedPartitions(String tableName, List<Map<String, String>> candidatePartitions)
+        throws Exception {
+        for (Map<String, String> partitionSpec : candidatePartitions) {
+            // NOTE(review): the trailing 'true' is presumably the client's "if exists" flag - confirm.
+            client.dropPartitions(DATABASE_NAME, tableName, partitionSpec, true);
+        }
+    }
+
     private List<HCatPartition> getFilteredPartitions(String tableName, String timeZone,
String dateMask,
                                                       Pair<Date, Date> range) throws
HCatException {
         DateFormat dateFormat = new SimpleDateFormat(dateMask);
@@ -294,6 +438,24 @@ public class TableStorageFeedEvictorIT {
         return client.listPartitionsByFilter(DATABASE_NAME, tableName, filter);
     }
 
+    /**
+     * Lists the partitions of the table whose (year, month, day) columns are strictly
+     * earlier than range.first, formatted as yyyy-MM-dd in the given time zone.
+     *
+     * Example filter: "(year < '2014') or (year = '2014' and month < '02') or
+     * (year = '2014' and month = '02' and day < '24')".
+     *
+     * @param tableName table to query in DATABASE_NAME
+     * @param timeZone  time zone id used to format the range start
+     * @param range     date range; only its first element is used
+     * @return partitions returned by the catalog for the filter
+     * @throws HCatException if the catalog client fails
+     */
+    private List<HCatPartition> getMultiColDatedFilteredPartitions(String tableName, String timeZone,
+        Pair<Date, Date> range) throws HCatException {
+        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
+        formatter.setTimeZone(TimeZone.getTimeZone(timeZone));
+
+        String[] startParts = formatter.format(range.first).split("-");
+        String year = startParts[0];
+        String month = startParts[1];
+        String day = startParts[2];
+
+        StringBuilder filter = new StringBuilder();
+        filter.append("(year < '").append(year).append("')");
+        filter.append(" or ");
+        filter.append("(year = '").append(year).append("' and month < '").append(month).append("')");
+        filter.append(" or ");
+        filter.append("(year = '").append(year).append("' and month = '").append(month)
+            .append("' and day < '").append(day).append("')");
+
+        return client.listPartitionsByFilter(DATABASE_NAME, tableName, filter.toString());
+    }
+
     public List<String> getExpectedEvictedInstances(List<String> candidatePartitions,
Date date, String dateMask,
                                                     String timeZone, StringBuilder instancePaths)
{
         Collections.sort(candidatePartitions);
@@ -307,9 +469,29 @@ public class TableStorageFeedEvictorIT {
         for (String candidatePartition : candidatePartitions) {
             if (candidatePartition.compareTo(startDate) < 0) {
                 expectedInstances.add(candidatePartition);
+                instancePaths.append("[").append(candidatePartition).append("; in],");
+            }
+        }
+
+        return expectedInstances;
+    }
+
+    public List<Map<String, String>> getMultiColDatedExpectedEvictedInstances(List<Map<String,
String>>
+            candidatePartitions, Date date, String timeZone, StringBuilder instancePaths)
throws Exception {
+        instancePaths.append("instancePaths=");
+
+        DateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
+        dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+        String startDate = dateFormat.format(date);
+
+        List<Map<String, String>> expectedInstances = new ArrayList<Map<String,
String>>();
+        for (Map<String, String> partition : candidatePartitions) {
+            String partDate = partition.get("year") + partition.get("month") + partition.get("day");
+            if (partDate.compareTo(startDate) < 0) {
+                expectedInstances.add(partition);
                 instancePaths.append("[")
-                        .append(candidatePartition)
-                        .append(", in],");
+                    .append(partition.values().toString().replace("," , ";"))
+                    .append("; in],");
             }
         }
 
@@ -335,6 +517,25 @@ public class TableStorageFeedEvictorIT {
         }
     }
 
+    /**
+     * Fails the test when the file-system path of a partition dated before the given
+     * date still exists under MULTI_COL_DATED_EXTERNAL_TABLE_LOCATION.
+     *
+     * @param candidatePartitions partition specs carrying "year", "month" and "day" entries
+     * @param date                cut-off; partitions dated strictly before it are checked
+     * @param timeZone            time zone id used to format the cut-off as yyyyMMdd
+     * @throws Exception if the file system cannot be accessed
+     */
+    private void verifyMultiColDatedFSPartitionsAreDeleted(List<Map<String, String>> candidatePartitions,
+        Date date, String timeZone) throws Exception {
+        FileSystem fileSystem = new Path(MULTI_COL_DATED_EXTERNAL_TABLE_LOCATION)
+            .getFileSystem(new Configuration());
+
+        DateFormat formatter = new SimpleDateFormat("yyyyMMdd");
+        formatter.setTimeZone(TimeZone.getTimeZone(timeZone));
+        String cutoff = formatter.format(date);
+
+        for (Map<String, String> partitionSpec : candidatePartitions) {
+            String year = partitionSpec.get("year");
+            String month = partitionSpec.get("month");
+            String day = partitionSpec.get("day");
+
+            String partDate = year + month + day;
+            if (partDate.compareTo(cutoff) >= 0) {
+                continue; // not old enough to have been evicted
+            }
+
+            // NOTE(review): this probes "<year>/<month>/<day>/region=in", whereas
+            // addMultiColDatedPartitions creates "key=value/" segments. If the layouts really
+            // differ, this check can never fail - confirm the expected on-disk path.
+            final String path = MULTI_COL_DATED_EXTERNAL_TABLE_LOCATION
+                + year + "/" + month + "/" + day + "/region=in";
+            if (fileSystem.exists(new Path(path))) {
+                Assert.fail("Expecting " + path + " to be deleted");
+            }
+        }
+    }
+
     private String readLogFile(Path logFile) throws IOException {
         ByteArrayOutputStream writer = new ByteArrayOutputStream();
         InputStream date = logFile.getFileSystem(new Configuration()).open(logFile);
@@ -342,6 +543,25 @@ public class TableStorageFeedEvictorIT {
         return writer.toString();
     }
 
+    /**
+     * Checks that the logged evicted instance paths match the expected ones, ignoring order.
+     *
+     * Each argument is split on "=": a single token means no paths follow the label,
+     * otherwise the second token is a comma-separated list of paths. Instance paths can
+     * be deleted in any order, so both lists are sorted before being compared.
+     *
+     * @param actualInstancesEvicted evicted-instances entry read back from the evictor log
+     * @param expectedInstancePaths  entry the test expects, in the same format
+     */
+    private void validateInstancePaths(String actualInstancesEvicted, String expectedInstancePaths) {
+        String[] actualEvictedPathStr = actualInstancesEvicted.split("=");
+        String[] expectedEvictedPathStr = expectedInstancePaths.split("=");
+        if (actualEvictedPathStr.length == 1) {
+            // Nothing was evicted, so nothing must have been expected either.
+            Assert.assertEquals(expectedEvictedPathStr.length, 1);
+        } else {
+            Assert.assertEquals(actualEvictedPathStr.length, 2);
+            Assert.assertEquals(expectedEvictedPathStr.length, 2);
+
+            String[] actualEvictedPaths = actualEvictedPathStr[1].split(",");
+            // Must be derived from the expected string; deriving it from the actual one
+            // would compare the actual paths with themselves and always pass.
+            String[] expectedEvictedPaths = expectedEvictedPathStr[1].split(",");
+            Arrays.sort(actualEvictedPaths);
+            Arrays.sort(expectedEvictedPaths);
+            Assert.assertEquals(actualEvictedPaths, expectedEvictedPaths,
+                "Unexpected number of Logged partitions");
+        }
+    }
+
     private static class InMemoryWriter extends PrintStream {
 
         private final StringBuffer buffer = new StringBuffer();


Mime
View raw message