falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkat...@apache.org
Subject [4/8] incubator-falcon git commit: FALCON-880 Oozie Java actions for hive tables fail in secure mode. Contributed by Venkatesh Seetharam
Date Fri, 14 Nov 2014 02:54:49 GMT
FALCON-880 Oozie Java actions for hive tables fail in secure mode. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/ff7b5a73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/ff7b5a73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/ff7b5a73

Branch: refs/heads/master
Commit: ff7b5a737712419a02cd85bbd12aeeffd6ca343e
Parents: 32aaa8e
Author: Venkatesh Seetharam <venkatesh@apache.org>
Authored: Thu Nov 13 18:01:09 2014 -0800
Committer: Venkatesh Seetharam <venkatesh@apache.org>
Committed: Thu Nov 13 18:01:09 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../falcon/catalog/AbstractCatalogService.java  |  29 +++-
 .../falcon/catalog/HiveCatalogService.java      | 164 +++++++++++++++----
 .../apache/falcon/entity/CatalogStorage.java    |  20 ++-
 .../org/apache/falcon/entity/FeedHelper.java    |  13 ++
 .../apache/falcon/entity/FileSystemStorage.java |  14 +-
 .../java/org/apache/falcon/entity/Storage.java  |   3 +-
 .../util/OozieActionConfigurationHelper.java    |  80 +++++++++
 .../apache/falcon/latedata/LateDataHandler.java |  13 +-
 .../apache/falcon/retention/FeedEvictor.java    |  12 +-
 .../falcon/catalog/HiveCatalogServiceIT.java    |  24 +--
 .../apache/falcon/late/LateDataHandlerIT.java   |  13 +-
 12 files changed, 301 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ff7b5a73/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 36e5ba8..9bd7d4d 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -144,6 +144,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-880 Oozie Java actions for hive tables fail in secure mode
