falcon-commits mailing list archives

From pall...@apache.org
Subject [43/51] [partial] falcon git commit: FALCON-1830 Removed code source directories and updated pom
Date Tue, 01 Mar 2016 08:26:29 GMT
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java b/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
deleted file mode 100644
index 71194c7..0000000
--- a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.catalog;
-
-import java.util.List;
-
-/**
- * The CatalogPartition is a wrapper around org.apache.hive.hcatalog.api.HCatPartition.
- */
-public class CatalogPartition {
-
-    private String databaseName;
-    private String tableName;
-    private List<String> values;
-    private long createTime;
-    private long lastAccessTime;
-    private String inputFormat;
-    private String outputFormat;
-    private String location;
-    private String serdeInfo;
-    private long size = -1;
-
-    protected CatalogPartition() {
-    }
-
-    protected void setDatabaseName(String databaseName) {
-        this.databaseName = databaseName;
-    }
-
-    protected void setTableName(String tableName) {
-        this.tableName = tableName;
-    }
-
-    protected void setValues(List<String> values) {
-        this.values = values;
-    }
-
-    protected void setCreateTime(long createTime) {
-        this.createTime = createTime;
-    }
-
-    protected void setLastAccessTime(long lastAccessTime) {
-        this.lastAccessTime = lastAccessTime;
-    }
-
-    protected void setInputFormat(String inputFormat) {
-        this.inputFormat = inputFormat;
-    }
-
-    protected void setOutputFormat(String outputFormat) {
-        this.outputFormat = outputFormat;
-    }
-
-    protected void setLocation(String location) {
-        this.location = location;
-    }
-
-    protected void setSerdeInfo(String serdeInfo) {
-        this.serdeInfo = serdeInfo;
-    }
-
-    public void setSize(long size) {
-        this.size = size;
-    }
-
-    /**
-     * Gets the database name.
-     *
-     * @return the database name
-     */
-    public String getDatabaseName() {
-        return this.databaseName;
-    }
-
-    /**
-     * Gets the table name.
-     *
-     * @return the table name
-     */
-    public String getTableName() {
-        return this.tableName;
-    }
-
-    /**
-     * Gets the input format.
-     *
-     * @return the input format
-     */
-    public String getInputFormat() {
-        return this.inputFormat;
-    }
-
-    /**
-     * Gets the output format.
-     *
-     * @return the output format
-     */
-    public String getOutputFormat() {
-        return this.outputFormat;
-    }
-
-    /**
-     * Gets the location.
-     *
-     * @return the location
-     */
-    public String getLocation() {
-        return this.location;
-    }
-
-    /**
-     * Gets the serde.
-     *
-     * @return the serde
-     */
-    public String getSerDe() {
-        return this.serdeInfo;
-    }
-
-    /**
-     * Gets the last access time.
-     *
-     * @return the last access time
-     */
-    public long getLastAccessTime() {
-        return this.lastAccessTime;
-    }
-
-    /**
-     * Gets the create time.
-     *
-     * @return the create time
-     */
-    public long getCreateTime() {
-        return this.createTime;
-    }
-
-    /**
-     * Gets the values.
-     *
-     * @return the values
-     */
-    public List<String> getValues() {
-        return this.values;
-    }
-
-    /**
-     * Gets the size.
-     *
-     * @return the size
-     */
-    public long getSize() {
-        return size;
-    }
-
-    @Override
-    public String toString() {
-        return "CatalogPartition ["
-            + (tableName != null ? "tableName=" + tableName + ", " : "tableName=null, ")
-            + (databaseName != null ? "dbName=" + databaseName + ", " : "dbName=null, ")
-            + (values != null ? "values=" + values + ", " : "values=null, ")
-            + "size=" + size + ", " + "createTime=" + createTime + ", lastAccessTime="
-            + lastAccessTime + "]";
-    }
-
-}
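
For reference, a minimal usage sketch of the POJO deleted above. It assumes same-package access, since the constructor and the setters are protected; all of the concrete values below are hypothetical.

package org.apache.falcon.catalog;

import java.util.Arrays;

// Hypothetical demo class, not part of the Falcon tree; it lives in the
// catalog package so the protected constructor and setters are accessible.
public final class CatalogPartitionDemo {
    public static void main(String[] args) {
        CatalogPartition partition = new CatalogPartition();
        partition.setDatabaseName("falcon_db");          // assumed database
        partition.setTableName("clicks");                // assumed table
        partition.setValues(Arrays.asList("2016-03-01", "US"));
        partition.setLocation("hdfs://nn:8020/data/clicks/2016-03-01/US");
        partition.setCreateTime(System.currentTimeMillis());
        partition.setSize(1024L);

        // toString() reports table, database, values, size and timestamps.
        System.out.println(partition);
    }
}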

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java b/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
deleted file mode 100644
index cccb4f8..0000000
--- a/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.catalog;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.CatalogTable;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.workflow.WorkflowExecutionContext;
-import org.apache.falcon.workflow.WorkflowExecutionListener;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.Date;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Collection;
-import java.util.Arrays;
-import java.util.TimeZone;
-import java.util.Properties;
-
-/**
- * Listens to workflow execution completion events.
- * It syncs HCat partitions based on the feeds created/evicted/replicated.
- */
-public class CatalogPartitionHandler implements WorkflowExecutionListener {
-    private static final Logger LOG = LoggerFactory.getLogger(CatalogPartitionHandler.class);
-
-    public static final ConfigurationStore STORE = ConfigurationStore.get();
-    public static final String CATALOG_TABLE = "catalog.table";
-    private ExpressionHelper evaluator = ExpressionHelper.get();
-    private static CatalogPartitionHandler catalogInstance = new CatalogPartitionHandler();
-    private static final boolean IS_CATALOG_ENABLED = CatalogServiceFactory.isEnabled();
-    public static final TimeZone UTC = TimeZone.getTimeZone("UTC");
-
-    private static final PathFilter PATH_FILTER = new PathFilter() {
-        @Override public boolean accept(Path path) {
-            try {
-                FileSystem fs = path.getFileSystem(new Configuration());
-                return !path.getName().startsWith("_") && !path.getName().startsWith(".") && !fs.isFile(path);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        }
-    };
-
-    public static CatalogPartitionHandler get() {
-        return catalogInstance;
-    }
-
-    @Override
-    public void onSuccess(WorkflowExecutionContext context) throws FalconException {
-        if (!IS_CATALOG_ENABLED) {
-            //Skip if catalog service is not enabled
-            return;
-        }
-
-        String[] feedNames = context.getOutputFeedNamesList();
-        String[] feedPaths = context.getOutputFeedInstancePathsList();
-        Cluster cluster = STORE.get(EntityType.CLUSTER, context.getClusterName());
-        Configuration clusterConf = ClusterHelper.getConfiguration(cluster);
-
-        if (StringUtils.isEmpty(ClusterHelper.getRegistryEndPoint(cluster))) {
-            //Skip if registry endpoint is not defined for the cluster
-            LOG.info("Catalog endpoint not defined for cluster {}. Skipping partition registration", cluster.getName());
-            return;
-        }
-
-        for (int index = 0; index < feedNames.length; index++) {
-            LOG.info("Partition handling for feed {} for path {}", feedNames[index], feedPaths[index]);
-            Feed feed = STORE.get(EntityType.FEED, feedNames[index]);
-
-            Storage storage = FeedHelper.createStorage(cluster, feed);
-            if (storage.getType() == Storage.TYPE.TABLE) {
-                //Do nothing if the feed is already table based
-                LOG.info("Feed {} is already table based. Skipping partition registration", feed.getName());
-                continue;
-            }
-
-            CatalogStorage catalogStorage = getCatalogStorageFromFeedProperties(feed, cluster, clusterConf);
-            if (catalogStorage == null) {
-                //There is no catalog defined in the feed properties. So, skip partition registration
-                LOG.info("Feed {} doesn't have a table defined in its properties, or the table doesn't exist. "
-                        + "Skipping partition registration", feed.getName());
-                continue;
-            }
-
-            //Generate static partition values - get the date from feed path and evaluate partitions in catalog spec
-            Path feedPath = new Path(new Path(feedPaths[index]).toUri().getPath());
-
-            String templatePath = new Path(storage.getUriTemplate(LocationType.DATA)).toUri().getPath();
-            LOG.debug("Template {} catalogInstance path {}", templatePath, feedPath);
-            Date date = FeedHelper.getDate(templatePath, feedPath, UTC);
-            if (date == null) {
-                LOG.info("Feed {} catalogInstance path {} doesn't match the template {}. "
-                                + "Skipping partition registration",
-                        feed.getName(), feedPath, templatePath);
-                continue;
-            }
-
-            LOG.debug("Reference date from path {} is {}", feedPath, SchemaHelper.formatDateUTC(date));
-            ExpressionHelper.setReferenceDate(date);
-            List<String> partitionValues = new ArrayList<String>();
-            for (Map.Entry<String, String> entry : catalogStorage.getPartitions().entrySet()) {
-                LOG.debug("Evaluating partition {}", entry.getValue());
-                partitionValues.add(evaluator.evaluateFullExpression(entry.getValue(), String.class));
-            }
-
-            LOG.debug("Static partition - {}", partitionValues);
-            WorkflowExecutionContext.EntityOperations operation = context.getOperation();
-            switch (operation) {
-            case DELETE:
-                dropPartitions(clusterConf, catalogStorage, partitionValues);
-                break;
-
-            case GENERATE:
-            case REPLICATE:
-                registerPartitions(clusterConf, catalogStorage, feedPath, partitionValues);
-                break;
-
-            default:
-                throw new FalconException("Unhandled operation " + operation);
-            }
-        }
-    }
-
-    //Register partitions by comparing the expected partitions against the existing ones:
-    //1. in both exist and expected --> partition already exists, so update it
-    //2. in exist but not expected --> partition is no longer required, so drop it
-    //3. in expected but not exist --> partition doesn't exist yet, so add it
-    private void registerPartitions(Configuration conf, CatalogStorage storage, Path staticPath,
-                                    List<String> staticPartition) throws FalconException {
-        try {
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
-            if (!fs.exists(staticPath)) {
-                //Do nothing if the output path doesn't exist
-                return;
-            }
-
-            List<String> partitionColumns = getPartitionColumns(conf, storage);
-            int dynamicPartCols = partitionColumns.size() - staticPartition.size();
-            Path searchPath = staticPath;
-            if (dynamicPartCols > 0) {
-                searchPath = new Path(staticPath, StringUtils.repeat("*", "/", dynamicPartCols));
-            }
-
-            //Figure out the dynamic partitions from the directories on hdfs
-            FileStatus[] files = fs.globStatus(searchPath, PATH_FILTER);
-            Map<List<String>, String> partitions = new HashMap<List<String>, String>();
-            for (FileStatus file : files) {
-                List<String> dynamicParts = getDynamicPartitions(file.getPath(), staticPath);
-                List<String> partitionValues = new ArrayList<String>(staticPartition);
-                partitionValues.addAll(dynamicParts);
-                LOG.debug("Final partition - " + partitionValues);
-                partitions.put(partitionValues, file.getPath().toString());
-            }
-
-            List<List<String>> existPartitions = listPartitions(conf, storage, staticPartition);
-            Collection<List<String>> targetPartitions = partitions.keySet();
-
-            Collection<List<String>> partitionsForDrop = CollectionUtils.subtract(existPartitions, targetPartitions);
-            Collection<List<String>> partitionsForAdd = CollectionUtils.subtract(targetPartitions, existPartitions);
-            Collection<List<String>> partitionsForUpdate =
-                    CollectionUtils.intersection(existPartitions, targetPartitions);
-
-            for (List<String> partition : partitionsForDrop) {
-                dropPartitions(conf, storage, partition);
-            }
-
-            for (List<String> partition : partitionsForAdd) {
-                addPartition(conf, storage, partition, partitions.get(partition));
-            }
-
-            for (List<String> partition : partitionsForUpdate) {
-                updatePartition(conf, storage, partition, partitions.get(partition));
-            }
-        } catch(IOException e) {
-            throw new FalconException(e);
-        }
-    }
-
-    private void updatePartition(Configuration conf, CatalogStorage storage, List<String> partition, String location)
-        throws FalconException {
-        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
-        catalogService.updatePartition(conf, storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(),
-                partition, location);
-    }
-
-    private void addPartition(Configuration conf, CatalogStorage storage, List<String> partition, String location)
-        throws FalconException {
-        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
-        catalogService.addPartition(conf, storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(), partition,
-                location);
-    }
-
-    private List<List<String>> listPartitions(Configuration conf, CatalogStorage storage, List<String> staticPartitions)
-        throws FalconException {
-        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
-        List<CatalogPartition> partitions = catalogService.listPartitions(conf, storage.getCatalogUrl(),
-                storage.getDatabase(), storage.getTable(), staticPartitions);
-        List<List<String>> existPartitions = new ArrayList<List<String>>();
-        for (CatalogPartition partition : partitions) {
-            existPartitions.add(partition.getValues());
-        }
-        return existPartitions;
-    }
-
-    //Returns the dynamic partitions of the data path
-    protected List<String> getDynamicPartitions(Path path, Path staticPath) {
-        String dynPart = path.toUri().getPath().substring(staticPath.toString().length());
-        dynPart = StringUtils.removeStart(dynPart, "/");
-        dynPart = StringUtils.removeEnd(dynPart, "/");
-        if (StringUtils.isEmpty(dynPart)) {
-            return new ArrayList<String>();
-        }
-        return Arrays.asList(dynPart.split("/"));
-    }
-
-    private List<String> getPartitionColumns(Configuration conf, CatalogStorage storage) throws FalconException {
-        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
-        return catalogService.getPartitionColumns(conf, storage.getCatalogUrl(), storage.getDatabase(),
-                storage.getTable());
-    }
-
-    private void dropPartitions(Configuration conf, CatalogStorage storage, List<String> values)
-        throws FalconException {
-        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
-        catalogService.dropPartitions(conf, storage.getCatalogUrl(), storage.getDatabase(),
-                storage.getTable(), values, false);
-    }
-
-    //Get the catalog template from feed properties as feed is filesystem based
-    protected CatalogStorage getCatalogStorageFromFeedProperties(Feed feed, Cluster cluster, Configuration conf)
-        throws FalconException {
-        Properties properties = FeedHelper.getFeedProperties(feed);
-        String tableUri = properties.getProperty(CATALOG_TABLE);
-        if (tableUri == null) {
-            return null;
-        }
-
-        CatalogTable table = new CatalogTable();
-        table.setUri(tableUri.replace("{", "${"));
-        CatalogStorage storage = null;
-        try {
-            storage = new CatalogStorage(cluster, table);
-        } catch (URISyntaxException e) {
-            throw new FalconException(e);
-        }
-
-        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
-        if (!catalogService.tableExists(conf, storage.getCatalogUrl(), storage.getDatabase(), storage.getTable())) {
-            return null;
-        }
-        return storage;
-    }
-
-    @Override
-    public void onFailure(WorkflowExecutionContext context) throws FalconException {
-        //no-op
-    }
-
-    @Override
-    public void onStart(WorkflowExecutionContext context) throws FalconException {
-        // Do nothing
-    }
-
-    @Override
-    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
-        // Do nothing
-    }
-
-    @Override
-    public void onWait(WorkflowExecutionContext context) throws FalconException {
-        // Do nothing
-    }
-}
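
The heart of registerPartitions() above is plain set algebra over partition-value lists: drop what exists but is no longer expected, add what is expected but missing, and update the intersection. A self-contained sketch of that reconciliation, using hypothetical partition values and the same commons-collections calls:

import org.apache.commons.collections.CollectionUtils;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical demo of the reconciliation performed by registerPartitions().
public final class PartitionReconciliationDemo {
    public static void main(String[] args) {
        // Partition values currently registered in the metastore.
        Set<List<String>> existing = new HashSet<List<String>>(Arrays.asList(
                Arrays.asList("2016-03-01", "US"),
                Arrays.asList("2016-03-01", "UK")));

        // Partition values implied by the directories found on HDFS.
        Set<List<String>> expected = new HashSet<List<String>>(Arrays.asList(
                Arrays.asList("2016-03-01", "US"),
                Arrays.asList("2016-03-01", "IN")));

        // existing - expected: data is gone, so drop these partitions.
        Collection<?> toDrop = CollectionUtils.subtract(existing, expected);
        // expected - existing: new data directories, so add these partitions.
        Collection<?> toAdd = CollectionUtils.subtract(expected, existing);
        // intersection: present on both sides, so refresh the location.
        Collection<?> toUpdate = CollectionUtils.intersection(existing, expected);

        System.out.println("drop=" + toDrop);     // [[2016-03-01, UK]]
        System.out.println("add=" + toAdd);       // [[2016-03-01, IN]]
        System.out.println("update=" + toUpdate); // [[2016-03-01, US]]
    }
}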

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java b/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java
deleted file mode 100644
index 77e6851..0000000
--- a/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.catalog;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.util.ReflectionUtils;
-import org.apache.falcon.util.StartupProperties;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Factory for providing the appropriate catalog service
- * implementation to the Falcon service.
- */
-@SuppressWarnings("unchecked")
-public final class CatalogServiceFactory {
-    private static final Logger LOG = LoggerFactory.getLogger(CatalogServiceFactory.class);
-
-    public static final String CATALOG_SERVICE = "catalog.service.impl";
-
-    private CatalogServiceFactory() {
-    }
-
-    public static boolean isEnabled() {
-        boolean isEnabled = StartupProperties.get().containsKey(CATALOG_SERVICE);
-        if (!isEnabled) {
-            LOG.info("Catalog service disabled. Partitions will not be registered");
-        }
-        return isEnabled;
-    }
-
-    public static AbstractCatalogService getCatalogService() throws FalconException {
-        if (!isEnabled()) {
-            throw new FalconException(
-                "Catalog integration is not enabled in Falcon. Startup property is missing: " + CATALOG_SERVICE);
-        }
-
-        return ReflectionUtils.getInstance(CATALOG_SERVICE);
-    }
-}
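
A hedged sketch of how the factory above is typically driven: catalog support switches on when startup.properties defines catalog.service.impl, and getCatalogService() then instantiates that class reflectively. The property value in the comment is an assumption.

package org.apache.falcon.catalog;

import org.apache.falcon.FalconException;

// Hypothetical caller; assumes startup.properties carries something like:
//   catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService
public final class CatalogLookupDemo {
    public static void main(String[] args) throws FalconException {
        if (CatalogServiceFactory.isEnabled()) {
            AbstractCatalogService service = CatalogServiceFactory.getCatalogService();
            System.out.println("Catalog service in use: " + service.getClass().getName());
        } else {
            System.out.println("No catalog service configured; skipping partition registration.");
        }
    }
}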

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
deleted file mode 100644
index 872f91f..0000000
--- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
+++ /dev/null
@@ -1,425 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.catalog;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.security.SecurityUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.io.Text;
-import org.apache.hive.hcatalog.api.HCatClient;
-import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * An implementation of CatalogService that uses Hive Meta Store (HCatalog)
- * as the backing Catalog registry.
- */
-public class HiveCatalogService extends AbstractCatalogService {
-
-    private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogService.class);
-    public static final String CREATE_TIME = "falcon.create_time";
-    public static final String UPDATE_TIME = "falcon.update_time";
-    public static final String PARTITION_DOES_NOT_EXIST = "Partition does not exist";
-
-    public static HiveConf createHiveConf(Configuration conf,
-                                           String metastoreUrl) throws IOException {
-        HiveConf hcatConf = new HiveConf(conf, HiveConf.class);
-
-        hcatConf.set("hive.metastore.local", "false");
-        hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUrl);
-        hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
-        hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
-                HCatSemanticAnalyzer.class.getName());
-        hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
-
-        hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
-        hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
-        return hcatConf;
-    }
-
-    /**
-     * This is used from within an Oozie job.
-     *
-     * @param conf conf object
-     * @param metastoreUrl metastore uri
-     * @return hive metastore client handle
-     * @throws FalconException
-     */
-    private static HiveMetaStoreClient createClient(Configuration conf,
-                                                    String metastoreUrl) throws FalconException {
-        try {
-            LOG.info("Creating HCatalog client object for metastore {} using conf {}",
-                metastoreUrl, conf.toString());
-            final Credentials credentials = getCredentials(conf);
-            Configuration jobConf = credentials != null ? copyCredentialsToConf(conf, credentials) : conf;
-            HiveConf hcatConf = createHiveConf(jobConf, metastoreUrl);
-
-            if (UserGroupInformation.isSecurityEnabled()) {
-                hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
-                    conf.get(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname));
-                hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
-
-                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-                ugi.addCredentials(credentials); // credentials cannot be null
-            }
-
-            return new HiveMetaStoreClient(hcatConf);
-        } catch (Exception e) {
-            throw new FalconException("Exception creating HiveMetaStoreClient: " + e.getMessage(), e);
-        }
-    }
-
-    private static JobConf copyCredentialsToConf(Configuration conf, Credentials credentials) {
-        JobConf jobConf = new JobConf(conf);
-        jobConf.setCredentials(credentials);
-        return jobConf;
-    }
-
-    private static Credentials getCredentials(Configuration conf) throws IOException {
-        final String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
-        if (tokenFile == null) {
-            return null;
-        }
-
-        try {
-            LOG.info("Adding credentials/delegation tokens from token file={} to conf", tokenFile);
-            Credentials credentials = Credentials.readTokenStorageFile(new File(tokenFile), conf);
-            LOG.info("credentials numberOfTokens={}, numberOfSecretKeys={}",
-                credentials.numberOfTokens(), credentials.numberOfSecretKeys());
-            return credentials;
-        } catch (IOException e) {
-            LOG.warn("error while fetching credentials from {}", tokenFile);
-        }
-
-        return null;
-    }
-
-    /**
-     * This is used from within the Falcon namespace.
-     *
-     * @param conf                      conf
-     * @param catalogUrl                metastore uri
-     * @return hive metastore client handle
-     * @throws FalconException
-     */
-    private static HiveMetaStoreClient createProxiedClient(Configuration conf,
-                                                           String catalogUrl) throws FalconException {
-
-        try {
-            final HiveConf hcatConf = createHiveConf(conf, catalogUrl);
-            UserGroupInformation proxyUGI = CurrentUser.getProxyUGI();
-            addSecureCredentialsAndToken(conf, hcatConf, proxyUGI);
-
-            LOG.info("Creating HCatalog client object for {}", catalogUrl);
-            return proxyUGI.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() {
-                public HiveMetaStoreClient run() throws Exception {
-                    return new HiveMetaStoreClient(hcatConf);
-                }
-            });
-        } catch (Exception e) {
-            throw new FalconException("Exception creating Proxied HiveMetaStoreClient: " + e.getMessage(), e);
-        }
-    }
-
-    private static void addSecureCredentialsAndToken(Configuration conf,
-                                                     HiveConf hcatConf,
-                                                     UserGroupInformation proxyUGI) throws IOException {
-        if (UserGroupInformation.isSecurityEnabled()) {
-            String metaStoreServicePrincipal = conf.get(SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL);
-            hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
-                metaStoreServicePrincipal);
-            hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
-
-            Token<DelegationTokenIdentifier> delegationTokenId = getDelegationToken(
-                hcatConf, metaStoreServicePrincipal);
-            proxyUGI.addToken(delegationTokenId);
-        }
-    }
-
-    private static Token<DelegationTokenIdentifier> getDelegationToken(HiveConf hcatConf,
-                                                                       String metaStoreServicePrincipal)
-        throws IOException {
-
-        LOG.debug("Creating delegation tokens for principal={}", metaStoreServicePrincipal);
-        HCatClient hcatClient = HCatClient.create(hcatConf);
-        String delegationToken = hcatClient.getDelegationToken(
-                CurrentUser.getUser(), metaStoreServicePrincipal);
-        hcatConf.set("hive.metastore.token.signature", "FalconService");
-
-        Token<DelegationTokenIdentifier> delegationTokenId = new Token<DelegationTokenIdentifier>();
-        delegationTokenId.decodeFromUrlString(delegationToken);
-        delegationTokenId.setService(new Text("FalconService"));
-        LOG.info("Created delegation token={}", delegationToken);
-        return delegationTokenId;
-    }
-
-    @Override
-    public boolean isAlive(Configuration conf, final String catalogUrl) throws FalconException {
-        LOG.info("Checking if the service is alive for: {}", catalogUrl);
-
-        try {
-            HiveMetaStoreClient client = createProxiedClient(conf, catalogUrl);
-            Database database = client.getDatabase("default");
-            return database != null;
-        } catch (Exception e) {
-            throw new FalconException("Exception checking if the service is alive:" + e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public boolean tableExists(Configuration conf, final String catalogUrl, final String database,
-                               final String tableName) throws FalconException {
-        LOG.info("Checking if the table exists: {}", tableName);
-
-        try {
-            HiveMetaStoreClient client = createProxiedClient(conf, catalogUrl);
-            Table table = client.getTable(database, tableName);
-            return table != null;
-        } catch (NoSuchObjectException e) {
-            return false;
-        } catch (Exception e) {
-            throw new FalconException("Exception checking if the table exists:" + e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public boolean isTableExternal(Configuration conf, String catalogUrl, String database,
-                                   String tableName) throws FalconException {
-        LOG.info("Checking if the table is external: {}", tableName);
-
-        try {
-            HiveMetaStoreClient client = createClient(conf, catalogUrl);
-            Table table = client.getTable(database, tableName);
-            return table.getTableType().equals(TableType.EXTERNAL_TABLE.name());
-        } catch (Exception e) {
-            throw new FalconException("Exception checking if the table is external:" + e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public List<CatalogPartition> listPartitions(Configuration conf, String catalogUrl,
-                                                         String database, String tableName,
-                                                         List<String> values) throws FalconException {
-        LOG.info("List partitions for: {}, partition filter: {}", tableName, values);
-
-        try {
-            List<CatalogPartition> catalogPartitionList = new ArrayList<CatalogPartition>();
-
-            HiveMetaStoreClient client = createClient(conf, catalogUrl);
-            List<Partition> hCatPartitions = client.listPartitions(database, tableName, values, (short) -1);
-            for (Partition hCatPartition : hCatPartitions) {
-                LOG.debug("Partition: " + hCatPartition.getValues());
-                CatalogPartition partition = createCatalogPartition(hCatPartition);
-                catalogPartitionList.add(partition);
-            }
-
-            return catalogPartitionList;
-        } catch (Exception e) {
-            throw new FalconException("Exception listing partitions:" + e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public List<CatalogPartition> listPartitionsByFilter(Configuration conf, String catalogUrl,
-                                                         String database, String tableName,
-                                                         String filter) throws FalconException {
-        LOG.info("List partitions for: {}, partition filter: {}", tableName, filter);
-
-        try {
-            List<CatalogPartition> catalogPartitionList = new ArrayList<CatalogPartition>();
-
-            HiveMetaStoreClient client = createClient(conf, catalogUrl);
-            List<Partition> hCatPartitions = client.listPartitionsByFilter(database, tableName, filter, (short) -1);
-            for (Partition hCatPartition : hCatPartitions) {
-                LOG.info("Partition: " + hCatPartition.getValues());
-                CatalogPartition partition = createCatalogPartition(hCatPartition);
-                catalogPartitionList.add(partition);
-            }
-
-            return catalogPartitionList;
-        } catch (Exception e) {
-            throw new FalconException("Exception listing partitions:" + e.getMessage(), e);
-        }
-    }
-
-    private CatalogPartition createCatalogPartition(Partition hCatPartition) {
-        final CatalogPartition catalogPartition = new CatalogPartition();
-        catalogPartition.setDatabaseName(hCatPartition.getDbName());
-        catalogPartition.setTableName(hCatPartition.getTableName());
-        catalogPartition.setValues(hCatPartition.getValues());
-        catalogPartition.setInputFormat(hCatPartition.getSd().getInputFormat());
-        catalogPartition.setOutputFormat(hCatPartition.getSd().getOutputFormat());
-        catalogPartition.setLocation(hCatPartition.getSd().getLocation());
-        catalogPartition.setSerdeInfo(hCatPartition.getSd().getSerdeInfo().getSerializationLib());
-        catalogPartition.setCreateTime(hCatPartition.getCreateTime());
-        catalogPartition.setLastAccessTime(hCatPartition.getLastAccessTime());
-        Map<String, String> params = hCatPartition.getParameters();
-        if (params != null) {
-            String size = hCatPartition.getParameters().get("totalSize");
-            if (StringUtils.isNotBlank(size)) {
-                catalogPartition.setSize(Long.parseLong(size));
-            }
-        }
-        return catalogPartition;
-    }
-
-    //Drop single partition
-    @Override
-    public boolean dropPartition(Configuration conf, String catalogUrl,
-                                  String database, String tableName,
-                                  List<String> partitionValues, boolean deleteData) throws FalconException {
-        LOG.info("Dropping partition for: {}, partition: {}", tableName, partitionValues);
-
-        try {
-            HiveMetaStoreClient client = createClient(conf, catalogUrl);
-            return client.dropPartition(database, tableName, partitionValues, deleteData);
-        } catch (Exception e) {
-            throw new FalconException("Exception dropping partitions:" + e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public void dropPartitions(Configuration conf, String catalogUrl,
-                               String database, String tableName,
-                               List<String> partitionValues, boolean deleteData) throws FalconException {
-        LOG.info("Dropping partitions for: {}, partitions: {}", tableName, partitionValues);
-
-        try {
-            HiveMetaStoreClient client = createClient(conf, catalogUrl);
-            List<Partition> partitions = client.listPartitions(database, tableName, partitionValues, (short) -1);
-            for (Partition part : partitions) {
-                LOG.info("Dropping partition for: {}, partition: {}", tableName, part.getValues());
-                client.dropPartition(database, tableName, part.getValues(), deleteData);
-            }
-        } catch (Exception e) {
-            throw new FalconException("Exception dropping partitions:" + e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public CatalogPartition getPartition(Configuration conf, String catalogUrl,
-                                         String database, String tableName,
-                                         List<String> partitionValues) throws FalconException {
-        LOG.info("Fetch partition for: {}, partition spec: {}", tableName, partitionValues);
-
-        try {
-            HiveMetaStoreClient client = createClient(conf, catalogUrl);
-            Partition hCatPartition = client.getPartition(database, tableName, partitionValues);
-            return createCatalogPartition(hCatPartition);
-        } catch (NoSuchObjectException nsoe) {
-            throw new FalconException(PARTITION_DOES_NOT_EXIST + ":" + nsoe.getMessage(), nsoe);
-        } catch (Exception e) {
-            throw new FalconException("Exception fetching partition:" + e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public List<String> getPartitionColumns(Configuration conf, String catalogUrl, String database,
-                                            String tableName) throws FalconException {
-        LOG.info("Fetching partition columns of table: " + tableName);
-
-        try {
-            HiveMetaStoreClient client = createClient(conf, catalogUrl);
-            Table table = client.getTable(database, tableName);
-            List<String> partCols = new ArrayList<String>();
-            for (FieldSchema part : table.getPartitionKeys()) {
-                partCols.add(part.getName());
-            }
-            return partCols;
-        } catch (Exception e) {
-            throw new FalconException("Exception fetching partition columns: " + e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public void addPartition(Configuration conf, String catalogUrl, String database,
-                             String tableName, List<String> partValues, String location) throws FalconException {
-        LOG.info("Adding partition {} for {}.{} with location {}", partValues, database, tableName, location);
-
-        try {
-            HiveMetaStoreClient client = createClient(conf, catalogUrl);
-            Table table = client.getTable(database, tableName);
-            org.apache.hadoop.hive.metastore.api.Partition part = new org.apache.hadoop.hive.metastore.api.Partition();
-            part.setDbName(database);
-            part.setTableName(tableName);
-            part.setValues(partValues);
-            part.setSd(table.getSd());
-            part.getSd().setLocation(location);
-            part.setParameters(table.getParameters());
-            if (part.getParameters() == null) {
-                part.setParameters(new HashMap<String, String>());
-            }
-            part.getParameters().put(CREATE_TIME, String.valueOf(System.currentTimeMillis()));
-            client.add_partition(part);
-
-        } catch (Exception e) {
-            throw new FalconException("Exception adding partition: " + e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public void updatePartition(Configuration conf, String catalogUrl, String database,
-                             String tableName, List<String> partValues, String location) throws FalconException {
-        LOG.info("Updating partition {} of {}.{} with location {}", partValues, database, tableName, location);
-
-        try {
-            HiveMetaStoreClient client = createClient(conf, catalogUrl);
-            Table table = client.getTable(database, tableName);
-            org.apache.hadoop.hive.metastore.api.Partition part = new org.apache.hadoop.hive.metastore.api.Partition();
-            part.setDbName(database);
-            part.setTableName(tableName);
-            part.setValues(partValues);
-            part.setSd(table.getSd());
-            part.getSd().setLocation(location);
-            part.setParameters(table.getParameters());
-            if (part.getParameters() == null) {
-                part.setParameters(new HashMap<String, String>());
-            }
-            part.getParameters().put(UPDATE_TIME, String.valueOf(System.currentTimeMillis()));
-            client.alter_partition(database, tableName, part);
-        } catch (Exception e) {
-            throw new FalconException("Exception updating partition: " + e.getMessage(), e);
-        }
-    }
-}
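
For context, a minimal sketch of the client-construction path used throughout the class above: build a HiveConf that points at a remote Thrift metastore, open a HiveMetaStoreClient, and list partitions much as listPartitions() does. An insecure cluster is assumed; the metastore URI, database and table names are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;

import java.util.List;

// Hypothetical demo; metastore URI, database and table are assumptions.
public final class MetastoreListDemo {
    public static void main(String[] args) throws Exception {
        HiveConf hiveConf = new HiveConf(new Configuration(), HiveConf.class);
        hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://metastore-host:9083");

        HiveMetaStoreClient client = new HiveMetaStoreClient(hiveConf);
        try {
            // (short) -1 means "no limit", matching the calls in HiveCatalogService.
            List<Partition> parts = client.listPartitions("falcon_db", "clicks", (short) -1);
            for (Partition part : parts) {
                System.out.println(part.getValues() + " -> " + part.getSd().getLocation());
            }
        } finally {
            client.close();
        }
    }
}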

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
deleted file mode 100644
index 85d7263..0000000
--- a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.cleanup;
-
-import org.apache.commons.el.ExpressionEvaluatorImpl;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.AccessControlList;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.util.DeploymentUtil;
-import org.apache.falcon.util.RuntimeProperties;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.jsp.el.ELException;
-import javax.servlet.jsp.el.ExpressionEvaluator;
-import java.io.IOException;
-
-/**
- * Falcon cleanup handler for cleaning up work, temp and log files
- * left behind by Falcon.
- */
-public abstract class AbstractCleanupHandler {
-
-    protected static final Logger LOG = LoggerFactory.getLogger(AbstractCleanupHandler.class);
-
-    protected static final ConfigurationStore STORE = ConfigurationStore.get();
-    public static final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl();
-    public static final ExpressionHelper RESOLVER = ExpressionHelper.get();
-
-    protected long getRetention(Entity entity, TimeUnit timeUnit)
-        throws FalconException {
-
-        String retention = getRetentionValue(timeUnit);
-        try {
-            return (Long) EVALUATOR.evaluate("${" + retention + "}",
-                    Long.class, RESOLVER, RESOLVER);
-        } catch (ELException e) {
-            throw new FalconException("Unable to evaluate retention limit: "
-                    + retention + " for entity: " + entity.getName(), e);
-        }
-    }
-
-    private String getRetentionValue(Frequency.TimeUnit timeunit) {
-        String defaultValue;
-        switch (timeunit) {
-        case minutes:
-            defaultValue = "hours(24)";
-            break;
-
-        case hours:
-            defaultValue = "days(3)";
-            break;
-
-        case days:
-            defaultValue = "days(12)";
-            break;
-
-        case months:
-            defaultValue = "months(3)";
-            break;
-
-        default:
-            defaultValue = "days(1)";
-        }
-        return RuntimeProperties.get().getProperty("log.cleanup.frequency." + timeunit + ".retention", defaultValue);
-    }
-
-    protected FileStatus[] getAllLogs(FileSystem fs, Cluster cluster,
-                                      Entity entity) throws FalconException {
-        FileStatus[] paths;
-        try {
-            Path logPath = getLogPath(cluster, entity);
-            paths = fs.globStatus(logPath);
-        } catch (IOException e) {
-            throw new FalconException(e);
-        }
-
-        return paths;
-    }
-
-    private Path getLogPath(Cluster cluster, Entity entity) {
-        // logsPath = base log path + relative path
-        return new Path(EntityUtil.getLogPath(cluster, entity), getRelativeLogPath());
-    }
-
-    private FileSystem getFileSystemAsEntityOwner(Cluster cluster,
-                                                  Entity entity) throws FalconException {
-        try {
-            final AccessControlList acl = entity.getACL();
-            // For backward compatibility, use the ACL owner only if present
-            if (acl != null) {
-                CurrentUser.authenticate(acl.getOwner()); // proxy user
-            }
-
-            return HadoopClientFactory.get().createProxiedFileSystem(
-                    ClusterHelper.getConfiguration(cluster));
-        } catch (Exception e) {
-            throw new FalconException(e);
-        }
-    }
-
-    protected void delete(String clusterName, Entity entity, long retention) throws FalconException {
-        Cluster currentCluster = STORE.get(EntityType.CLUSTER, clusterName);
-        if (!isClusterInCurrentColo(currentCluster.getColo())) {
-            LOG.info("Ignoring cleanup for {}: {} in cluster: {} as this does not belong to current colo",
-                    entity.getEntityType(), entity.getName(), clusterName);
-            return;
-        }
-
-        LOG.info("Cleaning up logs for {}: {} in cluster: {} with retention: {}",
-                entity.getEntityType(), entity.getName(), clusterName, retention);
-
-        FileSystem fs = getFileSystemAsEntityOwner(currentCluster, entity);
-        FileStatus[] logs = getAllLogs(fs, currentCluster, entity);
-        deleteInternal(fs, currentCluster, entity, retention, logs);
-    }
-
-    private void deleteInternal(FileSystem fs, Cluster cluster, Entity entity,
-                                long retention, FileStatus[] logs) throws FalconException {
-        if (logs == null || logs.length == 0) {
-            LOG.info("Nothing to delete for cluster: {}, entity: {}", cluster.getName(),
-                    entity.getName());
-            return;
-        }
-
-        long now = System.currentTimeMillis();
-
-        for (FileStatus log : logs) {
-            if (now - log.getModificationTime() > retention) {
-                try {
-                    boolean isDeleted = fs.delete(log.getPath(), true);
-                    if (isDeleted) {
-                        LOG.info("Deleted path: {}", log.getPath());
-                    } else {
-                        LOG.error("Unable to delete path: {}", log.getPath());
-                    }
-                    deleteParentIfEmpty(fs, log.getPath().getParent());
-                } catch (IOException e) {
-                    throw new FalconException("Unable to delete log file: "
-                            + log.getPath() + " for entity " + entity.getName()
-                            + " for cluster: " + cluster.getName(), e);
-                }
-            } else {
-                LOG.info("Skipping path: {} as its age {} is within the retention limit: {}",
-                        log.getPath(), (now - log.getModificationTime()), retention);
-            }
-        }
-    }
-
-    private void deleteParentIfEmpty(FileSystem fs, Path parent) throws IOException {
-        FileStatus[] files = fs.listStatus(parent);
-        if (files != null && files.length == 0) {
-            LOG.info("Parent path: {} is empty, deleting path", parent);
-            fs.delete(parent, true);
-            deleteParentIfEmpty(fs, parent.getParent());
-        }
-    }
-
-    public abstract void cleanup() throws FalconException;
-
-    protected abstract String getRelativeLogPath();
-
-    protected boolean isClusterInCurrentColo(String colo) {
-        final String currentColo = StartupProperties.get().getProperty("current.colo", "default");
-        return DeploymentUtil.isEmbeddedMode() || currentColo.equals(colo);
-    }
-}
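
The retention check in deleteInternal() above reduces to comparing a file's age against a retention window in milliseconds; the window itself comes from evaluating an EL expression such as days(3). A small sketch of that arithmetic, with a hypothetical three-day window:

import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the age-vs-retention comparison in deleteInternal().
public final class RetentionCheckDemo {
    public static void main(String[] args) {
        // "days(3)" evaluated through Falcon's EL resolver yields milliseconds.
        long retentionMillis = TimeUnit.DAYS.toMillis(3);

        long now = System.currentTimeMillis();
        long modificationTime = now - TimeUnit.DAYS.toMillis(5); // file is 5 days old

        boolean eligibleForDeletion = now - modificationTime > retentionMillis;
        System.out.println("delete? " + eligibleForDeletion); // true: age exceeds retention
    }
}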

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
deleted file mode 100644
index 16db7d8..0000000
--- a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.cleanup;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.Feed;
-
-import java.util.Collection;
-
-/**
- * Cleans up files relating to feed management workflows.
- */
-public class FeedCleanupHandler extends AbstractCleanupHandler {
-
-    @Override
-    public void cleanup() throws FalconException {
-        Collection<String> feeds = STORE.getEntities(EntityType.FEED);
-        for (String feedName : feeds) {
-            Feed feed = STORE.get(EntityType.FEED, feedName);
-            long retention = getRetention(feed, feed.getFrequency().getTimeUnit());
-
-            for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
-                delete(cluster.getName(), feed, retention);
-            }
-        }
-    }
-
-    @Override
-    protected String getRelativeLogPath() {
-        return "job-*/*/*";
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
deleted file mode 100644
index 00281f9..0000000
--- a/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.cleanup;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.process.Process;
-
-import java.util.Collection;
-
-/**
- * Handler to clean up process-related files left behind by Falcon.
- */
-public class ProcessCleanupHandler extends AbstractCleanupHandler {
-
-    @Override
-    public void cleanup() throws FalconException {
-        Collection<String> processes = STORE.getEntities(EntityType.PROCESS);
-        for (String processName : processes) {
-            Process process = STORE.get(EntityType.PROCESS, processName);
-            long retention = getRetention(process, process.getFrequency().getTimeUnit());
-
-            for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
-                delete(cluster.getName(), process, retention);
-            }
-        }
-    }
-
-    @Override
-    protected String getRelativeLogPath() {
-        return "job-*/*";
-    }
-}
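
A rough sketch of the deletion these handlers drive: AbstractCleanupHandler is not part of
this diff, so the log root, the FileSystem handle, and the retention comparison below are
assumptions for illustration, not Falcon's actual implementation. Only the relative glob
patterns ("job-*/*/*" for feeds, "job-*/*" for processes) come from the code above.

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative only: glob-based log cleanup under an assumed per-entity log root.
public final class LogCleanupSketch {

    private LogCleanupSketch() {
    }

    // Deletes everything matched by relativeLogPath (e.g. the "job-*" globs above)
    // under logRoot whose modification time is older than retentionMillis.
    public static void deleteExpired(FileSystem fs, Path logRoot, String relativeLogPath,
                                     long retentionMillis) throws IOException {
        long cutoff = System.currentTimeMillis() - retentionMillis;
        FileStatus[] matches = fs.globStatus(new Path(logRoot, relativeLogPath));
        if (matches == null) { // globStatus returns null when nothing matches
            return;
        }
        for (FileStatus status : matches) {
            if (status.getModificationTime() < cutoff) {
                fs.delete(status.getPath(), true); // recursive: removes the whole run directory
            }
        }
    }
}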

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
deleted file mode 100644
index c5860c9..0000000
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ /dev/null
@@ -1,592 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Pair;
-import org.apache.falcon.catalog.AbstractCatalogService;
-import org.apache.falcon.catalog.CatalogPartition;
-import org.apache.falcon.catalog.CatalogServiceFactory;
-import org.apache.falcon.catalog.HiveCatalogService;
-import org.apache.falcon.entity.common.FeedDataPath;
-import org.apache.falcon.entity.v0.AccessControlList;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.feed.CatalogTable;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.retention.EvictedInstanceSerDe;
-import org.apache.falcon.retention.EvictionHelper;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.jsp.el.ELException;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-
-/**
- * A catalog registry implementation of a feed storage.
- */
-public class CatalogStorage extends Configured implements Storage {
-
-    private static final Logger LOG = LoggerFactory.getLogger(CatalogStorage.class);
-
-    // constants to be used while preparing HCatalog partition filter query
-    private static final String FILTER_ST_BRACKET = "(";
-    private static final String FILTER_END_BRACKET = ")";
-    private static final String FILTER_QUOTE = "'";
-    private static final String FILTER_AND = " and ";
-    private static final String FILTER_OR = " or ";
-    private static final String FILTER_LESS_THAN = " < ";
-    private static final String FILTER_EQUALS = " = ";
-
-    private final StringBuffer instancePaths = new StringBuffer();
-    private final StringBuilder instanceDates = new StringBuilder();
-
-    public static final String PARTITION_SEPARATOR = ";";
-    public static final String PARTITION_KEYVAL_SEPARATOR = "=";
-    public static final String INPUT_PATH_SEPARATOR = ":";
-    public static final String OUTPUT_PATH_SEPARATOR = "/";
-    public static final String PARTITION_VALUE_QUOTE = "'";
-
-    public static final String CATALOG_URL = "${hcatNode}";
-
-    private final String catalogUrl;
-    private String database;
-    private String table;
-    private Map<String, String> partitions;
-
-    protected CatalogStorage(Feed feed) throws URISyntaxException {
-        this(CATALOG_URL, feed.getTable());
-    }
-
-    public CatalogStorage(Cluster cluster, CatalogTable table) throws URISyntaxException {
-        this(ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint(), table);
-    }
-
-    protected CatalogStorage(String catalogUrl, CatalogTable table) throws URISyntaxException {
-        this(catalogUrl, table.getUri());
-    }
-
-    protected CatalogStorage(String catalogUrl, String tableUri) throws URISyntaxException {
-        if (catalogUrl == null || catalogUrl.length() == 0) {
-            throw new IllegalArgumentException("Catalog Registry URL cannot be null or empty");
-        }
-
-        this.catalogUrl = catalogUrl;
-
-        parseFeedUri(tableUri);
-    }
-
-    /**
-     * Validate URI to conform to catalog:$database:$table#$partitions.
-     * scheme=catalog:database=$database:table=$table#$partitions
-     * partitions=key=value;key=value
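-     *
-     * Example (hypothetical names and partition spec):
-     *   catalog:falcon_db:summary_table#ds=${YEAR}-${MONTH}-${DAY};region=us
-     * parses to database="falcon_db", table="summary_table" and
-     * partitions={ds=${YEAR}-${MONTH}-${DAY}, region=us} (insertion order preserved).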
-     *
-     * @param catalogTableUri table URI to parse and validate
-     * @throws URISyntaxException
-     */
-    private void parseFeedUri(String catalogTableUri) throws URISyntaxException {
-
-        final String processed = catalogTableUri.replaceAll(DOLLAR_EXPR_START_REGEX, DOLLAR_EXPR_START_NORMALIZED)
-                                                .replaceAll("}", EXPR_CLOSE_NORMALIZED);
-        URI tableUri = new URI(processed);
-
-        if (!"catalog".equals(tableUri.getScheme())) {
-            throw new URISyntaxException(tableUri.toString(), "catalog scheme is missing");
-        }
-
-        final String schemeSpecificPart = tableUri.getSchemeSpecificPart();
-        if (schemeSpecificPart == null) {
-            throw new URISyntaxException(tableUri.toString(), "Database and Table are missing");
-        }
-
-        String[] paths = schemeSpecificPart.split(INPUT_PATH_SEPARATOR);
-
-        if (paths.length != 2) {
-            throw new URISyntaxException(tableUri.toString(), "URI path is not in expected format: database:table");
-        }
-
-        database = paths[0];
-        table = paths[1];
-
-        if (database == null || database.length() == 0) {
-            throw new URISyntaxException(tableUri.toString(), "DB name is missing");
-        }
-        if (table == null || table.length() == 0) {
-            throw new URISyntaxException(tableUri.toString(), "Table name is missing");
-        }
-
-        String partRaw = tableUri.getFragment();
-        if (partRaw == null || partRaw.length() == 0) {
-            throw new URISyntaxException(tableUri.toString(), "Partition details are missing");
-        }
-
-        final String rawPartition = partRaw.replaceAll(DOLLAR_EXPR_START_NORMALIZED, DOLLAR_EXPR_START_REGEX)
-                                           .replaceAll(EXPR_CLOSE_NORMALIZED, EXPR_CLOSE_REGEX);
-        partitions = new LinkedHashMap<String, String>(); // preserve insertion order
-        String[] parts = rawPartition.split(PARTITION_SEPARATOR);
-        for (String part : parts) {
-            if (part == null || part.length() == 0) {
-                continue;
-            }
-
-            String[] keyVal = part.split(PARTITION_KEYVAL_SEPARATOR);
-            if (keyVal.length != 2) {
-                throw new URISyntaxException(tableUri.toString(),
-                        "Partition key value pair is not specified properly in (" + part + ")");
-            }
-
-            partitions.put(keyVal[0], keyVal[1]);
-        }
-    }
-
-    /**
-     * Create an instance from the URI Template that was generated using
-     * the getUriTemplate() method.
-     *
-     * @param uriTemplate the uri template from org.apache.falcon.entity.CatalogStorage#getUriTemplate
-     * @throws URISyntaxException
-     */
-    protected CatalogStorage(String uriTemplate) throws URISyntaxException {
-        if (uriTemplate == null || uriTemplate.length() == 0) {
-            throw new IllegalArgumentException("URI template cannot be null or empty");
-        }
-
-        final String processed = uriTemplate.replaceAll(DOLLAR_EXPR_START_REGEX, DOLLAR_EXPR_START_NORMALIZED)
-                                            .replaceAll("}", EXPR_CLOSE_NORMALIZED);
-        URI uri = new URI(processed);
-
-        this.catalogUrl = uri.getScheme() + "://" + uri.getAuthority();
-
-        parseUriTemplate(uri);
-    }
-
-    protected CatalogStorage(String uriTemplate, Configuration conf) throws URISyntaxException {
-        this(uriTemplate);
-        setConf(conf);
-    }
-
-    private void parseUriTemplate(URI uriTemplate) throws URISyntaxException {
-        String path = uriTemplate.getPath();
-        String[] paths = path.split(OUTPUT_PATH_SEPARATOR);
-        if (paths.length != 4) {
-            throw new URISyntaxException(uriTemplate.toString(),
-                    "URI path is not in expected format: /database/table/partitions");
-        }
-
-        database = paths[1];
-        table = paths[2];
-        String partRaw = paths[3];
-
-        if (database == null || database.length() == 0) {
-            throw new URISyntaxException(uriTemplate.toString(), "DB name is missing");
-        }
-        if (table == null || table.length() == 0) {
-            throw new URISyntaxException(uriTemplate.toString(), "Table name is missing");
-        }
-        if (partRaw == null || partRaw.length() == 0) {
-            throw new URISyntaxException(uriTemplate.toString(), "Partition details are missing");
-        }
-
-        String rawPartition = partRaw.replaceAll(DOLLAR_EXPR_START_NORMALIZED, DOLLAR_EXPR_START_REGEX)
-                .replaceAll(EXPR_CLOSE_NORMALIZED, EXPR_CLOSE_REGEX);
-        partitions = new LinkedHashMap<String, String>();
-        String[] parts = rawPartition.split(PARTITION_SEPARATOR);
-        for (String part : parts) {
-            if (part == null || part.length() == 0) {
-                continue;
-            }
-
-            String[] keyVal = part.split(PARTITION_KEYVAL_SEPARATOR);
-            if (keyVal.length != 2) {
-                throw new URISyntaxException(uriTemplate.toString(),
-                        "Partition key value pair is not specified properly in (" + part + ")");
-            }
-
-            partitions.put(keyVal[0], keyVal[1]);
-        }
-    }
-
-    public String getCatalogUrl() {
-        return catalogUrl;
-    }
-
-    public String getDatabase() {
-        return database;
-    }
-
-    public String getTable() {
-        return table;
-    }
-
-    public Map<String, String> getPartitions() {
-        return partitions;
-    }
-
-    /**
-     * @param key partition key
-     * @return partition value
-     */
-    public String getPartitionValue(String key) {
-        return partitions.get(key);
-    }
-
-    /**
-     * @param key partition key
-     * @return if partitions map includes the key or not
-     */
-    public boolean hasPartition(String key) {
-        return partitions.containsKey(key);
-    }
-
-    public List<String> getDatedPartitionKeys() {
-        List<String> keys = new ArrayList<String>();
-
-        for (Map.Entry<String, String> entry : getPartitions().entrySet()) {
-
-            Matcher matcher = FeedDataPath.PATTERN.matcher(entry.getValue());
-            if (matcher.find()) {
-                keys.add(entry.getKey());
-            }
-        }
-
-        return keys;
-    }
-
-    /**
-     * Convert the partition map to filter string.
-     * Each key value pair is separated by ';'.
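-     * Example (hypothetical values): {ds=2014-02-24, region=us}
-     * yields "(ds='2014-02-24';region='us')".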
-     *
-     * @return filter string
-     */
-    public String toPartitionFilter() {
-        StringBuilder filter = new StringBuilder();
-        filter.append("(");
-        for (Map.Entry<String, String> entry : partitions.entrySet()) {
-            if (filter.length() > 1) {
-                filter.append(PARTITION_SEPARATOR);
-            }
-            filter.append(entry.getKey());
-            filter.append(PARTITION_KEYVAL_SEPARATOR);
-            filter.append(PARTITION_VALUE_QUOTE);
-            filter.append(entry.getValue());
-            filter.append(PARTITION_VALUE_QUOTE);
-        }
-        filter.append(")");
-        return filter.toString();
-    }
-
-    /**
-     * Convert the partition map to path string.
-     * Each key value pair is separated by '/'.
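-     * Example (hypothetical values): {ds=2014-02-24, region=us}
-     * yields "ds=2014-02-24/region=us".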
-     *
-     * @return path string
-     */
-    public String toPartitionAsPath() {
-        StringBuilder partitionFilter = new StringBuilder();
-
-        for (Map.Entry<String, String> entry : getPartitions().entrySet()) {
-            partitionFilter.append(entry.getKey())
-                    .append(PARTITION_KEYVAL_SEPARATOR)
-                    .append(entry.getValue())
-                    .append(OUTPUT_PATH_SEPARATOR);
-        }
-
-        partitionFilter.setLength(partitionFilter.length() - 1);
-        return partitionFilter.toString();
-    }
-
-    @Override
-    public TYPE getType() {
-        return TYPE.TABLE;
-    }
-
-    /**
-     * LocationType does NOT matter here.
-     */
-    @Override
-    public String getUriTemplate() {
-        return getUriTemplate(LocationType.DATA);
-    }
-
-    /**
-     * LocationType does NOT matter here.
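-     * Example (hypothetical names): for the URI parsed in parseFeedUri, this
-     * yields "${hcatNode}/falcon_db/summary_table/ds=${YEAR}-${MONTH}-${DAY};region=us".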
-     */
-    @Override
-    public String getUriTemplate(LocationType locationType) {
-        StringBuilder uriTemplate = new StringBuilder();
-        uriTemplate.append(catalogUrl);
-        uriTemplate.append(OUTPUT_PATH_SEPARATOR);
-        uriTemplate.append(database);
-        uriTemplate.append(OUTPUT_PATH_SEPARATOR);
-        uriTemplate.append(table);
-        uriTemplate.append(OUTPUT_PATH_SEPARATOR);
-        for (Map.Entry<String, String> entry : partitions.entrySet()) {
-            uriTemplate.append(entry.getKey());
-            uriTemplate.append(PARTITION_KEYVAL_SEPARATOR);
-            uriTemplate.append(entry.getValue());
-            uriTemplate.append(PARTITION_SEPARATOR);
-        }
-        uriTemplate.setLength(uriTemplate.length() - 1);
-
-        return uriTemplate.toString();
-    }
-
-    @Override
-    public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
-        if (!(toCompareAgainst instanceof CatalogStorage)) {
-            return false;
-        }
-
-        CatalogStorage catalogStorage = (CatalogStorage) toCompareAgainst;
-
-        return !(getCatalogUrl() != null && !getCatalogUrl().equals(catalogStorage.getCatalogUrl()))
-                && getDatabase().equals(catalogStorage.getDatabase())
-                && getTable().equals(catalogStorage.getTable())
-                && getPartitions().equals(catalogStorage.getPartitions());
-    }
-
-    @Override
-    public void validateACL(AccessControlList acl) throws FalconException {
-        // This is not supported in Hive today as authorization is not enforced on table and
-        // partition listing
-    }
-
-    @Override
-    public List<FeedInstanceStatus> getListing(Feed feed, String clusterName, LocationType locationType,
-                                               Date start, Date end) throws FalconException {
-        try {
-            List<FeedInstanceStatus> instances = new ArrayList<FeedInstanceStatus>();
-            Date feedStart = FeedHelper.getFeedValidityStart(feed, clusterName);
-            Date alignedDate = EntityUtil.getNextStartTime(feedStart, feed.getFrequency(),
-                    feed.getTimezone(), start);
-
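-            // step instance by instance from the aligned start while alignedDate <= end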
-            while (!end.before(alignedDate)) {
-                List<String> partitionValues = getCatalogPartitionValues(alignedDate);
-                try {
-                    CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
-                            getConf(), getCatalogUrl(), getDatabase(), getTable(), partitionValues);
-                    instances.add(getFeedInstanceFromCatalogPartition(partition));
-                } catch (FalconException e) {
-                    if (e.getMessage().startsWith(HiveCatalogService.PARTITION_DOES_NOT_EXIST)) {
-                        // Partition missing
-                        FeedInstanceStatus instanceStatus = new FeedInstanceStatus(null);
-                        instanceStatus.setInstance(partitionValues.toString());
-                        instances.add(instanceStatus);
-                    } else {
-                        throw e;
-                    }
-                }
-                alignedDate = FeedHelper.getNextFeedInstanceDate(alignedDate, feed);
-            }
-            return instances;
-        } catch (Exception e) {
-            LOG.error("Unable to retrieve listing for {}:{} -- {}", locationType, catalogUrl, e.getMessage());
-            throw new FalconException("Unable to retrieve listing for (URI " + catalogUrl + ")", e);
-        }
-    }
-
-    private List<String> getCatalogPartitionValues(Date alignedDate) throws FalconException {
-        List<String> partitionValues  = new ArrayList<String>();
-        for (Map.Entry<String, String> entry : getPartitions().entrySet()) {
-            if (FeedDataPath.PATTERN.matcher(entry.getValue()).find()) {
-                ExpressionHelper.setReferenceDate(alignedDate);
-                ExpressionHelper expressionHelper = ExpressionHelper.get();
-                String instanceValue = expressionHelper.evaluateFullExpression(entry.getValue(), String.class);
-                partitionValues.add(instanceValue);
-            } else {
-                partitionValues.add(entry.getValue());
-            }
-        }
-        return partitionValues;
-    }
-
-    private FeedInstanceStatus getFeedInstanceFromCatalogPartition(CatalogPartition partition) {
-        FeedInstanceStatus feedInstanceStatus = new FeedInstanceStatus(partition.getLocation());
-        feedInstanceStatus.setCreationTime(partition.getCreateTime());
-        feedInstanceStatus.setInstance(partition.getValues().toString());
-        FeedInstanceStatus.AvailabilityStatus availabilityStatus = FeedInstanceStatus.AvailabilityStatus.MISSING;
-        long size = partition.getSize();
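-        // CatalogPartition defaults size to -1 when unset, which leaves the status MISSING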
-        if (size == 0) {
-            availabilityStatus = FeedInstanceStatus.AvailabilityStatus.EMPTY;
-        } else if (size > 0) {
-            availabilityStatus = FeedInstanceStatus.AvailabilityStatus.AVAILABLE;
-        }
-        feedInstanceStatus.setSize(size);
-        feedInstanceStatus.setStatus(availabilityStatus);
-        return feedInstanceStatus;
-    }
-
-    @Override
-    public FeedInstanceStatus.AvailabilityStatus getInstanceAvailabilityStatus(Feed feed, String clusterName,
-                                         LocationType locationType, Date instanceTime) throws FalconException {
-        List<FeedInstanceStatus> result = getListing(feed, clusterName, locationType, instanceTime, instanceTime);
-        if (result.isEmpty()) {
-            return FeedInstanceStatus.AvailabilityStatus.MISSING;
-        } else {
-            return result.get(0).getStatus();
-        }
-    }
-
-    @Override
-    public StringBuilder evict(String retentionLimit, String timeZone, Path logFilePath) throws FalconException {
-        LOG.info("Applying retention on {}, Limit: {}, timezone: {}",
-                getTable(), retentionLimit, timeZone);
-
-        List<CatalogPartition> toBeDeleted;
-        try {
-            // get sorted date partition keys and values
-            toBeDeleted = discoverPartitionsToDelete(retentionLimit, timeZone);
-        } catch (ELException e) {
-            throw new FalconException("Couldn't find partitions to be deleted", e);
-
-        }
-
-        if (toBeDeleted.isEmpty()) {
-            LOG.info("No partitions to delete.");
-        } else {
-            final boolean isTableExternal = CatalogServiceFactory.getCatalogService().isTableExternal(
-                getConf(), getCatalogUrl(), getDatabase(), getTable());
-            try {
-                dropPartitions(toBeDeleted, isTableExternal);
-            } catch (IOException e) {
-                throw new FalconException("Couldn't drop partitions", e);
-            }
-        }
-
-        try {
-            EvictedInstanceSerDe.serializeEvictedInstancePaths(
-                    HadoopClientFactory.get().createProxiedFileSystem(logFilePath.toUri(), new Configuration()),
-                    logFilePath, instancePaths);
-        } catch (IOException e) {
-            throw new FalconException("Couldn't record dropped partitions", e);
-        }
-        return instanceDates;
-    }
-
-    private List<CatalogPartition> discoverPartitionsToDelete(String retentionLimit, String timezone)
-        throws FalconException, ELException {
-        Pair<Date, Date> range = EvictionHelper.getDateRange(retentionLimit);
-        ExpressionHelper.setReferenceDate(range.first);
-        Map<String, String> partitionsToDelete = new LinkedHashMap<String, String>();
-        ExpressionHelper expressionHelper = ExpressionHelper.get();
-        for (Map.Entry<String, String> entry : getPartitions().entrySet()) {
-            if (FeedDataPath.PATTERN.matcher(entry.getValue()).find()) {
-                partitionsToDelete.put(entry.getKey(),
-                        expressionHelper.evaluateFullExpression(entry.getValue(), String.class));
-            }
-        }
-        final String filter = createFilter(partitionsToDelete);
-        return CatalogServiceFactory.getCatalogService().listPartitionsByFilter(
-            getConf(), getCatalogUrl(), getDatabase(), getTable(), filter);
-    }
-
-    /**
-     * Creates a Hive partition filter from the input partition map.
-     * @param partitionsMap - ordered map of partition keys and values
-     * @return partition filter
-     * @throws ELException
-     */
-    private String createFilter(Map<String, String> partitionsMap) throws ELException {
-
-        /* Construct filter query string. As an example, suppose the dated partition keys
-         * are: [year, month, day, hour] and dated partition values are [2014, 02, 24, 10].
-         * Then the filter query generated is of the format:
-         * "(year < '2014') or (year = '2014' and month < '02') or
-         * (year = '2014' and month = '02' and day < '24') or
-         * (year = '2014' and month = '02' and day = '24' and hour < '10')"
-         */
-        StringBuilder filterBuffer = new StringBuilder();
-        List<String> keys = new ArrayList<String>(partitionsMap.keySet());
-        for (int curr = 0; curr < partitionsMap.size(); curr++) {
-            if (curr > 0) {
-                filterBuffer.append(FILTER_OR);
-            }
-            filterBuffer.append(FILTER_ST_BRACKET);
-            for (int prev = 0; prev < curr; prev++) {
-                String key = keys.get(prev);
-                filterBuffer.append(key)
-                        .append(FILTER_EQUALS)
-                        .append(FILTER_QUOTE)
-                        .append(partitionsMap.get(key))
-                        .append(FILTER_QUOTE)
-                        .append(FILTER_AND);
-            }
-            String key = keys.get(curr);
-            filterBuffer.append(key)
-                    .append(FILTER_LESS_THAN)
-                    .append(FILTER_QUOTE)
-                    .append(partitionsMap.get(key))
-                    .append(FILTER_QUOTE)
-                    .append(FILTER_END_BRACKET);
-        }
-
-        return filterBuffer.toString();
-    }
-
-    private void dropPartitions(List<CatalogPartition> partitionsToDelete, boolean isTableExternal)
-        throws FalconException, IOException {
-        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
-        for (CatalogPartition partition : partitionsToDelete) {
-            boolean deleted = catalogService.dropPartition(getConf(), getCatalogUrl(), getDatabase(), getTable(),
-                    partition.getValues(), true);
-
-            if (!deleted) {
-                return;
-            }
-
-            if (isTableExternal) { // nuke the dirs if an external table
-                final Path path = new Path(partition.getLocation());
-                if (!HadoopClientFactory.get().createProxiedFileSystem(path.toUri()).delete(path, true)) {
-                    throw new FalconException("Failed to delete location " + path + " for partition "
-                            + partition.getValues());
-                }
-            }
-
-            // replace ',' with ';' since message producer splits instancePaths string by ','
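-            // e.g. (hypothetical values) [2014, 02, 24] becomes "[2014; 02; 24]"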
-            String partitionInfo = partition.getValues().toString().replace(",", ";");
-            LOG.info("Deleted partition: " + partitionInfo);
-            instanceDates.append(partitionInfo).append(',');
-            instancePaths.append(partition.getLocation()).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "CatalogStorage{"
-                + "catalogUrl='" + catalogUrl + '\''
-                + ", database='" + database + '\''
-                + ", table='" + table + '\''
-                + ", partitions=" + partitions
-                + '}';
-    }
-}
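
For reference, the retention filter assembled by createFilter() can be reproduced in
isolation. A minimal, self-contained sketch: the partition keys and values below are the
hypothetical ones from the in-code comment above, not taken from any real feed.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: rebuilds the "equality prefix + less-than" retention filter.
public final class PartitionFilterSketch {

    static String createFilter(Map<String, String> parts) {
        StringBuilder filter = new StringBuilder();
        List<String> keys = new ArrayList<String>(parts.keySet());
        for (int curr = 0; curr < keys.size(); curr++) {
            if (curr > 0) {
                filter.append(" or ");
            }
            filter.append('(');
            // all earlier keys pinned with '=', the current key bounded with '<'
            for (int prev = 0; prev < curr; prev++) {
                String key = keys.get(prev);
                filter.append(key).append(" = '").append(parts.get(key)).append("' and ");
            }
            String key = keys.get(curr);
            filter.append(key).append(" < '").append(parts.get(key)).append("')");
        }
        return filter.toString();
    }

    public static void main(String[] args) {
        Map<String, String> parts = new LinkedHashMap<String, String>();
        parts.put("year", "2014");
        parts.put("month", "02");
        parts.put("day", "24");
        parts.put("hour", "10");
        // Prints (shown wrapped for readability):
        //   (year < '2014') or (year = '2014' and month < '02') or
        //   (year = '2014' and month = '02' and day < '24') or
        //   (year = '2014' and month = '02' and day = '24' and hour < '10')
        System.out.println(createFilter(parts));
    }
}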

