falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From suh...@apache.org
Subject [2/2] falcon git commit: FALCON-1091 Monitoring plugin that registers catalog partition - code. Contributed by Suhas Vasu / PallaviRao / Shwetha GS
Date Thu, 02 Apr 2015 11:25:20 GMT
FALCON-1091 Monitoring plugin that registers catalog partition - code. Contributed by Suhas Vasu / PallaviRao / Shwetha GS


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/4b0a920f
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/4b0a920f
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/4b0a920f

Branch: refs/heads/master
Commit: 4b0a920f6b6336e2bf4926adc8dc329f88f556e2
Parents: 13bc6b6
Author: Suhas V <suhas.v@inmobi.com>
Authored: Thu Apr 2 16:54:40 2015 +0530
Committer: Suhas V <suhas.v@inmobi.com>
Committed: Thu Apr 2 16:54:40 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../falcon/catalog/AbstractCatalogService.java  |  42 +++
 .../falcon/catalog/CatalogPartitionHandler.java | 298 +++++++++++++++++++
 .../falcon/catalog/CatalogServiceFactory.java   |  10 +-
 .../falcon/catalog/HiveCatalogService.java      | 107 ++++++-
 .../apache/falcon/entity/CatalogStorage.java    |  87 ++----
 .../org/apache/falcon/entity/FeedHelper.java    | 135 ++++-----
 .../apache/falcon/entity/FileSystemStorage.java |  29 +-
 .../falcon/entity/common/FeedDataPath.java      |  53 ++--
 .../falcon/expression/ExpressionHelper.java     |  11 +-
 .../apache/falcon/util/FalconRadixUtils.java    |  16 +-
 .../workflow/WorkflowExecutionContext.java      |   2 +-
 common/src/main/resources/startup.properties    |  13 +
 .../apache/falcon/entity/FeedDataPathTest.java  |  10 +-
 .../apache/falcon/entity/FeedHelperTest.java    |  54 ++++
 .../falcon/entity/FileSystemStorageTest.java    |   3 +-
 docs/src/site/twiki/InstallationSteps.twiki     |  21 ++
 .../mapred/ClassicClientProtocolProvider.java   |  21 +-
 .../org/apache/falcon/logging/LogProvider.java  |   3 +-
 .../ProcessExecutionCoordinatorBuilder.java     |   2 +-
 .../workflow/engine/OozieWorkflowEngine.java    |   2 +-
 .../OozieProcessWorkflowBuilderTest.java        |   2 +-
 prism/pom.xml                                   |   6 +-
 .../falcon/retention/FeedEvictorTest.java       |   5 +-
 .../src/main/resources/mapred-site.xml          |   4 +
 .../src/main/resources/yarn-site.xml            |   5 -
 .../catalog/CatalogPartitionHandlerIT.java      |  79 +++++
 .../falcon/catalog/HiveCatalogServiceIT.java    |  61 +++-
 .../lifecycle/TableStorageFeedEvictorIT.java    |  16 +-
 .../org/apache/falcon/util/HiveTestUtils.java   |   9 +
 .../org/apache/falcon/util/OozieTestUtils.java  |  24 +-
 webapp/src/test/resources/cluster-template.xml  |   2 +-
 webapp/src/test/resources/feed-template1.xml    |   3 +-
 webapp/src/test/resources/feed-template2.xml    |   7 +-
 34 files changed, 901 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5c52cc3..399e401 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,9 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+   FALCON-1091 Monitoring plugin that registers catalog partition - code
+   (Suhas Vasu / PallaviRao / Shwetha GS via Suhas Vasu)
+
    FALCON-790 Falcon UI to enable entity/process/feed edits and 
    management. (Armando Reyna/Kenneth Ho via Srikanth Sundarrajan)
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
index 9abdc93..41d50df 100644
--- a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
@@ -65,6 +65,10 @@ public abstract class AbstractCatalogService {
     public abstract boolean isTableExternal(Configuration conf, String catalogUrl, String database,
                                             String tableName) throws FalconException;
 
+    public abstract List<CatalogPartition> listPartitions(Configuration conf, String catalogUrl,
+                                                          String database, String tableName,
+                                                          List<String> values) throws FalconException;
+
     /**
      * List partitions by filter. Executed in the workflow engine.
      *
@@ -132,4 +136,42 @@ public abstract class AbstractCatalogService {
                                                   String database, String tableName,
                                                   List<String> partitionValues)
         throws FalconException;
+
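+    /**
+     * Gets the partition columns for the table in catalog service.
+     * @param conf configuration used to connect to the catalog service
+     * @param catalogUrl url for the catalog service
+     * @param database database that contains the table
+     * @param tableName name of the table
+     * @return ordered list of partition columns for the table
+     * @throws FalconException if the partition columns cannot be fetched
+     */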
+    public abstract List<String> getPartitionColumns(Configuration conf, String catalogUrl, String database,
+                                                     String tableName) throws FalconException;
+
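+    /**
+     * Adds the partition to the table.
+     * @param conf configuration used to connect to the catalog service
+     * @param catalogUrl url for the catalog service
+     * @param database database that contains the table
+     * @param tableName name of the table
+     * @param values partition values of the new partition
+     * @param location storage location of the new partition
+     * @throws FalconException if the partition cannot be added
+     */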
+    public abstract void addPartition(Configuration conf, String catalogUrl, String database,
+                                      String tableName, List<String> values, String location) throws FalconException;
+
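+    /**
+     * Updates an existing partition in the table.
+     * @param conf configuration used to connect to the catalog service
+     * @param catalogUrl url for the catalog service
+     * @param database database that contains the table
+     * @param tableName name of the table
+     * @param partValues partition values identifying the partition to update
+     * @param location storage location to set on the partition
+     * @throws FalconException if the partition cannot be updated
+     */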
+    public abstract void updatePartition(Configuration conf, String catalogUrl, String database, String tableName,
+                                         List<String> partValues, String location) throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java b/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
new file mode 100644
index 0000000..f8a3d46
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.catalog;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowExecutionListener;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Date;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collection;
+import java.util.Arrays;
+import java.util.TimeZone;
+import java.util.Properties;
+
+/**
+ * Listens to workflow execution completion events.
+ * It syncs HCat partitions based on the feeds created/evicted/replicated.
+ */
+public class CatalogPartitionHandler implements WorkflowExecutionListener{
+    private static final Logger LOG = LoggerFactory.getLogger(CatalogPartitionHandler.class);
+
+    public static final ConfigurationStore STORE = ConfigurationStore.get();
+    public static final String CATALOG_TABLE = "catalog.table";
+    private ExpressionHelper evaluator = ExpressionHelper.get();
+    private static CatalogPartitionHandler catalogInstance = new CatalogPartitionHandler();
+    private static final boolean IS_CATALOG_ENABLED = CatalogServiceFactory.isEnabled();
+    public static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+
+    private static final PathFilter PATH_FILTER = new PathFilter() {
+        @Override public boolean accept(Path path) {
+            try {
+                FileSystem fs = path.getFileSystem(new Configuration());
+                return !path.getName().startsWith("_") && !path.getName().startsWith(".") && !fs.isFile(path);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    };
+
+    public static final CatalogPartitionHandler get() {
+        return catalogInstance;
+    }
+
+    @Override
+    public void onSuccess(WorkflowExecutionContext context) throws FalconException {
+        if (!IS_CATALOG_ENABLED) {
+            //Skip if catalog service is not enabled
+            return;
+        }
+
+        String[] feedNames = context.getOutputFeedNamesList();
+        String[] feedPaths = context.getOutputFeedInstancePathsList();
+        Cluster cluster = STORE.get(EntityType.CLUSTER, context.getClusterName());
+        Configuration clusterConf = ClusterHelper.getConfiguration(cluster);
+
+        if (StringUtils.isEmpty(ClusterHelper.getRegistryEndPoint(cluster))) {
+            //Skip if registry endpoint is not defined for the cluster
+            LOG.info("Catalog endpoint not defined for cluster {}. Skipping partition registration", cluster.getName());
+            return;
+        }
+
+        for (int index = 0; index < feedNames.length; index++) {
+            LOG.info("Partition handling for feed {} for path {}", feedNames[index], feedPaths[index]);
+            Feed feed = STORE.get(EntityType.FEED, feedNames[index]);
+
+            Storage storage = FeedHelper.createStorage(cluster, feed);
+            if (storage.getType() == Storage.TYPE.TABLE) {
+                //Do nothing if the feed is already table based
+                LOG.info("Feed {} is already table based. Skipping partition registration", feed.getName());
+                continue;
+            }
+
+            CatalogStorage catalogStorage = getCatalogStorageFromFeedProperties(feed, cluster, clusterConf);
+            if (catalogStorage == null) {
+                //There is no catalog defined in the feed properties. So, skip partition registration
+                LOG.info("Feed {} doesn't have table defined in its properties/table doesn't exist. "
+                        + "Skipping partition registration", feed.getName());
+                continue;
+            }
+
+            //Generate static partition values - get the date from feed path and evaluate partitions in catalog spec
+            Path feedPath = new Path(new Path(feedPaths[index]).toUri().getPath());
+
+            String templatePath = new Path(storage.getUriTemplate(LocationType.DATA)).toUri().getPath();
+            LOG.debug("Template {} catalogInstance path {}", templatePath, feedPath);
+            Date date = FeedHelper.getDate(templatePath, feedPath, UTC);
+            if (date == null) {
+                LOG.info("Feed {} catalogInstance path {} doesn't match the template {}. "
+                                + "Skipping partition registration",
+                        feed.getName(), feedPath, templatePath);
+                continue;
+            }
+
+            LOG.debug("Reference date from path {} is {}", feedPath, SchemaHelper.formatDateUTC(date));
+            ExpressionHelper.setReferenceDate(date);
+            List<String> partitionValues = new ArrayList<String>();
+            for (Map.Entry<String, String> entry : catalogStorage.getPartitions().entrySet()) {
+                LOG.debug("Evaluating partition {}", entry.getValue());
+                partitionValues.add(evaluator.evaluateFullExpression(entry.getValue(), String.class));
+            }
+
+            LOG.debug("Static partition - {}", partitionValues);
+            WorkflowExecutionContext.EntityOperations operation = context.getOperation();
+            switch (operation) {
+            case DELETE:
+                dropPartitions(clusterConf, catalogStorage, partitionValues);
+                break;
+
+            case GENERATE:
+            case REPLICATE:
+                registerPartitions(clusterConf, catalogStorage, feedPath, partitionValues);
+                break;
+
+            default:
+                throw new FalconException("Unhandled operation " + operation);
+            }
+        }
+    }
+
+    //Registers partitions by comparing the expected partitions (from HDFS) with the existing ones (from catalog):
+    //1. existing intersect expected --> partition already exists, so update it
+    //2. existing minus expected --> partition is not required anymore, so drop it
+    //3. expected minus existing --> partition doesn't exist yet, so add it
+    private void registerPartitions(Configuration conf, CatalogStorage storage, Path staticPath,
+                                    List<String> staticPartition) throws FalconException {
+        try {
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
+            if (!fs.exists(staticPath)) {
+                //Do nothing if the output path doesn't exist
+                return;
+            }
+
+            List<String> partitionColumns = getPartitionColumns(conf, storage);
+            int dynamicPartCols = partitionColumns.size() - staticPartition.size();
+            Path searchPath = staticPath;
+            if (dynamicPartCols > 0) {
+                searchPath = new Path(staticPath, StringUtils.repeat("*", "/", dynamicPartCols));
+            }
+
+            //Figure out the dynamic partitions from the directories on hdfs
+            FileStatus[] files = fs.globStatus(searchPath, PATH_FILTER);
+            Map<List<String>, String> partitions = new HashMap<List<String>, String>();
+            for (FileStatus file : files) {
+                List<String> dynamicParts = getDynamicPartitions(file.getPath(), staticPath);
+                List<String> partitionValues = new ArrayList<String>(staticPartition);
+                partitionValues.addAll(dynamicParts);
+                LOG.debug("Final partition - " + partitionValues);
+                partitions.put(partitionValues, file.getPath().toString());
+            }
+
+            List<List<String>> existPartitions = listPartitions(conf, storage, staticPartition);
+            Collection<List<String>> targetPartitions = partitions.keySet();
+
+            Collection<List<String>> partitionsForDrop = CollectionUtils.subtract(existPartitions, targetPartitions);
+            Collection<List<String>> partitionsForAdd = CollectionUtils.subtract(targetPartitions, existPartitions);
+            Collection<List<String>> partitionsForUpdate =
+                    CollectionUtils.intersection(existPartitions, targetPartitions);
+
+            for (List<String> partition : partitionsForDrop) {
+                dropPartitions(conf, storage, partition);
+            }
+
+            for (List<String> partition : partitionsForAdd) {
+                addPartition(conf, storage, partition, partitions.get(partition));
+            }
+
+            for (List<String> partition : partitionsForUpdate) {
+                updatePartition(conf, storage, partition, partitions.get(partition));
+            }
+        } catch(IOException e) {
+            throw new FalconException(e);
+        }
+    }
+
+    private void updatePartition(Configuration conf, CatalogStorage storage, List<String> partition, String location)
+        throws FalconException {
+        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
+        catalogService.updatePartition(conf, storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(),
+                partition, location);
+    }
+
+    private void addPartition(Configuration conf, CatalogStorage storage, List<String> partition, String location)
+        throws FalconException {
+        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
+        catalogService.addPartition(conf, storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(), partition,
+                location);
+    }
+
+    private List<List<String>> listPartitions(Configuration conf, CatalogStorage storage, List<String> staticPartitions)
+        throws FalconException {
+        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
+        List<CatalogPartition> partitions = catalogService.listPartitions(conf, storage.getCatalogUrl(),
+                storage.getDatabase(), storage.getTable(), staticPartitions);
+        List<List<String>> existPartitions = new ArrayList<List<String>>();
+        for (CatalogPartition partition : partitions) {
+            existPartitions.add(partition.getValues());
+        }
+        return existPartitions;
+    }
+
+    //Returns the dynamic partitions of the data path
+    protected List<String> getDynamicPartitions(Path path, Path staticPath) {
+        String dynPart = path.toUri().getPath().substring(staticPath.toString().length());
+        dynPart = StringUtils.removeStart(dynPart, "/");
+        dynPart = StringUtils.removeEnd(dynPart, "/");
+        if (StringUtils.isEmpty(dynPart)) {
+            return new ArrayList<String>();
+        }
+        return Arrays.asList(dynPart.split("/"));
+    }
+
+    private List<String> getPartitionColumns(Configuration conf, CatalogStorage storage) throws FalconException {
+        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
+        return catalogService.getPartitionColumns(conf, storage.getCatalogUrl(), storage.getDatabase(),
+                storage.getTable());
+    }
+
+    private void dropPartitions(Configuration conf, CatalogStorage storage, List<String> values)
+        throws FalconException {
+        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
+        catalogService.dropPartitions(conf, storage.getCatalogUrl(), storage.getDatabase(),
+                storage.getTable(), values, false);
+    }
+
+    //Get the catalog template from feed properties as feed is filesystem based
+    protected CatalogStorage getCatalogStorageFromFeedProperties(Feed feed, Cluster cluster, Configuration conf)
+        throws FalconException {
+        Properties properties = FeedHelper.getFeedProperties(feed);
+        String tableUri = properties.getProperty(CATALOG_TABLE);
+        if (tableUri == null) {
+            return null;
+        }
+
+        CatalogTable table = new CatalogTable();
+        table.setUri(tableUri.replace("{", "${"));
+        CatalogStorage storage = null;
+        try {
+            storage = new CatalogStorage(cluster, table);
+        } catch (URISyntaxException e) {
+            throw new FalconException(e);
+        }
+
+        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
+        if (!catalogService.tableExists(conf, storage.getCatalogUrl(), storage.getDatabase(), storage.getTable())) {
+            return null;
+        }
+        return storage;
+    }
+
+    @Override
+    public void onFailure(WorkflowExecutionContext context) throws FalconException {
+        //no-op
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java b/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java
index c8a0fa0..77e6851 100644
--- a/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java
+++ b/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java
@@ -22,12 +22,16 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.util.ReflectionUtils;
 import org.apache.falcon.util.StartupProperties;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Factory for providing appropriate catalog service
  * implementation to the falcon service.
  */
 @SuppressWarnings("unchecked")
 public final class CatalogServiceFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(CatalogServiceFactory.class);
 
     public static final String CATALOG_SERVICE = "catalog.service.impl";
 
@@ -35,7 +39,11 @@ public final class CatalogServiceFactory {
     }
 
     public static boolean isEnabled() {
-        return StartupProperties.get().containsKey(CATALOG_SERVICE);
+        boolean isEnabled = StartupProperties.get().containsKey(CATALOG_SERVICE);
+        if (!isEnabled) {
+            LOG.info("Catalog service disabled. Partitions will not registered");
+        }
+        return isEnabled;
     }
 
     public static AbstractCatalogService getCatalogService() throws FalconException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
index 25a4a46..3d57631 100644
--- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
@@ -21,14 +21,11 @@ package org.apache.falcon.catalog;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.security.SecurityUtil;
-import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.Credentials;
@@ -44,6 +41,7 @@ import java.io.File;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
 /**
@@ -53,6 +51,8 @@ import java.util.List;
 public class HiveCatalogService extends AbstractCatalogService {
 
     private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogService.class);
+    public static final String CREATE_TIME = "falcon.create_time";
+    public static final String UPDATE_TIME = "falcon.update_time";
 
 
     public static HiveConf createHiveConf(Configuration conf,
@@ -97,8 +97,6 @@ public class HiveCatalogService extends AbstractCatalogService {
                 ugi.addCredentials(credentials); // credentials cannot be null
             }
 
-            OozieActionConfigurationHelper.dumpConf(hcatConf, "hive conf ");
-
             return new HiveMetaStoreClient(hcatConf);
         } catch (Exception e) {
             throw new FalconException("Exception creating HiveMetaStoreClient: " + e.getMessage(), e);
@@ -176,7 +174,7 @@ public class HiveCatalogService extends AbstractCatalogService {
                                                                        String metaStoreServicePrincipal)
         throws IOException {
 
-        LOG.info("Creating delegation tokens for principal={}", metaStoreServicePrincipal);
+        LOG.debug("Creating delegation tokens for principal={}", metaStoreServicePrincipal);
         HCatClient hcatClient = HCatClient.create(hcatConf);
         String delegationToken = hcatClient.getDelegationToken(
                 CurrentUser.getUser(), metaStoreServicePrincipal);
@@ -211,6 +209,8 @@ public class HiveCatalogService extends AbstractCatalogService {
             HiveMetaStoreClient client = createProxiedClient(conf, catalogUrl);
             Table table = client.getTable(database, tableName);
             return table != null;
+        } catch (NoSuchObjectException e) {
+            return false;
         } catch (Exception e) {
             throw new FalconException("Exception checking if the table exists:" + e.getMessage(), e);
         }
@@ -231,6 +231,29 @@ public class HiveCatalogService extends AbstractCatalogService {
     }
 
     @Override
+    public List<CatalogPartition> listPartitions(Configuration conf, String catalogUrl,
+                                                         String database, String tableName,
+                                                         List<String> values) throws FalconException {
+        LOG.info("List partitions for: {}, partition filter: {}", tableName, values);
+
+        try {
+            List<CatalogPartition> catalogPartitionList = new ArrayList<CatalogPartition>();
+
+            HiveMetaStoreClient client = createClient(conf, catalogUrl);
+            List<Partition> hCatPartitions = client.listPartitions(database, tableName, values, (short) -1);
+            for (Partition hCatPartition : hCatPartitions) {
+                LOG.debug("Partition: " + hCatPartition.getValues());
+                CatalogPartition partition = createCatalogPartition(hCatPartition);
+                catalogPartitionList.add(partition);
+            }
+
+            return catalogPartitionList;
+        } catch (Exception e) {
+            throw new FalconException("Exception listing partitions:" + e.getMessage(), e);
+        }
+    }
+
+    @Override
     public List<CatalogPartition> listPartitionsByFilter(Configuration conf, String catalogUrl,
                                                          String database, String tableName,
                                                          String filter) throws FalconException {
@@ -267,6 +290,7 @@ public class HiveCatalogService extends AbstractCatalogService {
         return catalogPartition;
     }
 
+    //Drop single partition
     @Override
     public boolean dropPartition(Configuration conf, String catalogUrl,
                                   String database, String tableName,
@@ -313,4 +337,73 @@ public class HiveCatalogService extends AbstractCatalogService {
             throw new FalconException("Exception fetching partition:" + e.getMessage(), e);
         }
     }
+
+    @Override
+    public List<String> getPartitionColumns(Configuration conf, String catalogUrl, String database,
+                                            String tableName) throws FalconException {
+        LOG.info("Fetching partition columns of table: " + tableName);
+
+        try {
+            HiveMetaStoreClient client = createClient(conf, catalogUrl);
+            Table table = client.getTable(database, tableName);
+            List<String> partCols = new ArrayList<String>();
+            for (FieldSchema part : table.getPartitionKeys()) {
+                partCols.add(part.getName());
+            }
+            return partCols;
+        } catch (Exception e) {
+            throw new FalconException("Exception fetching partition columns: " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void addPartition(Configuration conf, String catalogUrl, String database,
+                             String tableName, List<String> partValues, String location) throws FalconException {
+        LOG.info("Adding partition {} for {}.{} with location {}", partValues, database, tableName, location);
+
+        try {
+            HiveMetaStoreClient client = createClient(conf, catalogUrl);
+            Table table = client.getTable(database, tableName);
+            org.apache.hadoop.hive.metastore.api.Partition part = new org.apache.hadoop.hive.metastore.api.Partition();
+            part.setDbName(database);
+            part.setTableName(tableName);
+            part.setValues(partValues);
+            part.setSd(table.getSd());
+            part.getSd().setLocation(location);
+            part.setParameters(table.getParameters());
+            if (part.getParameters() == null) {
+                part.setParameters(new HashMap<String, String>());
+            }
+            part.getParameters().put(CREATE_TIME, String.valueOf(System.currentTimeMillis()));
+            client.add_partition(part);
+
+        } catch (Exception e) {
+            throw new FalconException("Exception adding partition: " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void updatePartition(Configuration conf, String catalogUrl, String database,
+                             String tableName, List<String> partValues, String location) throws FalconException {
+        LOG.info("Updating partition {} of {}.{} with location {}", partValues, database, tableName, location);
+
+        try {
+            HiveMetaStoreClient client = createClient(conf, catalogUrl);
+            Table table = client.getTable(database, tableName);
+            org.apache.hadoop.hive.metastore.api.Partition part = new org.apache.hadoop.hive.metastore.api.Partition();
+            part.setDbName(database);
+            part.setTableName(tableName);
+            part.setValues(partValues);
+            part.setSd(table.getSd());
+            part.getSd().setLocation(location);
+            part.setParameters(table.getParameters());
+            if (part.getParameters() == null) {
+                part.setParameters(new HashMap<String, String>());
+            }
+            part.getParameters().put(UPDATE_TIME, String.valueOf(System.currentTimeMillis()));
+            client.alter_partition(database, tableName, part);
+        } catch (Exception e) {
+            throw new FalconException("Exception updating partition: " + e.getMessage(), e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
index 59f558b..7930fba 100644
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -30,6 +30,7 @@ import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.CatalogTable;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.retention.EvictedInstanceSerDe;
 import org.apache.falcon.retention.EvictionHelper;
@@ -43,15 +44,11 @@ import javax.servlet.jsp.el.ELException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.TimeZone;
-import java.util.TreeMap;
 import java.util.regex.Matcher;
 
 /**
@@ -90,7 +87,7 @@ public class CatalogStorage extends Configured implements Storage {
         this(CATALOG_URL, feed.getTable());
     }
 
-    protected CatalogStorage(Cluster cluster, CatalogTable table) throws URISyntaxException {
+    public CatalogStorage(Cluster cluster, CatalogTable table) throws URISyntaxException {
         this(ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint(), table);
     }
 
@@ -397,10 +394,7 @@ public class CatalogStorage extends Configured implements Storage {
         List<CatalogPartition> toBeDeleted;
         try {
             // get sorted date partition keys and values
-            List<String> datedPartKeys = new ArrayList<String>();
-            List<String> datedPartValues = new ArrayList<String>();
-            fillSortedDatedPartitionKVs(datedPartKeys, datedPartValues, retentionLimit, timeZone);
-            toBeDeleted = discoverPartitionsToDelete(datedPartKeys, datedPartValues);
+            toBeDeleted = discoverPartitionsToDelete(retentionLimit, timeZone);
         } catch (ELException e) {
             throw new FalconException("Couldn't find partitions to be deleted", e);
 
@@ -428,58 +422,30 @@ public class CatalogStorage extends Configured implements Storage {
         return instanceDates;
     }
 
-    private List<CatalogPartition> discoverPartitionsToDelete(List<String> datedPartKeys, List<String> datedPartValues)
+    private List<CatalogPartition> discoverPartitionsToDelete(String retentionLimit, String timezone)
         throws FalconException, ELException {
-
-        final String filter = createFilter(datedPartKeys, datedPartValues);
-        return CatalogServiceFactory.getCatalogService().listPartitionsByFilter(
-            getConf(), getCatalogUrl(), getDatabase(), getTable(), filter);
-    }
-
-    private void fillSortedDatedPartitionKVs(List<String> sortedPartKeys, List<String> sortedPartValues,
-                                             String retentionLimit, String timeZone) throws ELException {
         Pair<Date, Date> range = EvictionHelper.getDateRange(retentionLimit);
-
-        // sort partition keys and values by the date pattern present in value
-        Map<FeedDataPath.VARS, String> sortedPartKeyMap = new TreeMap<FeedDataPath.VARS, String>();
-        Map<FeedDataPath.VARS, String> sortedPartValueMap = new TreeMap<FeedDataPath.VARS, String>();
+        ExpressionHelper.setReferenceDate(range.first);
+        Map<String, String> partitionsToDelete = new LinkedHashMap<String, String>();
+        ExpressionHelper expressionHelper = ExpressionHelper.get();
         for (Map.Entry<String, String> entry : getPartitions().entrySet()) {
-            String datePattern = entry.getValue();
-            String mask = datePattern.replaceAll(FeedDataPath.VARS.YEAR.regex(), "yyyy")
-                    .replaceAll(FeedDataPath.VARS.MONTH.regex(), "MM")
-                    .replaceAll(FeedDataPath.VARS.DAY.regex(), "dd")
-                    .replaceAll(FeedDataPath.VARS.HOUR.regex(), "HH")
-                    .replaceAll(FeedDataPath.VARS.MINUTE.regex(), "mm");
-
-            // find the first date pattern present in date mask
-            FeedDataPath.VARS vars = FeedDataPath.VARS.presentIn(mask);
-            // skip this partition if date mask doesn't contain any date format
-            if (vars == null) {
-                continue;
-            }
-
-            // construct dated partition value as per format
-            DateFormat dateFormat = new SimpleDateFormat(mask);
-            dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
-            String partitionValue = dateFormat.format(range.first);
-
-            // add partition key and value in their sorted maps
-            if (!sortedPartKeyMap.containsKey(vars)) {
-                sortedPartKeyMap.put(vars, entry.getKey());
-            }
-
-            if (!sortedPartValueMap.containsKey(vars)) {
-                sortedPartValueMap.put(vars, partitionValue);
+            if (FeedDataPath.PATTERN.matcher(entry.getValue()).find()) {
+                partitionsToDelete.put(entry.getKey(),
+                        expressionHelper.evaluateFullExpression(entry.getValue(), String.class));
             }
         }
-
-        // add map entries to lists of partition keys and values
-        sortedPartKeys.addAll(sortedPartKeyMap.values());
-        sortedPartValues.addAll(sortedPartValueMap.values());
+        final String filter = createFilter(partitionsToDelete);
+        return CatalogServiceFactory.getCatalogService().listPartitionsByFilter(
+            getConf(), getCatalogUrl(), getDatabase(), getTable(), filter);
     }
 
-    private String createFilter(List<String> datedPartKeys, List<String> datedPartValues) throws ELException {
-        int numPartitions = datedPartKeys.size();
+    /**
+     * Creates a Hive partition filter from the input partition map.
+     * @param partitionsMap - ordered map of partition keys and values
+     * @return partition filter
+     * @throws ELException
+     */
+    private String createFilter(Map<String, String> partitionsMap) throws ELException {
 
         /* Construct filter query string. As an example, suppose the dated partition keys
          * are: [year, month, day, hour] and dated partition values are [2014, 02, 24, 10].
@@ -489,23 +455,26 @@ public class CatalogStorage extends Configured implements Storage {
          * or (year = '2014' and month = '02' and day = '24' and hour < '10')"
          */
         StringBuilder filterBuffer = new StringBuilder();
-        for (int curr = 0; curr < numPartitions; curr++) {
+        List<String> keys = new ArrayList<String>(partitionsMap.keySet());
+        for (int curr = 0; curr < partitionsMap.size(); curr++) {
             if (curr > 0) {
                 filterBuffer.append(FILTER_OR);
             }
             filterBuffer.append(FILTER_ST_BRACKET);
             for (int prev = 0; prev < curr; prev++) {
-                filterBuffer.append(datedPartKeys.get(prev))
+                String key = keys.get(prev);
+                filterBuffer.append(key)
                         .append(FILTER_EQUALS)
                         .append(FILTER_QUOTE)
-                        .append(datedPartValues.get(prev))
+                        .append(partitionsMap.get(key))
                         .append(FILTER_QUOTE)
                         .append(FILTER_AND);
             }
-            filterBuffer.append(datedPartKeys.get(curr))
+            String key = keys.get(curr);
+            filterBuffer.append(key)
                     .append(FILTER_LESS_THAN)
                     .append(FILTER_QUOTE)
-                    .append(datedPartValues.get(curr))
+                    .append(partitionsMap.get(key))
                     .append(FILTER_QUOTE)
                     .append(FILTER_END_BRACKET);
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index ca31f95..7f9acc9 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -42,18 +42,7 @@ import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.TreeMap;
+import java.util.*;
 import java.util.regex.Matcher;
 
 /**
@@ -233,10 +222,24 @@ public final class FeedHelper {
             return clusterLocations.getLocations();
         }
 
-        final Locations feedLocations = feed.getLocations();
+        Locations feedLocations = feed.getLocations();
         return feedLocations == null ? null : feedLocations.getLocations();
     }
 
+    public static Location getLocation(Feed feed, org.apache.falcon.entity.v0.cluster.Cluster cluster,
+                                       LocationType type) {
+        List<Location> candidates = getLocations(getCluster(feed, cluster.getName()), feed);
+        if (candidates == null) {
+            return null; // no locations are available for this feed on the cluster
+        }
+        for (Location candidate : candidates) {
+            if (type == candidate.getType()) {
+                return candidate; // the first location of the requested type wins
+            }
+        }
+        return null; // none of the locations has the requested type
+    }
+
     public static Sla getSLAs(Cluster cluster, Feed feed) {
         final Sla clusterSla = cluster.getSla();
         if (clusterSla != null) {
@@ -348,89 +351,55 @@ public final class FeedHelper {
     }
 
     /**
-     * Replaces timed variables with corresponding time notations e.g., ${YEAR} with yyyy and so on.
-     * @param templatePath - template feed path
-     * @return time notations
-     */
-    public static String getDateFormatInPath(String templatePath) {
-        String mask = extractDatePartFromPathMask(templatePath, templatePath);
-        //yyyyMMddHHmm
-        return mask.replaceAll(FeedDataPath.VARS.YEAR.regex(), "yyyy")
-            .replaceAll(FeedDataPath.VARS.MONTH.regex(), "MM")
-            .replaceAll(FeedDataPath.VARS.DAY.regex(), "dd")
-            .replaceAll(FeedDataPath.VARS.HOUR.regex(), "HH")
-            .replaceAll(FeedDataPath.VARS.MINUTE.regex(), "mm");
-    }
-
-    /**
-     * Extracts the date part of the path and builds a date format mask.
-     * @param mask - Path pattern containing ${YEAR}, ${MONTH}...
-     * @param inPath - Path from which date part need to be extracted
-     * @return - Parts of inPath with non-date-part stripped out.
-     *
-     * Example: extractDatePartFromPathMask("/data/foo/${YEAR}/${MONTH}", "/data/foo/2012/${MONTH}");
-     *     Returns: 2012${MONTH}.
-     */
-    private static String extractDatePartFromPathMask(String mask, String inPath) {
-        String[] elements = FeedDataPath.PATTERN.split(mask);
-
-        String out = inPath;
-        for (String element : elements) {
-            out = out.replaceFirst(element, "");
-        }
-        return out;
-    }
-
-    private static Map<FeedDataPath.VARS, String> getDatePartMap(String path, String mask) {
-        Map<FeedDataPath.VARS, String> map = new TreeMap<FeedDataPath.VARS, String>();
-        Matcher matcher = FeedDataPath.DATE_FIELD_PATTERN.matcher(mask);
-        int start = 0;
-        while (matcher.find(start)) {
-            String subMask = mask.substring(matcher.start(), matcher.end());
-            String subPath = path.substring(matcher.start(), matcher.end());
-            FeedDataPath.VARS var = FeedDataPath.VARS.from(subMask);
-            if (!map.containsKey(var)) {
-                map.put(var, subPath);
-            }
-            start = matcher.start() + 1;
-        }
-        return map;
-    }
-
-    /**
      *  Extracts date from the actual data path e.g., /path/2014/05/06 maps to 2014-05-06T00:00Z.
-     * @param file - actual data path
+     * @param instancePath - actual data path
      * @param templatePath - template path from feed definition
-     * @param dateMask - path mask from getDateFormatInPath()
      * @param timeZone
      * @return date corresponding to the path
      */
     //consider just the first occurrence of the pattern
-    public static Date getDate(Path file, String templatePath, String dateMask, String timeZone) {
-        String path = extractDatePartFromPathMask(templatePath, file.toString());
-        Map<FeedDataPath.VARS, String> map = getDatePartMap(path, dateMask);
-
-        if (map.isEmpty()) {
-            return null;
-        }
+    public static Date getDate(String templatePath, Path instancePath, TimeZone timeZone) {
+        String path = instancePath.toString();
+        Matcher matcher = FeedDataPath.PATTERN.matcher(templatePath);
+        Calendar cal = Calendar.getInstance(timeZone);
+        int lastEnd = 0;
+
+        Set<FeedDataPath.VARS> matchedVars = new HashSet<FeedDataPath.VARS>();
+        while (matcher.find()) {
+            FeedDataPath.VARS pathVar = FeedDataPath.VARS.from(matcher.group());
+            String pad = templatePath.substring(lastEnd, matcher.start());
+            if (!path.startsWith(pad) || path.length() < pad.length() + pathVar.getValueSize()) {
+                //Template and path do not match, or the path is too short to hold the variable's value
+                return null;
+            }
 
-        StringBuilder date = new StringBuilder();
-        int ordinal = 0;
-        for (Map.Entry<FeedDataPath.VARS, String> entry : map.entrySet()) {
-            if (ordinal++ == entry.getKey().ordinal()) {
-                date.append(entry.getValue());
-            } else {
+            int value;
+            try {
+                value = Integer.parseInt(path.substring(pad.length(), pad.length() + pathVar.getValueSize()));
+            } catch (NumberFormatException e) {
+                //Not a valid number for variable
                 return null;
             }
+
+            pathVar.setCalendar(cal, value);
+            lastEnd = matcher.end();
+            path = path.substring(pad.length() + pathVar.getValueSize());
+            matchedVars.add(pathVar);
         }
 
-        try {
-            DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, date.length()));
-            dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
-            return dateFormat.parse(date.toString());
-        } catch (ParseException e) {
+        //Match the remaining constant at the end
+        if (!templatePath.substring(lastEnd).equals(path)) {
             return null;
         }
+        //Clear seconds/millis carried over from "now", then reset every field the template did not supply
+        cal.set(Calendar.SECOND, 0);
+        cal.set(Calendar.MILLISECOND, 0);
+        for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) {
+            if (!matchedVars.contains(var)) {
+                cal.set(var.getCalendarField(), var == FeedDataPath.VARS.DAY ? 1 : 0); //days are 1-based
+            }
+        }
+        return cal.getTime();
     }
 
     public static Path getFeedBasePath(String feedPath) throws IOException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 1ba7b9d..a5caf8e 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -77,7 +77,7 @@ public class FileSystemStorage extends Configured implements Storage {
     private final String storageUrl;
     private final List<Location> locations;
 
-    protected FileSystemStorage(Feed feed) {
+    public FileSystemStorage(Feed feed) {
         this(FILE_SYSTEM_URL, feed.getLocations());
     }
 
@@ -293,11 +293,11 @@ public class FileSystemStorage extends Configured implements Storage {
     }
 
     @Override
-    public StringBuilder evict(String retentionLimit, String timeZone,
-                               Path logFilePath) throws FalconException {
+    public StringBuilder evict(String retentionLimit, String timeZone, Path logFilePath) throws FalconException {
+        TimeZone tz = TimeZone.getTimeZone(timeZone);
         try{
             for (Location location : getLocations()) {
-                fileSystemEvictor(getUriTemplate(location.getType()), retentionLimit, timeZone, logFilePath);
+                fileSystemEvictor(getUriTemplate(location.getType()), retentionLimit, tz, logFilePath);
             }
             EvictedInstanceSerDe.serializeEvictedInstancePaths(
                     HadoopClientFactory.get().createProxiedFileSystem(logFilePath.toUri(), getConf()),
@@ -311,7 +311,7 @@ public class FileSystemStorage extends Configured implements Storage {
         return instanceDates;
     }
 
-    private void fileSystemEvictor(String feedPath, String retentionLimit, String timeZone,
+    private void fileSystemEvictor(String feedPath, String retentionLimit, TimeZone timeZone,
                                    Path logFilePath) throws IOException, ELException, FalconException {
         Path normalizedPath = new Path(feedPath);
         FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(normalizedPath.toUri());
@@ -319,28 +319,26 @@ public class FileSystemStorage extends Configured implements Storage {
         LOG.info("Normalized path: {}", feedPath);
 
         Pair<Date, Date> range = EvictionHelper.getDateRange(retentionLimit);
-        String dateMask = FeedHelper.getDateFormatInPath(feedPath);
 
-        List<Path> toBeDeleted = discoverInstanceToDelete(feedPath, timeZone, dateMask, range.first, fs);
+        List<Path> toBeDeleted = discoverInstanceToDelete(feedPath, timeZone, range.first, fs);
         if (toBeDeleted.isEmpty()) {
             LOG.info("No instances to delete.");
             return;
         }
 
         DateFormat dateFormat = new SimpleDateFormat(FeedHelper.FORMAT);
-        dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+        dateFormat.setTimeZone(timeZone);
         Path feedBasePath = FeedHelper.getFeedBasePath(feedPath);
         for (Path path : toBeDeleted) {
             deleteInstance(fs, path, feedBasePath);
-            Date date = FeedHelper.getDate(new Path(path.toUri().getPath()), feedPath, dateMask, timeZone);
+            Date date = FeedHelper.getDate(feedPath, new Path(path.toUri().getPath()), timeZone);
             instanceDates.append(dateFormat.format(date)).append(',');
             instancePaths.append(path).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
         }
     }
 
-    private List<Path> discoverInstanceToDelete(String inPath, String timeZone, String dateMask,
-                                                Date start, FileSystem fs) throws IOException {
-
+    private List<Path> discoverInstanceToDelete(String inPath, TimeZone timeZone, Date start, FileSystem fs)
+        throws IOException {
         FileStatus[] files = findFilesForFeed(fs, inPath);
         if (files == null || files.length == 0) {
             return Collections.emptyList();
@@ -348,8 +346,7 @@ public class FileSystemStorage extends Configured implements Storage {
 
         List<Path> toBeDeleted = new ArrayList<Path>();
         for (FileStatus file : files) {
-            Date date = FeedHelper.getDate(new Path(file.getPath().toUri().getPath()),
-                    inPath, dateMask, timeZone);
+            Date date = FeedHelper.getDate(inPath, new Path(file.getPath().toUri().getPath()), timeZone);
             LOG.debug("Considering {}", file.getPath().toUri().getPath());
             LOG.debug("Date: {}", date);
             if (date != null && !isDateInRange(date, start)) {
@@ -427,8 +424,8 @@ public class FileSystemStorage extends Configured implements Storage {
                 String feedInstancePath = ExpressionHelper.substitute(basePath, allProperties);
                 FileStatus fileStatus = getFileStatus(fileSystem, new Path(feedInstancePath));
                 FeedInstanceStatus instance = new FeedInstanceStatus(feedInstancePath);
-                String dateMask = FeedHelper.getDateFormatInPath(basePath);
-                Date date = FeedHelper.getDate(new Path(feedInstancePath), basePath, dateMask, tz.getID());
+
+                Date date = FeedHelper.getDate(basePath, new Path(feedInstancePath), tz);
                 instance.setInstance(SchemaHelper.formatDateUTC(date));
                 if (fileStatus != null) {
                     instance.setCreationTime(fileStatus.getModificationTime());

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java b/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
index 6ededbb..afe913d 100644
--- a/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
+++ b/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.entity.common;
 
+import java.util.Calendar;
 import java.util.regex.Pattern;
 
 /**
@@ -30,43 +31,49 @@ public final class FeedDataPath {
      * Standard variables for feed time components.
      */
     public static enum VARS {
-        YEAR("yyyy", "([0-9]{4})"), MONTH("MM", "(0[1-9]|1[0-2])"), DAY("dd", "(0[1-9]|1[0-9]|2[0-9]|3[0-1])"),
-        HOUR("HH", "([0-1][0-9]|2[0-4])"), MINUTE("mm", "([0-5][0-9]|60)");
+        YEAR("([0-9]{4})", Calendar.YEAR, 4), MONTH("(0[1-9]|1[0-2])", Calendar.MONTH, 2),
+        DAY("(0[1-9]|1[0-9]|2[0-9]|3[0-1])", Calendar.DAY_OF_MONTH, 2),
+        HOUR("([0-1][0-9]|2[0-4])", Calendar.HOUR_OF_DAY, 2), MINUTE("([0-5][0-9]|60)", Calendar.MINUTE, 2);
 
         private final Pattern pattern;
-        private final String datePattern;
-        private final String patternRegularExpression;
+        private final String valuePattern;
+        private final int calendarField;
+        private final int valueSize;
 
-        private VARS(String datePattern, String patternRegularExpression) {
+        private VARS(String patternRegularExpression, int calField, int numDigits) {
             pattern = Pattern.compile("\\$\\{" + name() + "\\}");
-            this.datePattern = datePattern;
-            this.patternRegularExpression = patternRegularExpression;
+            this.valuePattern = patternRegularExpression;
+            this.calendarField = calField;
+            this.valueSize = numDigits;
         }
 
-        public String getPatternRegularExpression() {
-            return patternRegularExpression;
-        }
-
-        public String getDatePattern() {
-            return datePattern;
+        public String getValuePattern() {
+            return valuePattern;
         }
 
         public String regex() {
             return pattern.pattern();
         }
 
-        public static VARS from(String str) {
-            for (VARS var : VARS.values()) {
-                if (var.datePattern.equals(str)) {
-                    return var;
-                }
+        public int getCalendarField() {
+            return calendarField;
+        }
+
+        public int getValueSize() {
+            return valueSize;
+        }
+
+        public void setCalendar(Calendar cal, int value) {
+            if (this == MONTH) {
+                cal.set(calendarField, value - 1);
+            } else {
+                cal.set(calendarField, value);
             }
-            return null;
         }
 
-        public static VARS presentIn(String str) {
+        public static VARS from(String str) {
             for (VARS var : VARS.values()) {
-                if (str.contains(var.datePattern)) {
+                if (var.pattern.matcher(str).matches()) {
                     return var;
                 }
             }
@@ -77,8 +84,4 @@ public final class FeedDataPath {
     public static final Pattern PATTERN = Pattern.compile(VARS.YEAR.regex()
             + "|" + VARS.MONTH.regex() + "|" + VARS.DAY.regex() + "|"
             + VARS.HOUR.regex() + "|" + VARS.MINUTE.regex());
-
-    public static final Pattern DATE_FIELD_PATTERN = Pattern
-            .compile("yyyy|MM|dd|HH|mm");
-
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
index 2b50119..65aaeba 100644
--- a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
+++ b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
@@ -21,6 +21,8 @@ package org.apache.falcon.expression;
 import org.apache.commons.el.ExpressionEvaluatorImpl;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.common.FeedDataPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.servlet.jsp.el.ELException;
 import javax.servlet.jsp.el.ExpressionEvaluator;
@@ -41,9 +43,10 @@ import java.util.regex.Pattern;
  */
 public final class ExpressionHelper implements FunctionMapper, VariableResolver {
 
+    private static final Logger LOG = LoggerFactory.getLogger(ExpressionHelper.class);
     private static final ExpressionHelper INSTANCE = new ExpressionHelper();
 
-    private ThreadLocal<Properties> threadVariables = new ThreadLocal<Properties>();
+    private static final ThreadLocal<Properties> THREAD_VARIABLES = new ThreadLocal<Properties>();
 
     private static final Pattern SYS_PROPERTY_PATTERN = Pattern.compile("\\$\\{[A-Za-z0-9_.]+\\}");
 
@@ -94,18 +97,20 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver
     }
 
     public void setPropertiesForVariable(Properties properties) {
-        threadVariables.set(properties);
+        THREAD_VARIABLES.set(properties);
     }
 
     @Override
     public Object resolveVariable(String field) {
-        return threadVariables.get().get(field);
+        return THREAD_VARIABLES.get().get(field);
     }
 
     private static ThreadLocal<Date> referenceDate = new ThreadLocal<Date>();
 
     public static void setReferenceDate(Date date) {
         referenceDate.set(date);
+        Properties variables = getTimeVariables(date, TimeZone.getTimeZone("UTC"));
+        THREAD_VARIABLES.set(variables);
     }
 
     public static Properties getTimeVariables(Date date, TimeZone tz) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java b/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java
index 4bf6e00..573180a 100644
--- a/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java
+++ b/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.util;
 
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.entity.common.FeedDataPath;
 
@@ -192,7 +193,7 @@ public class FalconRadixUtils {
                     String regex = key.substring(0, key.indexOf("}") + 1);
                     // match the text and the regex
                     FeedDataPath.VARS var = getMatchingRegex(regex);
-                    if (matchPart(regex, input.substring(0, var.getDatePattern().length()))) {
+                    if (matchPart(regex, input.substring(0, var.getValueSize()))) {
                         newRoot = child; // if it matches then this is the newRoot
                         break;
                     }
@@ -214,9 +215,13 @@ public class FalconRadixUtils {
             if (StringUtils.isBlank(templateString)) {
                 return 0;
             }
+
+            // Only the resulting length matters, so each pattern is replaced with a random string of its value size
             for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) {
-                templateString = templateString.replace("${" + var.name() + "}", var.getDatePattern());
+                templateString = templateString.replace("${" + var.name() + "}",
+                        RandomStringUtils.random(var.getValueSize()));
             }
+
             return templateString.length();
         }
 
@@ -246,11 +251,12 @@ public class FalconRadixUtils {
 
         private FeedDataPath.VARS getMatchingRegex(String inputPart) {
             //inputPart will be something like ${YEAR}
+
             inputPart = inputPart.replace("${", "\\$\\{");
             inputPart = inputPart.replace("}", "\\}");
 
             for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) {
-                if (StringUtils.equals(inputPart, var.regex())) {
+                if (inputPart.equals("${" + var.name() + "}")) {
                     return var;
                 }
             }
@@ -298,8 +304,8 @@ public class FalconRadixUtils {
                 for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) {//find which regex is this
                     if (StringUtils.equals(var.regex(), template)) {// regex found, do matching
                         //find part of the input string which should be matched against regex
-                        String desiredPart = input.substring(0, var.getDatePattern().length());
-                        Pattern pattern = Pattern.compile(var.getPatternRegularExpression());
+                        String desiredPart = input.substring(0, var.getValueSize());
+                        Pattern pattern = Pattern.compile(var.getValuePattern());
                         Matcher matcher = pattern.matcher(desiredPart);
                         if (!matcher.matches()) {
                             return false;

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 8d69b9a..887cea2 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -96,7 +96,7 @@ public class WorkflowExecutionContext {
     private final Map<WorkflowExecutionArgs, String> context;
     private final long creationTime;
 
-    protected WorkflowExecutionContext(Map<WorkflowExecutionArgs, String> context) {
+    public WorkflowExecutionContext(Map<WorkflowExecutionArgs, String> context) {
         this.context = context;
         creationTime = System.currentTimeMillis();
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 99dab59..4f41548 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -64,6 +64,8 @@
 *.system.lib.location=${FALCON_HOME}/sharedlibs
 
 # Location to store user entity configurations
+
+# Configurations used in unit tests (UTs)
 debug.config.store.uri=file://${user.dir}/target/store
 debug.config.oozie.conf.uri=${user.dir}/target/oozie
 debug.system.lib.location=${system.lib.location}
@@ -73,6 +75,17 @@ debug.libext.feed.retention.paths=${falcon.libext}
 debug.libext.feed.replication.paths=${falcon.libext}
 debug.libext.process.paths=${falcon.libext}
 
+#Configurations used in ITs
+it.config.store.uri=file://${user.dir}/target/store
+it.config.oozie.conf.uri=${user.dir}/target/oozie
+it.system.lib.location=${system.lib.location}
+it.broker.url=tcp://localhost:61616
+it.retry.recorder.path=${user.dir}/target/retry
+it.libext.feed.retention.paths=${falcon.libext}
+it.libext.feed.replication.paths=${falcon.libext}
+it.libext.process.paths=${falcon.libext}
+it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandler
+
 *.falcon.cleanup.service.frequency=minutes(5)
 
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java b/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java
index c405556..4c293bb 100644
--- a/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java
@@ -29,7 +29,7 @@ public class FeedDataPathTest {
 
     @Test
     public void testMinutesRegularExpression() {
-        String monthPattern = FeedDataPath.VARS.MINUTE.getPatternRegularExpression();
+        String monthPattern = FeedDataPath.VARS.MINUTE.getValuePattern();
         Assert.assertFalse("0".matches(monthPattern));
         Assert.assertFalse("1".matches(monthPattern));
         Assert.assertFalse("61".matches(monthPattern));
@@ -45,7 +45,7 @@ public class FeedDataPathTest {
 
     @Test
     public void testHourRegularExpression() {
-        String hourPattern = FeedDataPath.VARS.HOUR.getPatternRegularExpression();
+        String hourPattern = FeedDataPath.VARS.HOUR.getValuePattern();
         Assert.assertFalse("0".matches(hourPattern));
         Assert.assertFalse("1".matches(hourPattern));
         Assert.assertFalse("2".matches(hourPattern));
@@ -67,7 +67,7 @@ public class FeedDataPathTest {
 
     @Test
     public void testDayRegularExpression() {
-        String dayPattern = FeedDataPath.VARS.DAY.getPatternRegularExpression();
+        String dayPattern = FeedDataPath.VARS.DAY.getValuePattern();
         Assert.assertFalse("0".matches(dayPattern));
         Assert.assertFalse("1".matches(dayPattern));
         Assert.assertFalse("32".matches(dayPattern));
@@ -86,7 +86,7 @@ public class FeedDataPathTest {
 
     @Test
     public void testMonthRegularExpression() {
-        String monthPattern = FeedDataPath.VARS.MONTH.getPatternRegularExpression();
+        String monthPattern = FeedDataPath.VARS.MONTH.getValuePattern();
         Assert.assertFalse("0".matches(monthPattern));
         Assert.assertFalse("1".matches(monthPattern));
         Assert.assertFalse("13".matches(monthPattern));
@@ -105,7 +105,7 @@ public class FeedDataPathTest {
 
     @Test
     public void testYearRegularExpression() {
-        String monthPattern = FeedDataPath.VARS.YEAR.getPatternRegularExpression();
+        String monthPattern = FeedDataPath.VARS.YEAR.getValuePattern();
         Assert.assertFalse("0".matches(monthPattern));
         Assert.assertFalse("1".matches(monthPattern));
         Assert.assertFalse("13".matches(monthPattern));

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
index f6994fc..63ab7da 100644
--- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
@@ -18,16 +18,25 @@
 
 package org.apache.falcon.entity;
 
+import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Properties;
 import org.apache.falcon.entity.v0.cluster.Property;
+import org.apache.falcon.entity.v0.feed.*;
+import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import java.util.Date;
+import java.util.TimeZone;
+
 /**
  * Test for feed helper methods.
  */
 public class FeedHelperTest {
+    public static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+
     @Test
     public void testPartitionExpression() {
         Assert.assertEquals(FeedHelper.normalizePartitionExpression(" /a// ", "  /b// "), "a/b");
@@ -51,4 +60,49 @@ public class FeedHelperTest {
                 "name/*/pvalue");
         Assert.assertEquals(FeedHelper.evaluateClusterExp(cluster, "IN"), "IN");
     }
+
+    @DataProvider(name = "fsPathsforDate")
+    public Object[][] createPathsForGetDate() {
+        return new Object[][] {
+            {"/data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}", "/data/2015/01/01/00/30", "2015-01-01T00:30Z"},
+            {"/data/${YEAR}-${MONTH}-${DAY}-${HOUR}-${MINUTE}", "/data/2015-01-01-01-00", "2015-01-01T01:00Z"},
+            {"/data/${YEAR}/${MONTH}/${DAY}", "/data/2015/01/01", "2015-01-01T00:00Z"},
+            {"/data/${YEAR}/${MONTH}/${DAY}/data", "/data/2015/01/01/data", "2015-01-01T00:00Z"},
+            {"/data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}", "/data/2015-01-01/00/30", null},
+            {"/data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/data", "/data/2015-01-01/00/30", null},
+        };
+    }
+
+    @Test(dataProvider = "fsPathsforDate")
+    public void testGetDateFromPath(String template, String path, String expectedDate) throws Exception {
+        Date date = FeedHelper.getDate(template, new Path(path), UTC);
+        Assert.assertEquals(SchemaHelper.formatDateUTC(date), expectedDate);
+    }
+
+    @Test
+    public void testGetLocations() {
+        Cluster cluster = new Cluster();
+        cluster.setName("name");
+        Feed feed = new Feed();
+        Location location1 = new Location();
+        location1.setType(LocationType.META);
+        Locations locations = new Locations();
+        locations.getLocations().add(location1);
+
+        Location location2 = new Location();
+        location2.setType(LocationType.DATA);
+        locations.getLocations().add(location2);
+
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = new org.apache.falcon.entity.v0.feed.Cluster();
+        feedCluster.setName("name");
+
+        feed.setLocations(locations);
+        Clusters clusters = new Clusters();
+        feed.setClusters(clusters);
+        feed.getClusters().getClusters().add(feedCluster);
+
+        Assert.assertEquals(FeedHelper.getLocations(feedCluster, feed),
+                locations.getLocations());
+        Assert.assertEquals(FeedHelper.getLocation(feed, cluster, LocationType.DATA), location2);
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
index 1667161..8b81a29 100644
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -458,8 +458,7 @@ public class FileSystemStorageTest {
             instance.setStatus(FeedInstanceStatus.AvailabilityStatus.MISSING);
             instance.setSize(-1);
             instance.setCreationTime(0);
-            String dateMask = FeedHelper.getDateFormatInPath(basePath);
-            Date date = FeedHelper.getDate(new Path(path), basePath, dateMask, tz.getID());
+            Date date = FeedHelper.getDate(basePath, new Path(path), tz);
             instance.setInstance(SchemaHelper.formatDateUTC(date));
             Calendar cal = Calendar.getInstance();
             cal.setTime(dataStart);

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/docs/src/site/twiki/InstallationSteps.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/InstallationSteps.twiki b/docs/src/site/twiki/InstallationSteps.twiki
index 3f622c7..1dd242a 100644
--- a/docs/src/site/twiki/InstallationSteps.twiki
+++ b/docs/src/site/twiki/InstallationSteps.twiki
@@ -165,6 +165,27 @@ In addition you can set any other environment variables you might need. This fil
 #export FALCON_EXPANDED_WEBAPP_DIR=
 </verbatim>
 
+*Configuring Monitoring plugin to register catalog partitions*
+Falcon comes with a monitoring plugin that registers catalog partitions. This comes in handy when migrating from filesystem-based feeds to HCatalog-based feeds.
+This plugin lets the user de-couple partition registration from the migration and assume that all partitions are already registered in HCatalog even before the migration, which simplifies the HCatalog migration.
+
+By default this plugin is disabled.
+To enable this plugin and leverage the feature, the following 3 pre-requisites must be met:
+
+<verbatim>
+In {package dir}/conf/startup.properties, add
+*.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandler
+
+In the cluster definition, ensure the registry endpoint is defined.
+Ex:
+<interface type="registry" endpoint="thrift://localhost:1109" version="0.13.3"/>
+
+In the feed definition, ensure the corresponding catalog table is specified in the feed properties
+Ex:
+<properties>
+    <property name="catalog.table" value="catalog:default:in_table#year={YEAR};month={MONTH};day={DAY};hour={HOUR};minute={MINUTE}"/>
+</properties>
+</verbatim>
 
 *NOTE for Mac OS users*
 <verbatim>

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java b/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java
index 2167375..cdd06db 100644
--- a/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java
+++ b/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java
@@ -22,8 +22,14 @@ import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
 
+import java.io.BufferedOutputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Classic protocol provider for Hadoop v2 based tests.
@@ -32,6 +38,10 @@ public class ClassicClientProtocolProvider extends ClientProtocolProvider {
 
     private static final String LOCALHOST = "localhost";
 
+    private static final ConcurrentHashMap<String, LocalJobRunner> CACHE = new ConcurrentHashMap<String, LocalJobRunner>();
+
+    private boolean initialised = false;
+
     @Override
     public ClientProtocol create(Configuration conf) throws IOException {
         String framework = conf.get(MRConfig.FRAMEWORK_NAME, "unittests");
@@ -40,7 +50,16 @@ public class ClassicClientProtocolProvider extends ClientProtocolProvider {
         if (!"unittests".equals(framework) || !tracker.startsWith(LOCALHOST)) {
             return null;
         }
-        return new LocalJobRunner(conf);
+
+        if (!CACHE.containsKey(tracker)) {
+            CACHE.putIfAbsent(tracker, new LocalJobRunner(conf));
+        }
+
+        if (!initialised) {
+            System.setOut(new PrintStream(new BufferedOutputStream(new FileOutputStream("target/logs/system-out.log")), true));
+            initialised = true;
+        }
+        return CACHE.get(tracker);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
index bac421f..b4eae5d 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.client.OozieClientException;
-import org.mortbay.log.Log;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,7 +97,7 @@ public final class LogProvider {
             if (fs.exists(jobPath)) {
                 return getFormatedRunId(runId);
             } else {
-                Log.warn("No run dirs are available in logs dir:" + jobPath);
+                LOG.warn("No run dirs are available in logs dir:" + jobPath);
                 return "-";
             }
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
index 7a87919..0366350 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
@@ -260,7 +260,7 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
 
     private void initializeOutputPaths(Cluster cluster, COORDINATORAPP coord, Properties props) throws FalconException {
         if (entity.getOutputs() == null) {
-            props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "NONE");
+            props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), IGNORE);
             props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), IGNORE);
             return;
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 462e26b..f4ffbc1 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -87,7 +87,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 WorkflowJob.Status.FAILED);
     private static final List<WorkflowJob.Status> WF_SUSPEND_PRECOND = Arrays.asList(WorkflowJob.Status.RUNNING);
     private static final List<WorkflowJob.Status> WF_RESUME_PRECOND = Arrays.asList(WorkflowJob.Status.SUSPENDED);
-    private static final List<WorkflowJob.Status> WF_RERUN_PRECOND =
+    public static final List<WorkflowJob.Status> WF_RERUN_PRECOND =
         Arrays.asList(WorkflowJob.Status.FAILED, WorkflowJob.Status.KILLED, WorkflowJob.Status.SUCCEEDED);
     private static final List<CoordinatorAction.Status> COORD_RERUN_PRECOND =
         Arrays.asList(CoordinatorAction.Status.TIMEDOUT, CoordinatorAction.Status.FAILED);

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 545beb1..4e5c3f0 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -731,7 +731,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         verifyBrokerProperties(cluster, props);
 
         Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), "clicks");
-        Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "NONE");
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "IGNORE");
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/prism/pom.xml
----------------------------------------------------------------------
diff --git a/prism/pom.xml b/prism/pom.xml
index 4a3054a..af9b132 100644
--- a/prism/pom.xml
+++ b/prism/pom.xml
@@ -195,11 +195,11 @@
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>aspectj-maven-plugin</artifactId>
-                <version>1.4</version>
+                <version>1.7</version>
                 <configuration>
                     <verbose>true</verbose>
-                    <source>1.6</source>
-                    <complianceLevel>1.6</complianceLevel>
+                    <source>1.7</source>
+                    <complianceLevel>1.7</complianceLevel>
                     <includes>
                         <include>org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java</include>
                         <include>org/apache/falcon/resource/proxy/InstanceManagerProxy.java</include>

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
----------------------------------------------------------------------
diff --git a/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java b/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
index 970d381..a2feccf 100644
--- a/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
+++ b/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
@@ -255,7 +255,7 @@ public class FeedEvictorTest {
             Assert.assertEquals("instances=NULL", stream.getBuffer());
 
             stream.clear();
-            String dataPath = "/data/YYYY/feed4/dd/MM/02/more/hello";
+            String dataPath = "/data/YYYY/feed4/dd/MM/more/hello";
             String logFile = hdfsUrl + "/falcon/staging/feed/instancePaths-2012-01-01-02-00.csv";
             FeedEvictor.main(new String[] {
                 "-feedBasePath", LocationType.DATA.name() + "="
@@ -273,6 +273,7 @@ public class FeedEvictorTest {
 
             assertFailures(fs, pair);
         } catch (Exception e) {
+            e.printStackTrace();
             Assert.fail("Unknown exception", e);
         }
     }
@@ -308,7 +309,7 @@ public class FeedEvictorTest {
             stream.clear();
             String dataPath = LocationType.DATA.name() + "="
                     + cluster.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY)
-                    + "/data/YYYY/feed4/dd/MM/02/more/hello";
+                    + "/data/YYYY/feed4/dd/MM/more/hello";
             String logFile = hdfsUrl + "/falcon/staging/feed/instancePaths-2012-01-01-02-00.csv";
             FeedEvictor.main(new String[]{
                 "-feedBasePath", dataPath,

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/test-tools/hadoop-webapp/src/main/resources/mapred-site.xml
----------------------------------------------------------------------
diff --git a/test-tools/hadoop-webapp/src/main/resources/mapred-site.xml b/test-tools/hadoop-webapp/src/main/resources/mapred-site.xml
index cf297de..a6914cd 100644
--- a/test-tools/hadoop-webapp/src/main/resources/mapred-site.xml
+++ b/test-tools/hadoop-webapp/src/main/resources/mapred-site.xml
@@ -65,4 +65,8 @@
       </description>
     </property>
 
+    <property>
+        <name>mapreduce.framework.name</name>
+        <value>unittests</value>
+    </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/test-tools/hadoop-webapp/src/main/resources/yarn-site.xml
----------------------------------------------------------------------
diff --git a/test-tools/hadoop-webapp/src/main/resources/yarn-site.xml b/test-tools/hadoop-webapp/src/main/resources/yarn-site.xml
index 658752b..52fdf6d 100644
--- a/test-tools/hadoop-webapp/src/main/resources/yarn-site.xml
+++ b/test-tools/hadoop-webapp/src/main/resources/yarn-site.xml
@@ -28,11 +28,6 @@
   </property>
 
   <property>
-    <name>mapreduce.framework.name</name>
-    <value>unittests</value>
-  </property>
-
-  <property>
     <name>yarn.resourcemanager.resource-tracker.address</name>
     <value>0.0.0.0:41025</value>
   </property>

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/webapp/src/test/java/org/apache/falcon/catalog/CatalogPartitionHandlerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/catalog/CatalogPartitionHandlerIT.java b/webapp/src/test/java/org/apache/falcon/catalog/CatalogPartitionHandlerIT.java
new file mode 100644
index 0000000..c7b7d3b
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/catalog/CatalogPartitionHandlerIT.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.catalog;
+
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.HiveTestUtils;
+import org.apache.falcon.util.OozieTestUtils;
+import org.apache.hive.hcatalog.api.HCatPartition;
+import org.apache.oozie.client.WorkflowJob;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * IT for the catalog partition handler, which is a JMS message listener.
+ */
+@Test(groups = {"exhaustive"})
+public class CatalogPartitionHandlerIT {
+    private static final String METASTORE_URL = "thrift://localhost:49083";
+    private static final String DB = "falcon_db";
+    private static final String TABLE = "output_table";
+
+    @BeforeClass
+    public void prepare() throws Exception {
+        TestContext.prepare();
+    }
+
+    // TODO : Enable this after oozie/hadoop config file changes
+    @Test(enabled = false)
+    public void testPartitionRegistration() throws Exception {
+        TestContext context = newContext();
+
+        HiveTestUtils.createDatabase(METASTORE_URL, DB);
+        HiveTestUtils.createTable(METASTORE_URL, DB, TABLE, Arrays.asList("ds"));
+        context.scheduleProcess();
+        List<WorkflowJob> instances = OozieTestUtils.waitForProcessWFtoStart(context);
+        OozieTestUtils.waitForInstanceToComplete(context, instances.get(0).getId());
+
+        HCatPartition partition = HiveTestUtils.getPartition(METASTORE_URL, DB, TABLE, "ds", "2012-04-19");
+        Assert.assertNotNull(partition);
+    }
+
+    private ThreadLocal<TestContext> contexts = new ThreadLocal<TestContext>();
+
+    private TestContext newContext() {
+        contexts.set(new TestContext());
+        return contexts.get();
+    }
+
+    @AfterMethod
+    public void cleanup() throws Exception {
+        TestContext testContext = contexts.get();
+        if (testContext != null) {
+            OozieTestUtils.killOozieJobs(testContext);
+        }
+
+        contexts.remove();
+    }
+}


Mime
View raw message