+   (Venkatesh Seetharam)
+
    FALCON-717 Shutdown not clean for JMSMessageConsumer
    (Shaik Idris Ali via Venkatesh Seetharam
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ff7b5a73/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
index df55b88..e64a5be 100644
--- a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.catalog;
 
 import org.apache.falcon.FalconException;
+import org.apache.hadoop.conf.Configuration;
 
 import java.util.List;
 import java.util.Map;
@@ -56,18 +57,20 @@ public abstract class AbstractCatalogService {
     /**
      * Returns if the table is external or not. Executed in the workflow engine.
      *
+     * @param conf conf object
      * @param catalogUrl url for the catalog service
      * @param database database the table belongs to
      * @param tableName tableName to check if it exists
      * @return true if external else false
      * @throws FalconException
      */
-    public abstract boolean isTableExternal(String catalogUrl, String database,
+    public abstract boolean isTableExternal(Configuration conf, String catalogUrl, String database,
                                             String tableName) throws FalconException;
 
     /**
      * List partitions by filter. Executed in the workflow engine.
      *
+     * @param conf conf object
      * @param catalogUrl url for the catalog service
      * @param database database the table belongs to
      * @param tableName tableName to check if it exists
@@ -77,13 +80,17 @@ public abstract class AbstractCatalogService {
      * @return list of partitions
      * @throws FalconException
      */
-    public abstract List<CatalogPartition> listPartitionsByFilter(String catalogUrl, String database,
+    public abstract List<CatalogPartition> listPartitionsByFilter(Configuration conf,
+                                                                  String catalogUrl,
+                                                                  String database,
                                                                   String tableName, String filter)
         throws FalconException;
 
     /**
      * Drops a given partition. Executed in the workflow engine.
      *
+     *
+     * @param conf  conf object
      * @param catalogUrl url for the catalog service
      * @param database database the table belongs to
      * @param tableName tableName to check if it exists
@@ -91,12 +98,15 @@ public abstract class AbstractCatalogService {
      * @return if the partition was dropped
      * @throws FalconException
      */
-    public abstract boolean dropPartitions(String catalogUrl, String database, String tableName,
+    public abstract boolean dropPartitions(Configuration conf, String catalogUrl,
+                                           String database, String tableName,
                                            Map<String, String> partitions) throws FalconException;
 
     /**
      * Gets the partition. Executed in the workflow engine.
      *
+     *
+     * @param conf  conf
      * @param catalogUrl url for the catalog service
      * @param database database the table belongs to
      * @param tableName tableName to check if it exists
@@ -105,16 +115,21 @@ public abstract class AbstractCatalogService {
      * @return An instance of CatalogPartition.
      * @throws FalconException
      */
-    public abstract CatalogPartition getPartition(String catalogUrl, String database, String tableName,
-                                                  Map<String, String> partitionSpec) throws FalconException;
+    public abstract CatalogPartition getPartition(Configuration conf, String catalogUrl,
+                                                  String database, String tableName,
+                                                  Map<String, String> partitionSpec)
+        throws FalconException;
 
     /**
+     *
+     * @param conf  conf
      * @param catalogUrl url for the catalog service
      * @param database database the table belongs to
      * @param tableName table name
      * @return list of partition column names of the table
      * @throws FalconException
      */
-    public abstract List<String> getTablePartitionCols(String catalogUrl, String database,
-                                                     String tableName) throws FalconException;
+    public abstract List<String> getTablePartitionCols(Configuration conf, String catalogUrl,
+                                                       String database,
+                                                       String tableName) throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ff7b5a73/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
index 170fef2..51fb6b7 100644
--- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
@@ -20,8 +20,12 @@ package org.apache.falcon.catalog;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.io.Text;
@@ -35,6 +39,7 @@ import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -49,17 +54,32 @@ public class HiveCatalogService extends AbstractCatalogService {
 
     private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogService.class);
 
+    /**
+     * This is only used for tests.
+     *
+     * @param metastoreUrl metastore url
+     * @return client object
+     * @throws FalconException
+     */
     public static HCatClient getHCatClient(String metastoreUrl) throws FalconException {
         try {
             HiveConf hcatConf = createHiveConf(metastoreUrl);
             return HCatClient.create(hcatConf);
         } catch (HCatException e) {
             throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e);
+        } catch (IOException e) {
+            throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e);
         }
     }
 
-    private static HiveConf createHiveConf(String metastoreUrl) {
-        HiveConf hcatConf = new HiveConf();
+    private static HiveConf createHiveConf(String metastoreUrl) throws IOException {
+        return createHiveConf(new Configuration(false), metastoreUrl);
+    }
+
+    private static HiveConf createHiveConf(Configuration conf,
+                                           String metastoreUrl) throws IOException {
+        HiveConf hcatConf = new HiveConf(conf, HiveConf.class);
+
         hcatConf.set("hive.metastore.local", "false");
         hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUrl);
         hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
@@ -72,24 +92,85 @@ public class HiveCatalogService extends AbstractCatalogService {
         return hcatConf;
     }
 
-    public static synchronized HCatClient getProxiedClient(String catalogUrl,
-                                                           String metaStoreServicePrincipal)
-        throws FalconException {
-
+    /**
+     * This is used from with in an oozie job.
+     *
+     * @param conf conf object
+     * @param metastoreUrl metastore uri
+     * @return hive metastore client handle
+     * @throws FalconException
+     */
+    private static HCatClient createHCatClient(Configuration conf,
+                                               String metastoreUrl) throws FalconException {
         try {
-            final HiveConf hcatConf = createHiveConf(catalogUrl);
-            UserGroupInformation proxyUGI = CurrentUser.getProxyUGI();
+            LOG.info("Creating HCatalog client object for metastore {} using conf {}",
+                metastoreUrl, conf.toString());
+            final Credentials credentials = getCredentials(conf);
+            Configuration jobConf = credentials != null ? copyCredentialsToConf(conf, credentials) : conf;
+            HiveConf hcatConf = createHiveConf(jobConf, metastoreUrl);
+
             if (UserGroupInformation.isSecurityEnabled()) {
                 hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
-                        metaStoreServicePrincipal);
+                    conf.get(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname));
                 hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
 
-                Token<DelegationTokenIdentifier> delegationTokenId = getDelegationToken(
-                        hcatConf, metaStoreServicePrincipal);
-                proxyUGI.addToken(delegationTokenId);
+                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+                ugi.addCredentials(credentials); // credentials cannot be null
             }
 
-            LOG.info("Creating and caching HCatalog client object for {}", catalogUrl);
+            OozieActionConfigurationHelper.dumpConf(hcatConf, "hive conf ");
+
+            return HCatClient.create(hcatConf);
+        } catch (HCatException e) {
+            throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e);
+        } catch (IOException e) {
+            throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e);
+        }
+    }
+
+    private static JobConf copyCredentialsToConf(Configuration conf, Credentials credentials) {
+        JobConf jobConf = new JobConf(conf);
+        jobConf.setCredentials(credentials);
+        return jobConf;
+    }
+
+    private static Credentials getCredentials(Configuration conf) throws IOException {
+        final String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
+        if (tokenFile == null) {
+            return null;
+        }
+
+        try {
+            LOG.info("Adding credentials/delegation tokens from token file={} to conf", tokenFile);
+            Credentials credentials = Credentials.readTokenStorageFile(new File(tokenFile), conf);
+            LOG.info("credentials numberOfTokens={}, numberOfSecretKeys={}",
+                credentials.numberOfTokens(), credentials.numberOfSecretKeys());
+            return credentials;
+        } catch (IOException e) {
+            LOG.warn("error while fetching credentials from {}", tokenFile);
+        }
+
+        return null;
+    }
+
+    /**
+     * This is used from with in falcon namespace.
+     *
+     * @param catalogUrl metastore uri
+     * @param metaStoreServicePrincipal metastore principal
+     * @return hive metastore client handle
+     * @throws FalconException
+     */
+    private static synchronized HCatClient createProxiedHCatClient(String catalogUrl,
+                                                                   String metaStoreServicePrincipal)
+        throws FalconException {
+
+        try {
+            final HiveConf hcatConf = createHiveConf(catalogUrl);
+            UserGroupInformation proxyUGI = CurrentUser.getProxyUGI();
+            addSecureCredentialsAndToken(metaStoreServicePrincipal, hcatConf, proxyUGI);
+
+            LOG.info("Creating HCatalog client object for {}", catalogUrl);
             return proxyUGI.doAs(new PrivilegedExceptionAction<HCatClient>() {
                 public HCatClient run() throws Exception {
                     return HCatClient.create(hcatConf);
@@ -98,8 +179,21 @@ public class HiveCatalogService extends AbstractCatalogService {
         } catch (IOException e) {
             throw new FalconException("Exception creating Proxied HCatClient: " + e.getMessage(), e);
         } catch (InterruptedException e) {
-            throw new FalconException("Exception creating Proxied HCatClient: " + e.getMessage(),
-                    e);
+            throw new FalconException("Exception creating Proxied HCatClient: " + e.getMessage(), e);
+        }
+    }
+
+    private static void addSecureCredentialsAndToken(String metaStoreServicePrincipal,
+                                                     HiveConf hcatConf,
+                                                     UserGroupInformation proxyUGI) throws IOException {
+        if (UserGroupInformation.isSecurityEnabled()) {
+            hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
+                metaStoreServicePrincipal);
+            hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
+
+            Token<DelegationTokenIdentifier> delegationTokenId = getDelegationToken(
+                hcatConf, metaStoreServicePrincipal);
+            proxyUGI.addToken(delegationTokenId);
         }
     }
 
@@ -107,6 +201,7 @@ public class HiveCatalogService extends AbstractCatalogService {
                                                                        String metaStoreServicePrincipal)
         throws IOException {
 
+        LOG.info("Creating delegation tokens for principal={}", metaStoreServicePrincipal);
         HCatClient hcatClient = HCatClient.create(hcatConf);
         String delegationToken = hcatClient.getDelegationToken(
                 CurrentUser.getUser(), metaStoreServicePrincipal);
@@ -115,6 +210,7 @@ public class HiveCatalogService extends AbstractCatalogService {
         Token<DelegationTokenIdentifier> delegationTokenId = new Token<DelegationTokenIdentifier>();
         delegationTokenId.decodeFromUrlString(delegationToken);
         delegationTokenId.setService(new Text("FalconService"));
+        LOG.info("Created delegation token={}", delegationToken);
         return delegationTokenId;
     }
 
@@ -124,7 +220,7 @@ public class HiveCatalogService extends AbstractCatalogService {
         LOG.info("Checking if the service is alive for: {}", catalogUrl);
 
         try {
-            HCatClient client = getProxiedClient(catalogUrl, metaStorePrincipal);
+            HCatClient client = createProxiedHCatClient(catalogUrl, metaStorePrincipal);
             HCatDatabase database = client.getDatabase("default");
             return database != null;
         } catch (HCatException e) {
@@ -138,7 +234,7 @@ public class HiveCatalogService extends AbstractCatalogService {
         LOG.info("Checking if the table exists: {}", tableName);
 
         try {
-            HCatClient client = getProxiedClient(catalogUrl, metaStorePrincipal);
+            HCatClient client = createProxiedHCatClient(catalogUrl, metaStorePrincipal);
             HCatTable table = client.getTable(database, tableName);
             return table != null;
         } catch (HCatException e) {
@@ -147,12 +243,12 @@ public class HiveCatalogService extends AbstractCatalogService {
     }
 
     @Override
-    public boolean isTableExternal(String catalogUrl, String database, String tableName)
-        throws FalconException {
+    public boolean isTableExternal(Configuration conf, String catalogUrl, String database,
+                                   String tableName) throws FalconException {
         LOG.info("Checking if the table is external: {}", tableName);
 
         try {
-            HCatClient client = getHCatClient(catalogUrl);
+            HCatClient client = createHCatClient(conf, catalogUrl);
             HCatTable table = client.getTable(database, tableName);
             return !table.getTabletype().equals("MANAGED_TABLE");
         } catch (HCatException e) {
@@ -161,15 +257,15 @@ public class HiveCatalogService extends AbstractCatalogService {
     }
 
     @Override
-    public List<CatalogPartition> listPartitionsByFilter(String catalogUrl, String database,
-                                                         String tableName, String filter)
-        throws FalconException {
+    public List<CatalogPartition> listPartitionsByFilter(Configuration conf, String catalogUrl,
+                                                         String database, String tableName,
+                                                         String filter) throws FalconException {
         LOG.info("List partitions for: {}, partition filter: {}", tableName, filter);
 
         try {
             List<CatalogPartition> catalogPartitionList = new ArrayList<CatalogPartition>();
 
-            HCatClient client = getHCatClient(catalogUrl);
+            HCatClient client = createHCatClient(conf, catalogUrl);
             List<HCatPartition> hCatPartitions = client.listPartitionsByFilter(database, tableName, filter);
             for (HCatPartition hCatPartition : hCatPartitions) {
                 LOG.info("Partition: " + hCatPartition.getValues());
@@ -205,13 +301,13 @@ public class HiveCatalogService extends AbstractCatalogService {
     }
 
     @Override
-    public boolean dropPartitions(String catalogUrl, String database,
-                                  String tableName, Map<String, String> partitions)
-        throws FalconException {
+    public boolean dropPartitions(Configuration conf, String catalogUrl,
+                                  String database, String tableName,
+                                  Map<String, String> partitions) throws FalconException {
         LOG.info("Dropping partitions for: {}, partitions: {}", tableName, partitions);
 
         try {
-            HCatClient client = getHCatClient(catalogUrl);
+            HCatClient client = createHCatClient(conf, catalogUrl);
             client.dropPartitions(database, tableName, partitions, true);
         } catch (HCatException e) {
             throw new FalconException("Exception dropping partitions:" + e.getMessage(), e);
@@ -221,12 +317,13 @@ public class HiveCatalogService extends AbstractCatalogService {
     }
 
     @Override
-    public CatalogPartition getPartition(String catalogUrl, String database, String tableName,
+    public CatalogPartition getPartition(Configuration conf, String catalogUrl,
+                                         String database, String tableName,
                                          Map<String, String> partitionSpec) throws FalconException {
         LOG.info("Fetch partition for: {}, partition spec: {}", tableName, partitionSpec);
 
         try {
-            HCatClient client = getHCatClient(catalogUrl);
+            HCatClient client = createHCatClient(conf, catalogUrl);
             HCatPartition hCatPartition = client.getPartition(database, tableName, partitionSpec);
             return createCatalogPartition(hCatPartition);
         } catch (HCatException e) {
@@ -235,12 +332,13 @@ public class HiveCatalogService extends AbstractCatalogService {
     }
 
     @Override
-    public List<String> getTablePartitionCols(String catalogUrl, String database,
-                                            String tableName) throws FalconException {
+    public List<String> getTablePartitionCols(Configuration conf, String catalogUrl,
+                                              String database,
+                                              String tableName) throws FalconException {
         LOG.info("Fetching partition columns of table: " + tableName);
 
         try {
-            HCatClient client = getHCatClient(catalogUrl);
+            HCatClient client = createHCatClient(conf, catalogUrl);
             HCatTable table = client.getTable(database, tableName);
             List<HCatFieldSchema> partSchema = table.getPartCols();
             List<String> partCols = new ArrayList<String>();

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ff7b5a73/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
index 495e2a9..e68044a 100644
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -33,6 +33,7 @@ import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.retention.EvictedInstanceSerDe;
 import org.apache.falcon.retention.EvictionHelper;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,7 +57,7 @@ import java.util.regex.Matcher;
 /**
  * A catalog registry implementation of a feed storage.
  */
-public class CatalogStorage implements Storage {
+public class CatalogStorage extends Configured implements Storage {
 
     private static final Logger LOG = LoggerFactory.getLogger(EvictionHelper.class);
 
@@ -191,6 +192,11 @@ public class CatalogStorage implements Storage {
         parseUriTemplate(uri);
     }
 
+    protected CatalogStorage(String uriTemplate, Configuration conf) throws URISyntaxException {
+        this(uriTemplate);
+        setConf(conf);
+    }
+
     private void parseUriTemplate(URI uriTemplate) throws URISyntaxException {
         String path = uriTemplate.getPath();
         String[] paths = path.split(OUTPUT_PATH_SEPARATOR);
@@ -404,7 +410,7 @@ public class CatalogStorage implements Storage {
             LOG.info("No partitions to delete.");
         } else {
             final boolean isTableExternal = CatalogServiceFactory.getCatalogService().isTableExternal(
-                    getCatalogUrl(), getDatabase(), getTable());
+                getConf(), getCatalogUrl(), getDatabase(), getTable());
             try {
                 dropPartitions(toBeDeleted, datedPartKeys, isTableExternal);
             } catch (IOException e) {
@@ -427,7 +433,7 @@ public class CatalogStorage implements Storage {
 
         final String filter = createFilter(datedPartKeys, datedPartValues);
         return CatalogServiceFactory.getCatalogService().listPartitionsByFilter(
-                getCatalogUrl(), getDatabase(), getTable(), filter);
+            getConf(), getCatalogUrl(), getDatabase(), getTable(), filter);
     }
 
     private void fillSortedDatedPartitionKVs(List<String> sortedPartKeys, List<String> sortedPartValues,
@@ -511,8 +517,8 @@ public class CatalogStorage implements Storage {
                                 boolean isTableExternal) throws FalconException, IOException {
 
         // get table partition columns
-        List<String> partColumns = CatalogServiceFactory.getCatalogService().getTablePartitionCols(getCatalogUrl(),
-                getDatabase(), getTable());
+        List<String> partColumns = CatalogServiceFactory.getCatalogService().getTablePartitionCols(
+            getConf(), getCatalogUrl(), getDatabase(), getTable());
 
         /* In case partition columns are a super-set of dated partitions, there can be multiple
          * partitions that share the same set of date-partition values. All such partitions can
@@ -554,8 +560,8 @@ public class CatalogStorage implements Storage {
     private void dropPartitionInstances(List<CatalogPartition> partitionsToDrop, Map<String, String> partSpec,
                                         boolean isTableExternal) throws FalconException, IOException {
 
-        boolean deleted = CatalogServiceFactory.getCatalogService().dropPartitions(getCatalogUrl(), getDatabase(),
-                getTable(), partSpec);
+        boolean deleted = CatalogServiceFactory.getCatalogService().dropPartitions(
+            getConf(), getCatalogUrl(), getDatabase(), getTable(), partSpec);
 
         if (!deleted) {
             return;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ff7b5a73/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index dec9907..b5dd5c3 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -37,6 +37,7 @@ import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.FeedInstanceResult;
 import org.apache.falcon.util.BuildProperties;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
@@ -175,6 +176,18 @@ public final class FeedHelper {
         throw new IllegalArgumentException("Bad type: " + type);
     }
 
+    public static Storage createStorage(String type, String storageUriTemplate,
+                                        Configuration conf) throws URISyntaxException {
+        Storage.TYPE storageType = Storage.TYPE.valueOf(type);
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            return new FileSystemStorage(storageUriTemplate);
+        } else if (storageType == Storage.TYPE.TABLE) {
+            return new CatalogStorage(storageUriTemplate, conf);
+        }
+
+        throw new IllegalArgumentException("Bad type: " + type);
+    }
+
     public static Storage.TYPE getStorageType(Feed feed) throws FalconException {
         final Locations feedLocations = feed.getLocations();
         if (feedLocations != null

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ff7b5a73/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 1e63ab6..f7bd321 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -35,6 +35,7 @@ import org.apache.falcon.retention.EvictedInstanceSerDe;
 import org.apache.falcon.retention.EvictionHelper;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -62,7 +63,7 @@ import java.util.regex.Pattern;
 /**
  * A file system implementation of a feed storage.
  */
-public class FileSystemStorage implements Storage {
+public class FileSystemStorage extends Configured implements Storage {
 
     private static final Logger LOG = LoggerFactory.getLogger(FileSystemStorage.class);
     private final StringBuffer instancePaths = new StringBuffer();
@@ -301,7 +302,8 @@ public class FileSystemStorage implements Storage {
     }
 
     @Override
-    public StringBuilder evict(String retentionLimit, String timeZone, Path logFilePath) throws FalconException {
+    public StringBuilder evict(String retentionLimit, String timeZone,
+                               Path logFilePath) throws FalconException {
         try{
             for (Location location : getLocations()) {
                 fileSystemEvictor(getUriTemplate(location.getType()), retentionLimit, timeZone, logFilePath);
@@ -318,8 +320,8 @@ public class FileSystemStorage implements Storage {
         return instanceDates;
     }
 
-    private void fileSystemEvictor(String feedPath, String retentionLimit,
-                                   String timeZone, Path logFilePath) throws IOException, ELException, FalconException {
+    private void fileSystemEvictor(String feedPath, String retentionLimit, String timeZone,
+                                   Path logFilePath) throws IOException, ELException, FalconException {
         Path normalizedPath = new Path(feedPath);
         FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(normalizedPath.toUri());
         feedPath = normalizedPath.toUri().getPath();
@@ -346,7 +348,7 @@ public class FileSystemStorage implements Storage {
     }
 
     private List<Path> discoverInstanceToDelete(String inPath, String timeZone, String dateMask,
-                                                      Date start, FileSystem fs) throws IOException {
+                                                Date start, FileSystem fs) throws IOException {
 
         FileStatus[] files = findFilesForFeed(fs, inPath);
         if (files == null || files.length == 0) {
@@ -480,7 +482,7 @@ public class FileSystemStorage implements Storage {
         return fileStatus;
     }
 
-    private Configuration getConf() {
+    public Configuration getConf() {
         Configuration conf = new Configuration();
         conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageUrl);
         return conf;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ff7b5a73/common/src/main/java/org/apache/falcon/entity/Storage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/Storage.java b/common/src/main/java/org/apache/falcon/entity/Storage.java
index 0e39ea2..777cde8 100644
--- a/common/src/main/java/org/apache/falcon/entity/Storage.java
+++ b/common/src/main/java/org/apache/falcon/entity/Storage.java
@@ -22,6 +22,7 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.AccessControlList;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.Path;
 
 import java.util.Date;
@@ -31,7 +32,7 @@ import java.util.List;
  * A class to encapsulate the storage for a given feed which can either be
  * expressed as a path on the file system or a table in a catalog.
  */
-public interface Storage {
+public interface Storage extends Configurable {
 
     String DOLLAR_EXPR_START_REGEX = "\\$\\{";
     String QUESTION_EXPR_START_REGEX = "\\?\\{";

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ff7b5a73/common/src/main/java/org/apache/falcon/workflow/util/OozieActionConfigurationHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/util/OozieActionConfigurationHelper.java b/common/src/main/java/org/apache/falcon/workflow/util/OozieActionConfigurationHelper.java
new file mode 100644
index 0000000..3f07c3c
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/workflow/util/OozieActionConfigurationHelper.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.workflow.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+/**
+ * Utility to read oozie action conf at oozie.action.conf.xml.
+ */
+public final class OozieActionConfigurationHelper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OozieActionConfigurationHelper.class);
+
+    private OozieActionConfigurationHelper() {
+    }
+
+    public static Configuration createActionConf() throws IOException {
+        Configuration conf = new Configuration();
+        Path confPath = new Path("file:///" + System.getProperty("oozie.action.conf.xml"));
+
+        final boolean actionConfExists = confPath.getFileSystem(conf).exists(confPath);
+        LOG.info("Oozie Action conf {} found ? {}", confPath, actionConfExists);
+        if (actionConfExists) {
+            LOG.info("Oozie Action conf found, adding path={}, conf={}", confPath, conf.toString());
+            conf.addResource(confPath);
+            dumpConf(conf, "oozie action conf ");
+        }
+
+        String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
+        if (tokenFile != null) {
+            if (Shell.WINDOWS) {
+                if (tokenFile.charAt(0) == '"') {
+                    tokenFile = tokenFile.substring(1);
+                }
+                if (tokenFile.charAt(tokenFile.length() - 1) == '"') {
+                    tokenFile = tokenFile.substring(0, tokenFile.length() - 1);
+                }
+            }
+
+            conf.set("mapreduce.job.credentials.binary", tokenFile);
+            System.setProperty("mapreduce.job.credentials.binary", tokenFile);
+            conf.set("tez.credentials.path", tokenFile);
+            System.setProperty("tez.credentials.path", tokenFile);
+        }
+
+        conf.set("datanucleus.plugin.pluginRegistryBundleCheck", "LOG");
+        conf.setBoolean("hive.exec.mode.local.auto", false);
+
+        return conf;
+    }
+
+    public static void dumpConf(Configuration conf, String message) throws IOException {
+        StringWriter writer = new StringWriter();
+        Configuration.dumpConfiguration(conf, writer);
+        LOG.info(message + " {}", writer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ff7b5a73/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index 4e8e1cd..d5b7db0 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -27,6 +27,7 @@ import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
@@ -50,12 +51,7 @@ public class LateDataHandler extends Configured implements Tool {
     private static final Logger LOG = LoggerFactory.getLogger(LateDataHandler.class);
 
     public static void main(String[] args) throws Exception {
-        Configuration conf = new Configuration();
-        Path confPath = new Path("file:///"
-                + System.getProperty("oozie.action.conf.xml"));
-
-        LOG.info("{} found ? {}", confPath, confPath.getFileSystem(conf).exists(confPath));
-        conf.addResource(confPath);
+        Configuration conf = OozieActionConfigurationHelper.createActionConf();
         ToolRunner.run(conf, new LateDataHandler(), args);
     }
 
@@ -236,9 +232,10 @@ public class LateDataHandler extends Configured implements Tool {
         throws IOException, URISyntaxException, FalconException {
 
         CatalogStorage storage = (CatalogStorage)
-                FeedHelper.createStorage(Storage.TYPE.TABLE.name(), feedUriTemplate);
+                FeedHelper.createStorage(Storage.TYPE.TABLE.name(), feedUriTemplate, getConf());
         CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
-                storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(), storage.getPartitions());
+                getConf(), storage.getCatalogUrl(), storage.getDatabase(),
+                storage.getTable(), storage.getPartitions());
         return partition == null ? 0 : partition.getCreateTime();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ff7b5a73/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
----------------------------------------------------------------------
diff --git a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
index 03ebb07..f28e696 100644
--- a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
+++ b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
@@ -24,6 +24,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.Storage;
+import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
@@ -47,11 +48,7 @@ public class FeedEvictor extends Configured implements Tool {
     public static final AtomicReference<PrintStream> OUT = new AtomicReference<PrintStream>(System.out);
 
     public static void main(String[] args) throws Exception {
-        Configuration conf = new Configuration();
-        Path confPath = new Path("file:///" + System.getProperty("oozie.action.conf.xml"));
-
-        LOG.info("{} found ? {}", confPath, confPath.getFileSystem(conf).exists(confPath));
-        conf.addResource(confPath);
+        Configuration conf = OozieActionConfigurationHelper.createActionConf();
         int ret = ToolRunner.run(conf, new FeedEvictor(), args);
         if (ret != 0) {
             throw new Exception("Unable to perform eviction action args: " + Arrays.toString(args));
@@ -74,7 +71,7 @@ public class FeedEvictor extends Configured implements Tool {
         LOG.info("Applying retention on {} type: {}, Limit: {}, timezone: {}, frequency: {}, storage: {}",
                 feedPattern, retentionType, retentionLimit, timeZone, frequency, feedStorageType);
 
-        Storage storage = FeedHelper.createStorage(feedStorageType, feedPattern);
+        Storage storage = FeedHelper.createStorage(feedStorageType, feedPattern, getConf());
         Path path = new Path(logFile);
         StringBuilder buffer = storage.evict(retentionLimit, timeZone, path);
 
@@ -125,7 +122,4 @@ public class FeedEvictor extends Configured implements Tool {
 
         return new GnuParser().parse(options, args);
     }
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ff7b5a73/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
index 04fbf4d..b422119 100644
--- a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
+++ b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
@@ -21,6 +21,7 @@ package org.apache.falcon.catalog;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.resource.TestContext;
 import org.apache.falcon.security.CurrentUser;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hcatalog.api.HCatAddPartitionDesc;
 import org.apache.hcatalog.api.HCatClient;
 import org.apache.hcatalog.api.HCatCreateDBDesc;
@@ -53,6 +54,7 @@ public class HiveCatalogServiceIT {
     private static final String EXTERNAL_TABLE_NAME = "falcon_external";
     private static final String EXTERNAL_TABLE_LOCATION = "jail://global:00/falcon/staging/falcon_external";
 
+    private final Configuration conf = new Configuration(false);
     private HiveCatalogService hiveCatalogService;
     private HCatClient client;
 
@@ -195,18 +197,18 @@ public class HiveCatalogServiceIT {
 
     @Test
     public void testIsTableExternalFalse() throws Exception {
-        Assert.assertFalse(hiveCatalogService.isTableExternal(METASTORE_URL, DATABASE_NAME, TABLE_NAME));
+        Assert.assertFalse(hiveCatalogService.isTableExternal(conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME));
     }
     @Test
     public void testIsTableExternalTrue() throws Exception {
-        Assert.assertTrue(hiveCatalogService.isTableExternal(METASTORE_URL, DATABASE_NAME, EXTERNAL_TABLE_NAME));
+        Assert.assertTrue(hiveCatalogService.isTableExternal(conf, METASTORE_URL, DATABASE_NAME, EXTERNAL_TABLE_NAME));
     }
 
     @Test
     public void testListPartitionsByFilterNull() throws Exception {
 
         List<CatalogPartition> filteredPartitions = hiveCatalogService.listPartitionsByFilter(
-                METASTORE_URL, DATABASE_NAME, TABLE_NAME, null);
+            conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, null);
         Assert.assertEquals(filteredPartitions.size(), 3);
     }
 
@@ -225,7 +227,7 @@ public class HiveCatalogServiceIT {
         throws Exception {
 
         List<CatalogPartition> filteredPartitions = hiveCatalogService.listPartitionsByFilter(
-                METASTORE_URL, DATABASE_NAME, TABLE_NAME, lessThanFilter);
+            conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, lessThanFilter);
         Assert.assertEquals(filteredPartitions.size(), expectedPartitionCount);
     }
 
@@ -245,7 +247,7 @@ public class HiveCatalogServiceIT {
         throws Exception {
 
         List<CatalogPartition> filteredPartitions = hiveCatalogService.listPartitionsByFilter(
-                METASTORE_URL, DATABASE_NAME, TABLE_NAME, greaterThanFilter);
+            conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, greaterThanFilter);
         Assert.assertEquals(filteredPartitions.size(), expectedPartitionCount);
     }
 
@@ -274,7 +276,7 @@ public class HiveCatalogServiceIT {
         partialPartitionSpec.put("ds", "20130903");
 
         Assert.assertTrue(hiveCatalogService.dropPartitions(
-                METASTORE_URL, DATABASE_NAME, TABLE_NAME, partialPartitionSpec));
+            conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partialPartitionSpec));
 
         List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, TABLE_NAME);
         Assert.assertEquals(1, partitions.size(), "Unexpected number of partitions");
@@ -285,7 +287,7 @@ public class HiveCatalogServiceIT {
         partialPartitionSpec.put("ds", "20130902");
 
         Assert.assertTrue(hiveCatalogService.dropPartitions(
-                METASTORE_URL, DATABASE_NAME, TABLE_NAME, partialPartitionSpec));
+            conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partialPartitionSpec));
         partitions = client.getPartitions(DATABASE_NAME, TABLE_NAME);
         Assert.assertEquals(partitions.size(), 0, "Unexpected number of partitions");
     }
@@ -297,7 +299,7 @@ public class HiveCatalogServiceIT {
         partitionSpec.put("region", "in");
 
         CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
-                METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionSpec);
+            conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionSpec);
         Assert.assertNotNull(partition);
 
         long createTime = partition.getCreateTime();
@@ -315,7 +317,7 @@ public class HiveCatalogServiceIT {
         client.addPartition(first);
 
         CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
-                METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionSpec);
+            conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionSpec);
         Assert.assertNotNull(partition);
         final long originalCreateTime = partition.getCreateTime();
 
@@ -329,7 +331,7 @@ public class HiveCatalogServiceIT {
         client.addPartition(second);
 
         CatalogPartition reInstatedPartition = CatalogServiceFactory.getCatalogService().getPartition(
-                METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionSpec);
+            conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionSpec);
         Assert.assertNotNull(reInstatedPartition);
         final long reInstatedCreateTime = reInstatedPartition.getCreateTime();
 
@@ -347,7 +349,7 @@ public class HiveCatalogServiceIT {
     @Test  (dataProvider = "tableName")
     public void testGetTablePartitionCols(String tableName) throws Exception {
         List<String> partCols = CatalogServiceFactory.getCatalogService().getTablePartitionCols(
-                METASTORE_URL, DATABASE_NAME, tableName);
+            conf, METASTORE_URL, DATABASE_NAME, tableName);
         Assert.assertEquals(partCols.size(), 2);
         Collections.sort(partCols);
         Assert.assertEquals(partCols.get(0), "ds");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ff7b5a73/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
index bfc6f2f..cf537c9 100644
--- a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
+++ b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
@@ -59,6 +59,7 @@ public class LateDataHandlerIT {
     private static final String PARTITION_VALUE = "2012-04-21-00"; // ${YEAR}-${MONTH}-${DAY}-${HOUR}
     private static final String LATE_DATA_DIR = "/falcon/test/late/foo/logs/latedata/2013-09-24-00-00/primary-cluster";
 
+    private final Configuration conf = new Configuration(false);
     private final TestContext context = new TestContext();
     private String storageUrl;
     private String metastoreUrl;
@@ -144,10 +145,11 @@ public class LateDataHandlerIT {
         }
 
         LateDataHandler lateDataHandler = new LateDataHandler();
-        final long metric = lateDataHandler.computeStorageMetric(args[3], args[7], new Configuration());
+        lateDataHandler.setConf(conf);
+        final long metric = lateDataHandler.computeStorageMetric(args[3], args[7], conf);
         Assert.assertEquals(recordedMetrics.get("foo").longValue(), metric);
 
-        final String changes = lateDataHandler.detectChanges(lateDataPath, recordedMetrics, new Configuration());
+        final String changes = lateDataHandler.detectChanges(lateDataPath, recordedMetrics, conf);
         Assert.assertEquals("", changes);
     }
 
@@ -185,13 +187,14 @@ public class LateDataHandlerIT {
         reinstatePartition();
 
         LateDataHandler lateDataHandler = new LateDataHandler();
-        long metric = lateDataHandler.computeStorageMetric(args[3], args[7], new Configuration());
+        lateDataHandler.setConf(conf);
+        long metric = lateDataHandler.computeStorageMetric(args[3], args[7], conf);
         Assert.assertFalse(recordedMetrics.get("foo") == metric);
 
         Map<String, Long> computedMetrics = new LinkedHashMap<String, Long>();
         computedMetrics.put("foo", metric);
 
-        String changes = lateDataHandler.detectChanges(lateDataPath, computedMetrics, new Configuration());
+        String changes = lateDataHandler.detectChanges(lateDataPath, computedMetrics, conf);
         Assert.assertEquals("foo", changes);
     }
 
@@ -210,7 +213,7 @@ public class LateDataHandlerIT {
         client.addPartition(reinstatedPartition);
 
         CatalogPartition reInstatedPartition = CatalogServiceFactory.getCatalogService().getPartition(
-                metastoreUrl, DATABASE_NAME, TABLE_NAME, partitionSpec);
+            conf, metastoreUrl, DATABASE_NAME, TABLE_NAME, partitionSpec);
         Assert.assertNotNull(reInstatedPartition);
     }
 }


Mime
View raw message