falcon-commits mailing list archives

From pall...@apache.org
Subject [37/51] [partial] falcon git commit: FALCON-1830 Removed code source directories and updated pom
Date Tue, 01 Mar 2016 08:26:23 GMT
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
deleted file mode 100644
index b709857..0000000
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ /dev/null
@@ -1,381 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.metadata;
-
-import com.tinkerpop.blueprints.Graph;
-import com.tinkerpop.blueprints.Vertex;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.common.FeedDataPath;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.workflow.WorkflowExecutionArgs;
-import org.apache.falcon.workflow.WorkflowExecutionContext;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URISyntaxException;
-import java.util.Date;
-import java.util.TimeZone;
-
-/**
- * Instance Metadata relationship mapping helper.
- */
-public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
-
-    private static final Logger LOG = LoggerFactory.getLogger(InstanceRelationshipGraphBuilder.class);
-
-    private static final String FEED_INSTANCE_FORMAT = "yyyyMMddHHmm"; // feed instance timestamp format
-    private static final String NONE = "NONE";
-    private static final String IGNORE = "IGNORE";
-
-    // process workflow properties from message
-    private static final WorkflowExecutionArgs[] INSTANCE_WORKFLOW_PROPERTIES = {
-        WorkflowExecutionArgs.USER_WORKFLOW_NAME,
-        WorkflowExecutionArgs.USER_WORKFLOW_ENGINE,
-        WorkflowExecutionArgs.WORKFLOW_ID,
-        WorkflowExecutionArgs.RUN_ID,
-        WorkflowExecutionArgs.STATUS,
-        WorkflowExecutionArgs.WF_ENGINE_URL,
-        WorkflowExecutionArgs.USER_SUBFLOW_ID,
-    };
-
-
-    public InstanceRelationshipGraphBuilder(Graph graph, boolean preserveHistory) {
-        super(graph, preserveHistory);
-    }
-
-    public Vertex addProcessInstance(WorkflowExecutionContext context) throws FalconException {
-        String processInstanceName = getProcessInstanceName(context);
-        LOG.info("Adding process instance: {}", processInstanceName);
-
-        Vertex processInstance = addVertex(processInstanceName,
-                RelationshipType.PROCESS_INSTANCE, context.getTimeStampAsLong());
-        addWorkflowInstanceProperties(processInstance, context);
-
-        addInstanceToEntity(processInstance, context.getEntityName(),
-                RelationshipType.PROCESS_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
-        addInstanceToEntity(processInstance, context.getClusterName(),
-                RelationshipType.CLUSTER_ENTITY, RelationshipLabel.PROCESS_CLUSTER_EDGE);
-        addInstanceToEntity(processInstance, context.getWorkflowUser(),
-                RelationshipType.USER, RelationshipLabel.USER);
-
-        if (isPreserveHistory()) {
-            Process process = ConfigurationStore.get().get(EntityType.PROCESS, context.getEntityName());
-            addDataClassification(process.getTags(), processInstance);
-            addPipelines(process.getPipelines(), processInstance);
-        }
-
-        addCounters(processInstance, context);
-
-        return processInstance;
-    }
-
-    private void addCounters(Vertex processInstance, WorkflowExecutionContext context) throws FalconException {
-        String counterString = getCounterString(context);
-        if (!StringUtils.isBlank(counterString)) {
-            addCountersToInstance(counterString, processInstance);
-        }
-    }
-
-    private String getCounterString(WorkflowExecutionContext context) {
-        if (!StringUtils.isBlank(context.getCounters())) {
-            return context.getCounters();
-        }
-        return null;
-    }
-
-    public String getProcessInstanceName(WorkflowExecutionContext context) {
-        return context.getEntityName() + "/" + context.getNominalTimeAsISO8601();
-    }
-
-    public void addWorkflowInstanceProperties(Vertex processInstance,
-                                              WorkflowExecutionContext context) {
-        for (WorkflowExecutionArgs instanceWorkflowProperty : INSTANCE_WORKFLOW_PROPERTIES) {
-            addProperty(processInstance, context, instanceWorkflowProperty);
-        }
-
-        processInstance.setProperty(RelationshipProperty.VERSION.getName(),
-                context.getUserWorkflowVersion());
-    }
-
-    private void addProperty(Vertex vertex, WorkflowExecutionContext context,
-                             WorkflowExecutionArgs optionName) {
-        String value = context.getValue(optionName);
-        if (value == null || value.length() == 0) {
-            return;
-        }
-
-        vertex.setProperty(optionName.getName(), value);
-    }
-
-    private void addCountersToInstance(String counterString, Vertex vertex) throws FalconException {
-        String[] counterKeyValues = counterString.split(",");
-        try {
-            for (String counter : counterKeyValues) {
-                String[] keyVals = counter.split(":", 2);
-                vertex.setProperty(keyVals[0], Long.parseLong(keyVals[1]));
-            }
-        } catch (NumberFormatException e) {
-            throw new FalconException("Invalid values for counter:" + e);
-        }
-    }
-
-    public void addInstanceToEntity(Vertex instanceVertex, String entityName,
-                                    RelationshipType entityType, RelationshipLabel edgeLabel) {
-        addInstanceToEntity(instanceVertex, entityName, entityType, edgeLabel, null);
-    }
-
-    public void addInstanceToEntity(Vertex instanceVertex, String entityName,
-                                    RelationshipType entityType, RelationshipLabel edgeLabel,
-                                    String timestamp) {
-        Vertex entityVertex = findVertex(entityName, entityType);
-        LOG.info("Vertex exists? name={}, type={}, v={}", entityName, entityType, entityVertex);
-        if (entityVertex == null) {
-            LOG.error("Illegal State: {} vertex must exist for {}", entityType, entityName);
-            throw new IllegalStateException(entityType + " entity vertex must exist " + entityName);
-        }
-
-        addEdge(instanceVertex, entityVertex, edgeLabel.getName(), timestamp);
-    }
-
-    public void addOutputFeedInstances(WorkflowExecutionContext context,
-                                       Vertex processInstance) throws FalconException {
-        String outputFeedNamesArg = context.getOutputFeedNames();
-        if (NONE.equals(outputFeedNamesArg) || IGNORE.equals(outputFeedNamesArg)) {
-            return; // there are no output feeds for this process
-        }
-
-        String[] outputFeedNames = context.getOutputFeedNamesList();
-        String[] outputFeedInstancePaths = context.getOutputFeedInstancePathsList();
-
-        for (int index = 0; index < outputFeedNames.length; index++) {
-            String feedName = outputFeedNames[index];
-            String feedInstanceDataPath = outputFeedInstancePaths[index];
-            addFeedInstance(processInstance, RelationshipLabel.PROCESS_FEED_EDGE,
-                    context, feedName, feedInstanceDataPath);
-        }
-    }
-
-    public void addInputFeedInstances(WorkflowExecutionContext context,
-                                      Vertex processInstance) throws FalconException {
-        String inputFeedNamesArg = context.getInputFeedNames();
-        if (NONE.equals(inputFeedNamesArg) || IGNORE.equals(inputFeedNamesArg)) {
-            return; // there are no input feeds for this process
-        }
-
-        String[] inputFeedNames = context.getInputFeedNamesList();
-        String[] inputFeedInstancePaths = context.getInputFeedInstancePathsList();
-
-        for (int index = 0; index < inputFeedNames.length; index++) {
-            String inputFeedName = inputFeedNames[index];
-            String inputFeedInstancePath = inputFeedInstancePaths[index];
-            // Multiple instance paths for a given feed are separated by ","
-            String[] feedInstancePaths = inputFeedInstancePath.split(",");
-
-            for (String feedInstanceDataPath : feedInstancePaths) {
-                addFeedInstance(processInstance, RelationshipLabel.FEED_PROCESS_EDGE,
-                        context, inputFeedName, feedInstanceDataPath);
-            }
-        }
-    }
-
-    public void addReplicatedInstance(WorkflowExecutionContext context) throws FalconException {
-        // For replication there will be only one output feed name and path
-        String feedName = context.getOutputFeedNames();
-        String feedInstanceDataPath = context.getOutputFeedInstancePaths();
-        String targetClusterName = context.getClusterName();
-
-        LOG.info("Computing feed instance for : name= {} path= {}, in cluster: {}", feedName,
-                feedInstanceDataPath, targetClusterName);
-        String feedInstanceName = getFeedInstanceName(feedName, targetClusterName,
-                feedInstanceDataPath, context.getNominalTimeAsISO8601());
-        Vertex feedInstanceVertex = findVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
-
-        LOG.info("Vertex exists? name={}, type={}, v={}",
-                feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex);
-        if (feedInstanceVertex == null) { // vertex is absent when the instance was not generated by Falcon
-            LOG.info("{} instance vertex {} does not exist, add it",
-                    RelationshipType.FEED_INSTANCE, feedInstanceName);
-            feedInstanceVertex = addFeedInstance( // add a new instance vertex
-                    feedInstanceName, context, feedName, context.getSrcClusterName());
-        }
-
-        addInstanceToEntity(feedInstanceVertex, targetClusterName, RelationshipType.CLUSTER_ENTITY,
-                RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE, context.getTimeStampAsISO8601());
-
-        addCounters(feedInstanceVertex, context);
-    }
-
-    public void addEvictedInstance(WorkflowExecutionContext context) throws FalconException {
-        final String outputFeedPaths = context.getOutputFeedInstancePaths();
-        if (IGNORE.equals(outputFeedPaths)) {
-            LOG.info("There were no evicted instances, nothing to record");
-            return;
-        }
-
-        LOG.info("Recording lineage for evicted instances {}", outputFeedPaths);
-        // For retention there will be only one output feed name
-        String feedName = context.getOutputFeedNames();
-        String[] evictedFeedInstancePathList = context.getOutputFeedInstancePathsList();
-        String clusterName = context.getClusterName();
-
-        for (String evictedFeedInstancePath : evictedFeedInstancePathList) {
-            LOG.info("Computing feed instance for : name= {}, path={}, in cluster: {}",
-                    feedName, evictedFeedInstancePath, clusterName);
-            String feedInstanceName = getFeedInstanceName(feedName, clusterName,
-                    evictedFeedInstancePath, context.getNominalTimeAsISO8601());
-            Vertex feedInstanceVertex = findVertex(feedInstanceName,
-                    RelationshipType.FEED_INSTANCE);
-
-            LOG.info("Vertex exists? name={}, type={}, v={}",
-                    feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex);
-            if (feedInstanceVertex == null) { // vertex is absent when the instance was not generated by Falcon
-                LOG.info("{} instance vertex {} does not exist, add it",
-                        RelationshipType.FEED_INSTANCE, feedInstanceName);
-                feedInstanceVertex = addFeedInstance( // add a new instance vertex
-                        feedInstanceName, context, feedName, clusterName);
-            }
-
-            addInstanceToEntity(feedInstanceVertex, clusterName, RelationshipType.CLUSTER_ENTITY,
-                    RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE, context.getTimeStampAsISO8601());
-        }
-    }
-
-
-    public void addImportedInstance(WorkflowExecutionContext context) throws FalconException {
-
-        String feedName = context.getOutputFeedNames();
-        String feedInstanceDataPath = context.getOutputFeedInstancePaths();
-        String datasourceName = context.getDatasourceName();
-        String sourceClusterName = context.getSrcClusterName();
-
-        LOG.info("Computing import feed instance for : name= {} path= {}, in cluster: {} "
-                       +  "from datasource: {}", feedName,
-                feedInstanceDataPath, sourceClusterName, datasourceName);
-        String feedInstanceName = getFeedInstanceName(feedName, sourceClusterName,
-                feedInstanceDataPath, context.getNominalTimeAsISO8601());
-        Vertex feedInstanceVertex = findVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
-
-        LOG.info("Vertex exists? name={}, type={}, v={}",
-                feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex);
-        if (feedInstanceVertex == null) { // vertex is absent when the instance was not generated by Falcon
-            LOG.info("{} instance vertex {} does not exist, add it",
-                    RelationshipType.FEED_INSTANCE, feedInstanceName);
-            feedInstanceVertex = addFeedInstance( // add a new instance vertex
-                    feedInstanceName, context, feedName, context.getSrcClusterName());
-        }
-        addInstanceToEntity(feedInstanceVertex, datasourceName, RelationshipType.DATASOURCE_ENTITY,
-                RelationshipLabel.DATASOURCE_IMPORT_EDGE, context.getTimeStampAsISO8601());
-        addInstanceToEntity(feedInstanceVertex, sourceClusterName, RelationshipType.CLUSTER_ENTITY,
-                RelationshipLabel.FEED_CLUSTER_EDGE, context.getTimeStampAsISO8601());
-    }
-
-    public String getImportInstanceName(WorkflowExecutionContext context) {
-        return context.getEntityName() + "/" + context.getNominalTimeAsISO8601();
-    }
-
-    private void addFeedInstance(Vertex processInstance, RelationshipLabel edgeLabel,
-                                 WorkflowExecutionContext context, String feedName,
-                                 String feedInstanceDataPath) throws FalconException {
-        String clusterName = context.getClusterName();
-        LOG.info("Computing feed instance for : name= {} path= {}, in cluster: {}", feedName,
-                feedInstanceDataPath, clusterName);
-        String feedInstanceName = getFeedInstanceName(feedName, clusterName,
-                feedInstanceDataPath, context.getNominalTimeAsISO8601());
-        Vertex feedInstance = addFeedInstance(feedInstanceName, context, feedName, clusterName);
-        addProcessFeedEdge(processInstance, feedInstance, edgeLabel);
-    }
-
-    private Vertex addFeedInstance(String feedInstanceName, WorkflowExecutionContext context,
-                                   String feedName, String clusterName) throws FalconException {
-        LOG.info("Adding feed instance {}", feedInstanceName);
-        Vertex feedInstance = addVertex(feedInstanceName, RelationshipType.FEED_INSTANCE,
-                context.getTimeStampAsLong());
-
-        addInstanceToEntity(feedInstance, feedName,
-                RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
-        addInstanceToEntity(feedInstance, clusterName,
-                RelationshipType.CLUSTER_ENTITY, RelationshipLabel.FEED_CLUSTER_EDGE);
-        addInstanceToEntity(feedInstance, context.getWorkflowUser(),
-                RelationshipType.USER, RelationshipLabel.USER);
-
-        if (isPreserveHistory()) {
-            Feed feed = ConfigurationStore.get().get(EntityType.FEED, feedName);
-            addDataClassification(feed.getTags(), feedInstance);
-            addGroups(feed.getGroups(), feedInstance);
-        }
-
-        return feedInstance;
-    }
-
-    public static String getFeedInstanceName(String feedName, String clusterName,
-                                             String feedInstancePath,
-                                             String nominalTime) throws FalconException {
-        try {
-            Feed feed = ConfigurationStore.get().get(EntityType.FEED, feedName);
-            Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
-
-            Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster);
-            return storageType == Storage.TYPE.TABLE
-                    ? getTableFeedInstanceName(feed, feedInstancePath, storageType)
-                    : getFileSystemFeedInstanceName(feedInstancePath, feed, cluster, nominalTime);
-
-        } catch (URISyntaxException e) {
-            throw new FalconException(e);
-        }
-    }
-
-    private static String getTableFeedInstanceName(Feed feed, String feedInstancePath,
-                                            Storage.TYPE storageType) throws URISyntaxException {
-        CatalogStorage instanceStorage = (CatalogStorage) FeedHelper.createStorage(
-                storageType.name(), feedInstancePath);
-        return feed.getName() + "/" + instanceStorage.toPartitionAsPath();
-    }
-
-    private static String getFileSystemFeedInstanceName(String feedInstancePath, Feed feed,
-                                                        Cluster cluster,
-                                                        String nominalTime) throws FalconException {
-        Storage rawStorage = FeedHelper.createStorage(cluster, feed);
-        String feedPathTemplate = rawStorage.getUriTemplate(LocationType.DATA);
-        String instance = feedInstancePath;
-
-        String[] elements = FeedDataPath.PATTERN.split(feedPathTemplate);
-        for (String element : elements) {
-            instance = instance.replaceFirst(element, "");
-        }
-
-        Date instanceTime = FeedHelper.getDate(feedPathTemplate,
-                new Path(feedInstancePath), TimeZone.getTimeZone("UTC"));
-
-        return StringUtils.isEmpty(instance)
-                ? feed.getName() + "/" + nominalTime
-                : feed.getName() + "/"
-                        + SchemaHelper.formatDateUTC(instanceTime);
-    }
-}
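
For reference, the counter string consumed by addCountersToInstance above is a comma-separated list of key:value pairs with long values. A minimal standalone sketch of that parsing, with hypothetical counter names:

    public class CounterFormatDemo {
        public static void main(String[] args) {
            // same "key:value,key:value" shape that addCountersToInstance splits
            String counters = "BYTESCOPIED:604800,COPY:3"; // hypothetical counter names
            for (String counter : counters.split(",")) {
                String[] keyVal = counter.split(":", 2); // limit 2 tolerates ':' in values
                System.out.println(keyVal[0] + " = " + Long.parseLong(keyVal[1]));
            }
        }
    }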

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
deleted file mode 100644
index cf2b651..0000000
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.metadata;
-
-import com.thinkaurelius.titan.graphdb.blueprints.TitanBlueprintsGraph;
-import com.tinkerpop.blueprints.Edge;
-import com.tinkerpop.blueprints.Graph;
-import com.tinkerpop.blueprints.GraphFactory;
-import com.tinkerpop.blueprints.KeyIndexableGraph;
-import com.tinkerpop.blueprints.TransactionalGraph;
-import com.tinkerpop.blueprints.Vertex;
-import com.tinkerpop.blueprints.util.TransactionRetryHelper;
-import com.tinkerpop.blueprints.util.TransactionWork;
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.service.ConfigurationChangeListener;
-import org.apache.falcon.service.FalconService;
-import org.apache.falcon.service.Services;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.falcon.workflow.WorkflowExecutionContext;
-import org.apache.falcon.workflow.WorkflowExecutionListener;
-
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-/**
- * Metadata relationship mapping service. Maps relationships into a graph database.
- */
-public class MetadataMappingService
-        implements FalconService, ConfigurationChangeListener, WorkflowExecutionListener {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MetadataMappingService.class);
-
-    /**
-     * Constant for the service name.
-     */
-    public static final String SERVICE_NAME = MetadataMappingService.class.getSimpleName();
-
-    /**
-     * Constant for the configuration property that indicates the prefix.
-     */
-    private static final String FALCON_PREFIX = "falcon.graph.";
-
-
-    private Graph graph;
-    private Set<String> vertexIndexedKeys;
-    private Set<String> edgeIndexedKeys;
-    private EntityRelationshipGraphBuilder entityGraphBuilder;
-    private InstanceRelationshipGraphBuilder instanceGraphBuilder;
-
-    private int transactionRetries;
-    private long transactionRetryDelayInMillis;
-
-    @Override
-    public String getName() {
-        return SERVICE_NAME;
-    }
-
-    @Override
-    public void init() throws FalconException {
-        graph = initializeGraphDB();
-        createIndicesForVertexKeys();
-        // todo - create Edge Cardinality Constraints
-        LOG.info("Initialized graph db: {}", graph);
-
-        vertexIndexedKeys = getIndexableGraph().getIndexedKeys(Vertex.class);
-        LOG.info("Init vertex property keys: {}", vertexIndexedKeys);
-
-        edgeIndexedKeys = getIndexableGraph().getIndexedKeys(Edge.class);
-        LOG.info("Init edge property keys: {}", edgeIndexedKeys);
-
-        boolean preserveHistory = Boolean.valueOf(StartupProperties.get().getProperty(
-                "falcon.graph.preserve.history", "false"));
-        entityGraphBuilder = new EntityRelationshipGraphBuilder(graph, preserveHistory);
-        instanceGraphBuilder = new InstanceRelationshipGraphBuilder(graph, preserveHistory);
-
-        ConfigurationStore.get().registerListener(this);
-        Services.get().<WorkflowJobEndNotificationService>getService(
-                WorkflowJobEndNotificationService.SERVICE_NAME).registerListener(this);
-        try {
-            transactionRetries = Integer.parseInt(StartupProperties.get().getProperty(
-                    "falcon.graph.transaction.retry.count", "3"));
-            transactionRetryDelayInMillis = Long.parseLong(StartupProperties.get().getProperty(
-                    "falcon.graph.transaction.retry.delay", "5"));
-        } catch (NumberFormatException e) {
-            throw new FalconException("Invalid values for graph transaction retry delay/count " + e);
-        }
-    }
-
-    protected Graph initializeGraphDB() {
-        LOG.info("Initializing graph db");
-
-        Configuration graphConfig = getConfiguration();
-        return GraphFactory.open(graphConfig);
-    }
-
-    public static Configuration getConfiguration() {
-        Configuration graphConfig = new BaseConfiguration();
-
-        Properties configProperties = StartupProperties.get();
-        for (Map.Entry entry : configProperties.entrySet()) {
-            String name = (String) entry.getKey();
-            if (name.startsWith(FALCON_PREFIX)) {
-                String value = (String) entry.getValue();
-                name = name.substring(FALCON_PREFIX.length());
-                graphConfig.setProperty(name, value);
-            }
-        }
-
-        return graphConfig;
-    }
-
-    /**
-     * This unfortunately requires a handle to the Titan implementation since
-     * com.tinkerpop.blueprints.KeyIndexableGraph#createKeyIndex does not create an index.
-     */
-    protected void createIndicesForVertexKeys() {
-        if (!((KeyIndexableGraph) graph).getIndexedKeys(Vertex.class).isEmpty()) {
-            LOG.info("Indexes already exist for graph");
-            return;
-        }
-
-        LOG.info("Indexes does not exist, Creating indexes for graph");
-        // todo - externalize this
-        makeNameKeyIndex();
-        makeKeyIndex(RelationshipProperty.TYPE.getName());
-        makeKeyIndex(RelationshipProperty.TIMESTAMP.getName());
-        makeKeyIndex(RelationshipProperty.VERSION.getName());
-    }
-
-    private void makeNameKeyIndex() {
-        getTitanGraph().makeKey(RelationshipProperty.NAME.getName())
-                .dataType(String.class)
-                .indexed(Vertex.class)
-                .indexed(Edge.class)
-                // .unique() todo this ought to be unique?
-                .make();
-        getTitanGraph().commit();
-    }
-
-    private void makeKeyIndex(String key) {
-        getTitanGraph().makeKey(key)
-                .dataType(String.class)
-                .indexed(Vertex.class)
-                .make();
-        getTitanGraph().commit();
-    }
-
-    public Graph getGraph() {
-        return graph;
-    }
-
-    public KeyIndexableGraph getIndexableGraph() {
-        return (KeyIndexableGraph) graph;
-    }
-
-    public TransactionalGraph getTransactionalGraph() {
-        return (TransactionalGraph) graph;
-    }
-
-    public TitanBlueprintsGraph getTitanGraph() {
-        return (TitanBlueprintsGraph) graph;
-    }
-
-    public Set<String> getVertexIndexedKeys() {
-        return vertexIndexedKeys;
-    }
-
-    public Set<String> getEdgeIndexedKeys() {
-        return edgeIndexedKeys;
-    }
-
-    @Override
-    public void destroy() throws FalconException {
-        Services.get().<WorkflowJobEndNotificationService>getService(
-                WorkflowJobEndNotificationService.SERVICE_NAME).unregisterListener(this);
-
-        LOG.info("Shutting down graph db");
-        graph.shutdown();
-    }
-
-    @Override
-    public void onAdd(final Entity entity) throws FalconException {
-        EntityType entityType = entity.getEntityType();
-        LOG.info("Adding lineage for entity: {}, type: {}", entity.getName(), entityType);
-        try {
-            new TransactionRetryHelper.Builder<Void>(getTransactionalGraph())
-                    .perform(new TransactionWork<Void>() {
-                        @Override
-                        public Void execute(TransactionalGraph transactionalGraph) throws Exception {
-                            entityGraphBuilder.addEntity(entity);
-                            transactionalGraph.commit();
-                            return null;
-                        }
-                    }).build().exponentialBackoff(transactionRetries, transactionRetryDelayInMillis);
-
-        } catch (Exception e) {
-            getTransactionalGraph().rollback();
-            throw new FalconException(e);
-        }
-    }
-
-    @Override
-    public void onRemove(Entity entity) throws FalconException {
-        // do nothing, we'd leave the deleted entities as-is for historical purposes
-        // should we mark 'em as deleted?
-    }
-
-    @Override
-    public void onChange(final Entity oldEntity, final Entity newEntity) throws FalconException {
-        EntityType entityType = newEntity.getEntityType();
-        LOG.info("Updating lineage for entity: {}, type: {}", newEntity.getName(), entityType);
-        try {
-            new TransactionRetryHelper.Builder<Void>(getTransactionalGraph())
-                    .perform(new TransactionWork<Void>() {
-                        @Override
-                        public Void execute(TransactionalGraph transactionalGraph) throws Exception {
-                            entityGraphBuilder.updateEntity(oldEntity, newEntity);
-                            transactionalGraph.commit();
-                            return null;
-                        }
-                    }).build().exponentialBackoff(transactionRetries, transactionRetryDelayInMillis);
-
-        } catch (Exception e) {
-            getTransactionalGraph().rollback();
-            throw new FalconException(e);
-        }
-    }
-
-    @Override
-    public void onReload(Entity entity) throws FalconException {
-        onAdd(entity);
-    }
-
-    @Override
-    public void onSuccess(final WorkflowExecutionContext context) throws FalconException {
-        LOG.info("Adding lineage for context {}", context);
-        try {
-            new TransactionRetryHelper.Builder<Void>(getTransactionalGraph())
-                    .perform(new TransactionWork<Void>() {
-                        @Override
-                        public Void execute(TransactionalGraph transactionalGraph) throws Exception {
-                            onSuccessfulExecution(context);
-                            transactionalGraph.commit();
-                            return null;
-                        }
-                    }).build().exponentialBackoff(transactionRetries, transactionRetryDelayInMillis);
-        } catch (Exception e) {
-            getTransactionalGraph().rollback();
-            throw new FalconException(e);
-        }
-    }
-
-    private void onSuccessfulExecution(final WorkflowExecutionContext context) throws FalconException {
-        WorkflowExecutionContext.EntityOperations entityOperation = context.getOperation();
-        switch (entityOperation) {
-        case GENERATE:
-            onProcessInstanceExecuted(context);
-            break;
-        case REPLICATE:
-            onFeedInstanceReplicated(context);
-            break;
-        case DELETE:
-            onFeedInstanceEvicted(context);
-            break;
-        case IMPORT:
-            onFeedInstanceImported(context);
-            break;
-        default:
-            throw new IllegalArgumentException("Invalid EntityOperation - " + entityOperation);
-        }
-    }
-
-    @Override
-    public void onFailure(WorkflowExecutionContext context) throws FalconException {
-        // do nothing since lineage is only recorded for successful workflow
-    }
-
-    @Override
-    public void onStart(WorkflowExecutionContext context) throws FalconException {
-        // Do nothing
-    }
-
-    @Override
-    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
-        // Do nothing
-    }
-
-    @Override
-    public void onWait(WorkflowExecutionContext context) throws FalconException {
-        // TBD
-    }
-
-
-    private void onProcessInstanceExecuted(WorkflowExecutionContext context) throws FalconException {
-        Vertex processInstance = instanceGraphBuilder.addProcessInstance(context);
-        instanceGraphBuilder.addOutputFeedInstances(context, processInstance);
-        instanceGraphBuilder.addInputFeedInstances(context, processInstance);
-    }
-
-    private void onFeedInstanceReplicated(WorkflowExecutionContext context) throws FalconException {
-        LOG.info("Adding replicated feed instance: {}", context.getNominalTimeAsISO8601());
-        instanceGraphBuilder.addReplicatedInstance(context);
-    }
-
-    private void onFeedInstanceEvicted(WorkflowExecutionContext context) throws FalconException {
-        LOG.info("Adding evicted feed instance: {}", context.getNominalTimeAsISO8601());
-        instanceGraphBuilder.addEvictedInstance(context);
-    }
-    private void onFeedInstanceImported(WorkflowExecutionContext context) throws FalconException {
-        LOG.info("Adding imported feed instance: {}", context.getNominalTimeAsISO8601());
-        instanceGraphBuilder.addImportedInstance(context);
-    }
-}
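
As a usage note, getConfiguration() above forwards every startup property that starts with "falcon.graph." to the graph database with the prefix stripped. A minimal sketch of that mapping, assuming illustrative property values ("storage.backend" is a typical Titan key, not taken from this commit):

    import java.util.Map;
    import java.util.Properties;

    public class GraphConfigDemo {
        public static void main(String[] args) {
            Properties startup = new Properties();
            startup.setProperty("falcon.graph.storage.backend", "berkeleyje"); // assumed value
            startup.setProperty("broker.url", "tcp://localhost:61616");        // no prefix, skipped
            for (Map.Entry<Object, Object> entry : startup.entrySet()) {
                String name = (String) entry.getKey();
                if (name.startsWith("falcon.graph.")) {
                    // the graph db sees: storage.backend = berkeleyje
                    System.out.println(name.substring("falcon.graph.".length())
                            + " = " + entry.getValue());
                }
            }
        }
    }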

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
deleted file mode 100644
index 0c3fcee..0000000
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.metadata;
-
-import com.tinkerpop.blueprints.Direction;
-import com.tinkerpop.blueprints.Edge;
-import com.tinkerpop.blueprints.Graph;
-import com.tinkerpop.blueprints.GraphQuery;
-import com.tinkerpop.blueprints.Vertex;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Date;
-import java.util.Iterator;
-
-/**
- * Base class for Metadata relationship mapping helper.
- */
-public abstract class RelationshipGraphBuilder {
-
-    private static final Logger LOG = LoggerFactory.getLogger(RelationshipGraphBuilder.class);
-
-    /**
-     * A blueprints graph.
-     */
-    private final Graph graph;
-
-    /**
-     * If enabled, preserves the history of tags and groups for instances;
-     * otherwise they are only available for entities.
-     */
-    private final boolean preserveHistory;
-
-    protected RelationshipGraphBuilder(Graph graph, boolean preserveHistory) {
-        this.graph = graph;
-        this.preserveHistory = preserveHistory;
-    }
-
-    public Graph getGraph() {
-        return graph;
-    }
-
-    protected boolean isPreserveHistory() {
-        return preserveHistory;
-    }
-
-    public Vertex addVertex(String name, RelationshipType type) {
-        Vertex vertex = findVertex(name, type);
-        if (vertex != null) {
-            LOG.debug("Found an existing vertex for: name={}, type={}", name, type);
-            return vertex;
-        }
-
-        return createVertex(name, type);
-    }
-
-    protected Vertex addVertex(String name, RelationshipType type, long timestamp) {
-        Vertex vertex = findVertex(name, type);
-        if (vertex != null) {
-            LOG.debug("Found an existing vertex for: name={}, type={}", name, type);
-            return vertex;
-        }
-
-        return createVertex(name, type, timestamp);
-    }
-
-    protected Vertex findVertex(String name, RelationshipType type) {
-        LOG.debug("Finding vertex for: name={}, type={}", name, type);
-
-        GraphQuery query = graph.query()
-                .has(RelationshipProperty.NAME.getName(), name)
-                .has(RelationshipProperty.TYPE.getName(), type.getName());
-        Iterator<Vertex> results = query.vertices().iterator();
-        return results.hasNext() ? results.next() : null;  // returning one since name is unique
-    }
-
-    protected Vertex createVertex(String name, RelationshipType type) {
-        return createVertex(name, type, System.currentTimeMillis());
-    }
-
-    protected Vertex createVertex(String name, RelationshipType type, long timestamp) {
-        LOG.debug("Creating a new vertex for: name={}, type={}", name, type);
-
-        Vertex vertex = graph.addVertex(null);
-        vertex.setProperty(RelationshipProperty.NAME.getName(), name);
-        vertex.setProperty(RelationshipProperty.TYPE.getName(), type.getName());
-        vertex.setProperty(RelationshipProperty.TIMESTAMP.getName(), timestamp);
-
-        return vertex;
-    }
-
-    protected Edge addEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) {
-        return addEdge(fromVertex, toVertex, edgeLabel, null);
-    }
-
-    protected Edge addEdge(Vertex fromVertex, Vertex toVertex,
-                           String edgeLabel, String timestamp) {
-        Edge edge = findEdge(fromVertex, toVertex, edgeLabel);
-
-        Edge edgeToVertex = edge != null ? edge : fromVertex.addEdge(edgeLabel, toVertex);
-        if (timestamp != null) {
-            edgeToVertex.setProperty(RelationshipProperty.TIMESTAMP.getName(), timestamp);
-        }
-
-        return edgeToVertex;
-    }
-
-    protected void removeEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) {
-        Edge edge = findEdge(fromVertex, toVertex, edgeLabel);
-        if (edge != null) {
-            getGraph().removeEdge(edge);
-        }
-    }
-
-    protected void removeEdge(Vertex fromVertex, Object toVertexName, String edgeLabel) {
-        Edge edge = findEdge(fromVertex, toVertexName, edgeLabel);
-        if (edge != null) {
-            getGraph().removeEdge(edge);
-        }
-    }
-
-    protected Edge findEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) {
-        return findEdge(fromVertex, toVertex.getProperty(RelationshipProperty.NAME.getName()), edgeLabel);
-    }
-
-    protected Edge findEdge(Vertex fromVertex, Object toVertexName, String edgeLabel) {
-        Edge edgeToFind = null;
-        for (Edge edge : fromVertex.getEdges(Direction.OUT, edgeLabel)) {
-            if (edge.getVertex(Direction.IN).getProperty(RelationshipProperty.NAME.getName()).equals(toVertexName)) {
-                edgeToFind = edge;
-                break;
-            }
-        }
-
-        return edgeToFind;
-    }
-
-    protected void addUserRelation(Vertex fromVertex) {
-        addUserRelation(fromVertex, RelationshipLabel.USER.getName());
-    }
-
-    protected void addUserRelation(Vertex fromVertex, String edgeLabel) {
-        Vertex relationToUserVertex = addVertex(CurrentUser.getUser(), RelationshipType.USER);
-        addEdge(fromVertex, relationToUserVertex, edgeLabel);
-    }
-
-    protected void addDataClassification(String classification, Vertex entityVertex) {
-        if (classification == null || classification.length() == 0) {
-            return;
-        }
-
-        String[] tags = classification.split(",");
-        for (String tag : tags) {
-            int index = tag.indexOf("=");
-            String tagKey = tag.substring(0, index);
-            String tagValue = tag.substring(index + 1);
-
-            Vertex tagValueVertex = addVertex(tagValue, RelationshipType.TAGS);
-            addEdge(entityVertex, tagValueVertex, tagKey);
-        }
-    }
-
-    protected void addGroups(String groups, Vertex fromVertex) {
-        addCSVTags(groups, fromVertex, RelationshipType.GROUPS, RelationshipLabel.GROUPS);
-    }
-
-    protected void addPipelines(String pipelines, Vertex fromVertex) {
-        addCSVTags(pipelines, fromVertex, RelationshipType.PIPELINES, RelationshipLabel.PIPELINES);
-    }
-
-    protected void addProcessFeedEdge(Vertex processVertex, Vertex feedVertex,
-                                      RelationshipLabel edgeLabel) {
-        if (edgeLabel == RelationshipLabel.FEED_PROCESS_EDGE) {
-            addEdge(feedVertex, processVertex, edgeLabel.getName());
-        } else {
-            addEdge(processVertex, feedVertex, edgeLabel.getName());
-        }
-    }
-
-    protected String getCurrentTimeStamp() {
-        return SchemaHelper.formatDateUTC(new Date());
-    }
-
-    /**
-     * Adds comma separated values as tags.
-     *
-     * @param csvTags           comma separated values.
-     * @param fromVertex        from vertex.
-     * @param relationshipType  vertex type.
-     * @param edgeLabel         edge label.
-     */
-    private void addCSVTags(String csvTags, Vertex fromVertex,
-                            RelationshipType relationshipType, RelationshipLabel edgeLabel) {
-        if (StringUtils.isEmpty(csvTags)) {
-            return;
-        }
-
-        String[] tags = csvTags.split(",");
-        for (String tag : tags) {
-            Vertex vertex = addVertex(tag, relationshipType);
-            addEdge(fromVertex, vertex, edgeLabel.getName());
-        }
-    }
-}
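
A quick illustration of the tag format addDataClassification above expects: comma-separated key=value pairs, where each key becomes an edge label and each value a TAGS vertex. The tag names below are made up:

    public class TagFormatDemo {
        public static void main(String[] args) {
            String tags = "classification=secure,externalTarget=marketing"; // hypothetical tags
            for (String tag : tags.split(",")) {
                int index = tag.indexOf('=');
                System.out.println("edge label: " + tag.substring(0, index)
                        + ", tag vertex: " + tag.substring(index + 1));
            }
        }
    }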

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
deleted file mode 100644
index 6d4bf46..0000000
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.metadata;
-
-/**
- * Enumerates Relationship edge labels.
- */
-public enum RelationshipLabel {
-
-    // entity edge labels
-    FEED_CLUSTER_EDGE("stored-in"),
-    PROCESS_CLUSTER_EDGE("runs-on"),
-    FEED_PROCESS_EDGE("input"),
-    PROCESS_FEED_EDGE("output"),
-    DATASOURCE_IMPORT_EDGE("import"),
-
-    // instance edge labels
-    INSTANCE_ENTITY_EDGE("instance-of"),
-
-    // edge labels
-    CLUSTER_COLO("collocated"),
-    USER("owned-by"),
-    GROUPS("grouped-as"),
-    PIPELINES("pipeline"),
-
-    // replication labels
-    FEED_CLUSTER_REPLICATED_EDGE("replicated-to"),
-
-    // eviction labels
-    FEED_CLUSTER_EVICTED_EDGE("evicted-from");
-
-    private final String name;
-
-    RelationshipLabel(String name) {
-        this.name = name;
-    }
-
-    public String getName() {
-        return name;
-    }
-}
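
For completeness, a tiny sketch of how these labels surface as edge names, assuming the enum above is on the classpath; addProcessFeedEdge in RelationshipGraphBuilder above picks between the first two based on direction:

    import org.apache.falcon.metadata.RelationshipLabel;

    public class LabelDemo {
        public static void main(String[] args) {
            System.out.println(RelationshipLabel.FEED_PROCESS_EDGE.getName()); // "input"
            System.out.println(RelationshipLabel.PROCESS_FEED_EDGE.getName()); // "output"
        }
    }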

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/metadata/RelationshipProperty.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipProperty.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipProperty.java
deleted file mode 100644
index ff437d9..0000000
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipProperty.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.metadata;
-
-/**
- * Enumerates Relationship property keys.
- */
-public enum RelationshipProperty {
-
-    // vertex property keys - indexed
-    NAME("name"),
-    TYPE("type"),
-    TIMESTAMP("timestamp"),
-    VERSION("version"),
-
-    // workflow properties
-    USER_WORKFLOW_ENGINE("userWorkflowEngine", "user workflow engine type"),
-    USER_WORKFLOW_NAME("userWorkflowName", "user workflow name"),
-    USER_WORKFLOW_VERSION("userWorkflowVersion", "user workflow version"),
-
-    // workflow instance properties
-    WORKFLOW_ID("workflowId", "current workflow-id of the instance"),
-    RUN_ID("runId", "current run-id of the instance"),
-    STATUS("status", "status of the user workflow instance"),
-    WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex: oozie"),
-    USER_SUBFLOW_ID("subflowId", "external id of user workflow");
-
-
-    private final String name;
-    private final String description;
-
-    RelationshipProperty(String name) {
-        this(name, name);
-    }
-
-    RelationshipProperty(String name, String description) {
-        this.name = name;
-        this.description = description;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public String getDescription() {
-        return description;
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java b/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java
deleted file mode 100644
index b4d46c4..0000000
--- a/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.retention;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * Utility class for serializing and deserializing the evicted instance paths.
- */
-
-public final class EvictedInstanceSerDe {
-
-    private static final Logger LOG = LoggerFactory.getLogger(EvictedInstanceSerDe.class);
-
-    public static final String INSTANCEPATH_PREFIX = "instancePaths=";
-    public static final String INSTANCEPATH_SEPARATOR = ",";
-
-
-    private EvictedInstanceSerDe() {}
-
-    /**
-     * This method serializes the evicted instances to a file in logs dir for a given feed.
-     * @see org.apache.falcon.retention.FeedEvictor
-     *
-     * *Note:* This is executed within the map task for the evictor action
-     *
-     * @param fileSystem file system handle
-     * @param logFilePath       File path to serialize the instances to
-     * @param instances  list of instances, comma separated
-     * @throws IOException
-     */
-    public static void serializeEvictedInstancePaths(final FileSystem fileSystem,
-                                                     final Path logFilePath,
-                                                     StringBuffer instances) throws IOException {
-        LOG.info("Writing deleted instances {} to path {}", instances, logFilePath);
-        OutputStream out = null;
-        try {
-            out = fileSystem.create(logFilePath);
-            instances.insert(0, INSTANCEPATH_PREFIX); // add the prefix
-            out.write(instances.toString().getBytes());
-
-            // To make sure log cleaning service can delete this file
-            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-            fileSystem.setPermission(logFilePath, permission);
-        } finally {
-            if (out != null) {
-                out.close();
-            }
-        }
-
-        if (LOG.isDebugEnabled()) {
-            logEvictedInstancePaths(fileSystem, logFilePath);
-        }
-    }
-
-    private static void logEvictedInstancePaths(final FileSystem fs,
-                                                final Path outPath) throws IOException {
-        ByteArrayOutputStream writer = new ByteArrayOutputStream();
-        InputStream instance = fs.open(outPath);
-        IOUtils.copyBytes(instance, writer, 4096, true);
-        LOG.debug("Instance Paths copied to {}", outPath);
-        LOG.debug("Written {}", writer);
-    }
-
-    /**
-     * This method deserializes the evicted instances from a log file on hdfs.
-     * @see org.apache.falcon.messaging.JMSMessageProducer
-     * *Note:* This is executed within the Falcon server
-     *
-     * @param fileSystem file system handle
-     * @param logFile    File path to deserialize the instances from
-     * @return list of instances, comma separated
-     * @throws IOException
-     */
-    public static String[] deserializeEvictedInstancePaths(final FileSystem fileSystem,
-                                                           final Path logFile) throws IOException {
-        ByteArrayOutputStream writer = new ByteArrayOutputStream();
-        InputStream instance = fileSystem.open(logFile);
-        IOUtils.copyBytes(instance, writer, 4096, true);
-        String[] instancePaths = writer.toString().split(INSTANCEPATH_PREFIX);
-
-        if (instancePaths.length <= 1) {
-            LOG.info("Returning 0 instance paths for feed ");
-            return new String[0];
-        } else {
-            LOG.info("Returning instance paths for feed {}", instancePaths[1]);
-            return instancePaths[1].split(INSTANCEPATH_SEPARATOR);
-        }
-    }
-}
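
The on-disk format the two methods above agree on is the "instancePaths=" prefix followed by comma-separated paths. A minimal sketch of the round trip without HDFS, using hypothetical paths:

    public class SerDeFormatDemo {
        public static void main(String[] args) {
            // serialize: prefix + comma-separated instance paths
            String serialized = "instancePaths=/falcon/feed/2016-02-01,/falcon/feed/2016-02-02";

            // deserialize: strip the prefix, then split the paths
            String[] parts = serialized.split("instancePaths=");
            String[] paths = parts.length > 1 ? parts[1].split(",") : new String[0];
            for (String path : paths) {
                System.out.println(path);
            }
        }
    }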

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java b/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
deleted file mode 100644
index 1457b06..0000000
--- a/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.retention;
-
-import org.apache.commons.el.ExpressionEvaluatorImpl;
-import org.apache.falcon.Pair;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.jsp.el.ELException;
-import javax.servlet.jsp.el.ExpressionEvaluator;
-import java.util.Date;
-
-/**
- * Utilities for feed eviction.
- */
-public final class EvictionHelper {
-
-    private static final Logger LOG = LoggerFactory.getLogger(EvictionHelper.class);
-
-    private static final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl();
-    private static final ExpressionHelper RESOLVER = ExpressionHelper.get();
-
-    private EvictionHelper(){}
-
-    public static Pair<Date, Date> getDateRange(String period) throws ELException {
-        Long duration = (Long) EVALUATOR.evaluate("${" + period + "}",
-                Long.class, RESOLVER, RESOLVER);
-        Date end = new Date();
-        Date start = new Date(end.getTime() - duration);
-        return Pair.of(start, end);
-    }
-
-}
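
As a usage note, the period argument to getDateRange above is a Falcon EL duration such as "days(7)", which the evaluator resolves to milliseconds, so the range is simply [now - duration, now]. A sketch of the equivalent arithmetic, assuming days(7):

    import java.util.Date;

    public class DateRangeDemo {
        public static void main(String[] args) {
            long duration = 7L * 24 * 60 * 60 * 1000; // what "days(7)" resolves to
            Date end = new Date();
            Date start = new Date(end.getTime() - duration);
            System.out.println("retention range: " + start + " .. " + end);
        }
    }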

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/security/AuthenticationInitializationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/AuthenticationInitializationService.java b/common/src/main/java/org/apache/falcon/security/AuthenticationInitializationService.java
deleted file mode 100644
index f7b2155..0000000
--- a/common/src/main/java/org/apache/falcon/security/AuthenticationInitializationService.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.security;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang.Validate;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.aspect.GenericAlert;
-import org.apache.falcon.service.FalconService;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Date;
-import java.util.Properties;
-import java.util.Timer;
-import java.util.TimerTask;
-
-
-/**
- * Authentication Service at startup that initializes the authentication credentials
- * based on authentication type. If Kerberos is enabled, it logs in the user with the keytab.
- */
-public class AuthenticationInitializationService implements FalconService {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AuthenticationInitializationService.class);
-
-    /**
-     * Constant for the configuration property that indicates the prefix.
-     */
-    protected static final String CONFIG_PREFIX = "falcon.service.authentication.";
-
-    /**
-     * Constant for the configuration property that indicates the keytab file path.
-     */
-    protected static final String KERBEROS_KEYTAB = CONFIG_PREFIX + KerberosAuthenticationHandler.KEYTAB;
-
-    /**
-     * Constant for the configuration property that indicates the kerberos principal.
-     */
-    protected static final String KERBEROS_PRINCIPAL = CONFIG_PREFIX + KerberosAuthenticationHandler.PRINCIPAL;
-
-    /**
-     * Constant for the configuration property that indicates the authentication token validity time in seconds.
-     */
-    protected static final String AUTH_TOKEN_VALIDITY_SECONDS = CONFIG_PREFIX + "token.validity";
-
-    private Timer timer = new Timer();
-    private static final String SERVICE_NAME = "Authentication initialization service";
-
-    @Override
-    public String getName() {
-        return SERVICE_NAME;
-    }
-
-    @Override
-    public void init() throws FalconException {
-
-        if (SecurityUtil.isSecurityEnabled()) {
-            LOG.info("Falcon Kerberos Authentication Enabled!");
-            initializeKerberos();
-
-            String authTokenValidity = StartupProperties.get().getProperty(AUTH_TOKEN_VALIDITY_SECONDS);
-            long validateFrequency;
-            try {
-                validateFrequency = (StringUtils.isNotEmpty(authTokenValidity))
-                        ? Long.parseLong(authTokenValidity) : 86400;
-            } catch (NumberFormatException nfe) {
-                throw new FalconException("Invalid value provided for startup property \""
-                        + AUTH_TOKEN_VALIDITY_SECONDS + "\", please provide a valid long number", nfe);
-            }
-            timer.schedule(new TokenValidationThread(), 0, validateFrequency * 1000);
-        } else {
-            LOG.info("Falcon Simple Authentication Enabled!");
-            Configuration ugiConf = new Configuration();
-            ugiConf.set("hadoop.security.authentication", "simple");
-            UserGroupInformation.setConfiguration(ugiConf);
-        }
-    }
-
-    protected static void initializeKerberos() throws FalconException {
-        try {
-            Properties configuration = StartupProperties.get();
-            String principal = configuration.getProperty(KERBEROS_PRINCIPAL);
-            Validate.notEmpty(principal,
-                    "Missing required configuration property: " + KERBEROS_PRINCIPAL);
-            principal = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(
-                    principal, SecurityUtil.getLocalHostName());
-
-            String keytabFilePath = configuration.getProperty(KERBEROS_KEYTAB);
-            Validate.notEmpty(keytabFilePath,
-                    "Missing required configuration property: " + KERBEROS_KEYTAB);
-            checkIsReadable(keytabFilePath);
-
-            Configuration conf = new Configuration();
-            conf.set("hadoop.security.authentication", "kerberos");
-
-            UserGroupInformation.setConfiguration(conf);
-            UserGroupInformation.loginUserFromKeytab(principal, keytabFilePath);
-
-            LOG.info("Got Kerberos ticket, keytab: {}, Falcon principal: {}", keytabFilePath, principal);
-        } catch (Exception ex) {
-            throw new FalconException("Could not initialize " + SERVICE_NAME
-                    + ": " + ex.getMessage(), ex);
-        }
-    }
-
-    private static void checkIsReadable(String keytabFilePath) {
-        File keytabFile = new File(keytabFilePath);
-        if (!keytabFile.exists()) {
-            throw new IllegalArgumentException("The keytab file does not exist! " + keytabFilePath);
-        }
-
-        if (!keytabFile.isFile()) {
-            throw new IllegalArgumentException("The keytab file cannot be a directory! " + keytabFilePath);
-        }
-
-        if (!keytabFile.canRead()) {
-            throw new IllegalArgumentException("The keytab file is not readable! " + keytabFilePath);
-        }
-    }
-
-    @Override
-    public void destroy() throws FalconException {
-        timer.cancel();
-    }
-
-    private static class TokenValidationThread extends TimerTask {
-        @Override
-        public void run() {
-            try {
-                LOG.info("Validating Auth Token: {}", new Date());
-                initializeKerberos();
-            } catch (Throwable t) {
-                LOG.error("Error in Auth Token Validation task: ", t);
-                GenericAlert.initializeKerberosFailed(
-                        "Exception in Auth Token Validation : ", t);
-            }
-        }
-    }
-
-
-}

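Stripped of the property plumbing, the service toggles between two
UserGroupInformation setups. A minimal sketch of both paths, with the principal and
keytab path as caller-supplied placeholders rather than values from this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;

    import java.io.IOException;

    public final class UgiLoginSketch {

        // Secure path: what initializeKerberos() boils down to once the
        // startup properties have been resolved and validated.
        static void loginKerberos(String principal, String keytabPath) throws IOException {
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
        }

        // Simple path: what init() falls back to when security is disabled.
        static void loginSimple() {
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "simple");
            UserGroupInformation.setConfiguration(conf);
        }

        private UgiLoginSketch() {}
    }

The TokenValidationThread then simply re-runs the secure path on the configured
token-validity interval so the Kerberos ticket never goes stale.
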
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java b/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java
deleted file mode 100644
index a6f2564..0000000
--- a/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.security;
-
-import org.apache.falcon.entity.EntityNotRegisteredException;
-import org.apache.falcon.entity.v0.AccessControlList;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AuthorizationException;
-
-import java.io.IOException;
-
-/**
- * An interface for authorizing a user against an entity operation.
- */
-public interface AuthorizationProvider {
-
-    /**
-     * Check if the authenticated user is a super user.
-     *
-     * @param authenticatedUGI   proxy ugi for the authenticated user
- * @return true if super user, else false
-     */
-    boolean isSuperUser(UserGroupInformation authenticatedUGI);
-
-    /**
- * Checks if the authenticated user can proxy the entity ACL owner.
-     *
-     * @param authenticatedUGI  proxy ugi for the authenticated user.
-     * @param aclOwner          entity ACL Owner.
-     * @param aclGroup          entity ACL group.
-     * @throws IOException
-     */
-    boolean shouldProxy(UserGroupInformation authenticatedUGI,
-                        String aclOwner, String aclGroup) throws IOException;
-
-    /**
-     * Determines if the authenticated user is authorized to execute the action on the resource,
-     * which is typically a REST resource path.
-     * Throws an exception if not authorized.
-     *
-     * @param resource   api resource, admin, entities or instance
-     * @param action     action being authorized on resource and entity if applicable
-     * @param entityType entity type in question, not for admin resource
-     * @param entityName entity name in question, not for admin resource
-     * @param authenticatedUGI   proxy ugi for the authenticated user
-     * @throws AuthorizationException
-     */
-    void authorizeResource(String resource,
-                           String action,
-                           String entityType,
-                           String entityName,
-                           UserGroupInformation authenticatedUGI)
-        throws AuthorizationException, EntityNotRegisteredException;
-
-    /**
-     * Determines if the authenticated user is authorized to execute the action on the entity.
-     * Throws an exception if not authorized.
-     *
-     * @param entityName entity in question, applicable for entities and instance resource
- * @param entityType entity type in question, applicable for entities and instance resource
-     * @param acl        entity ACL
-     * @param action     action being authorized on resource and entity if applicable
-     * @param authenticatedUGI   proxy ugi for the authenticated user
-     * @throws AuthorizationException
-     */
-    void authorizeEntity(String entityName, String entityType,
-                         AccessControlList acl, String action,
-                         UserGroupInformation authenticatedUGI) throws AuthorizationException;
-}

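To make the contract concrete, here is a deliberately naive implementation sketch.
It is not Falcon's DefaultAuthorizationProvider; the fixed super-user set and the
placeholder policies are assumptions for illustration, the package declaration is
there purely so the interface resolves, and the getOwner()/getGroup() calls assume
the usual JAXB getters on AccessControlList:

    package org.apache.falcon.security;

    import org.apache.falcon.entity.v0.AccessControlList;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.authorize.AuthorizationException;

    import java.util.Collections;
    import java.util.Set;

    public class NaiveAuthorizationProvider implements AuthorizationProvider {

        // Illustrative: a real provider would read this from startup properties.
        private static final Set<String> SUPER_USERS = Collections.singleton("falcon");

        @Override
        public boolean isSuperUser(UserGroupInformation authenticatedUGI) {
            return SUPER_USERS.contains(authenticatedUGI.getShortUserName());
        }

        @Override
        public boolean shouldProxy(UserGroupInformation authenticatedUGI,
                                   String aclOwner, String aclGroup) {
            // Allow proxying when the caller is a super user or owns the entity.
            return isSuperUser(authenticatedUGI)
                    || authenticatedUGI.getShortUserName().equals(aclOwner);
        }

        @Override
        public void authorizeResource(String resource, String action, String entityType,
                                      String entityName, UserGroupInformation authenticatedUGI)
            throws AuthorizationException {
            // Placeholder policy: only super users may touch the admin resource.
            if ("admin".equals(resource) && !isSuperUser(authenticatedUGI)) {
                throw new AuthorizationException("admin resource requires a super user");
            }
        }

        @Override
        public void authorizeEntity(String entityName, String entityType,
                                    AccessControlList acl, String action,
                                    UserGroupInformation authenticatedUGI)
            throws AuthorizationException {
            if (!shouldProxy(authenticatedUGI, acl.getOwner(), acl.getGroup())) {
                throw new AuthorizationException("User " + authenticatedUGI.getShortUserName()
                        + " may not " + action + " entity " + entityName);
            }
        }
    }
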
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/security/CredentialProviderHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/CredentialProviderHelper.java b/common/src/main/java/org/apache/falcon/security/CredentialProviderHelper.java
deleted file mode 100644
index fc4f745..0000000
--- a/common/src/main/java/org/apache/falcon/security/CredentialProviderHelper.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.security;
-
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
-/**
- * Helper class for Hadoop credential provider functionality. Reflection is used to avoid
- * directly referencing the classes and methods, so that no hard version dependency is
- * introduced: the Hadoop credential provider API only appeared in Hadoop 2.6.0 and later.
- */
-
-public final class CredentialProviderHelper {
-
-    private static final Logger LOG = LoggerFactory.getLogger(CredentialProviderHelper.class);
-
-    private static Class<?> clsCredProvider;
-    private static Class<?> clsCredProviderFactory;
-    private static Method methGetPassword;
-    private static Method methCreateCredEntry;
-    private static Method methFlush;
-    private static Method methGetProviders;
-
-    public static final String CREDENTIAL_PROVIDER_PATH = "hadoop.security.credential.provider.path";
-
-    static {
-        try {
-            LOG.debug("Reflecting credential provider classes and methods");
-            clsCredProvider = Class.forName("org.apache.hadoop.security.alias.CredentialProvider");
-            clsCredProviderFactory = Class.forName("org.apache.hadoop.security.alias.CredentialProviderFactory");
-            methCreateCredEntry = clsCredProvider.getMethod("createCredentialEntry", String.class, char[].class);
-            methFlush = clsCredProvider.getMethod("flush");
-            methGetPassword = Configuration.class.getMethod("getPassword", String.class);
-            methGetProviders = clsCredProviderFactory.getMethod("getProviders", new Class[] { Configuration.class });
-            LOG.debug("Found CredentialProviderFactory#getProviders");
-        } catch (ClassNotFoundException | NoSuchMethodException cnfe) {
-            LOG.debug("Ignoring exception", cnfe);
-        }
-    }
-
-    private CredentialProviderHelper() {
-
-    }
-
-    public static boolean isProviderAvailable() {
-        return !(clsCredProvider == null
-                || clsCredProviderFactory == null
-                || methCreateCredEntry == null
-                || methGetPassword == null
-                || methFlush == null);
-    }
-
-    public static String resolveAlias(Configuration conf, String alias) throws IOException {
-        try {
-            char[] cred = (char[]) methGetPassword.invoke(conf, alias);
-            if (cred == null) {
-                throw new IOException("The provided alias cannot be resolved");
-            }
-            return new String(cred);
-        } catch (InvocationTargetException ite) {
-            throw new RuntimeException("Error resolving password "
-                    + " from the credential providers ", ite.getTargetException());
-        } catch (IllegalAccessException iae) {
-            throw new RuntimeException("Error invoking the credential provider method", iae);
-        }
-    }
-}

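Typical use points the standard provider-path property at a keystore and then
resolves an alias. A minimal sketch; the jceks URI and the "db.password" alias are
illustrative, not values from this commit:

    import org.apache.falcon.security.CredentialProviderHelper;
    import org.apache.hadoop.conf.Configuration;

    import java.io.IOException;

    public class CredentialLookupSketch {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            // Illustrative provider path; a deployment would point at its own store.
            conf.set(CredentialProviderHelper.CREDENTIAL_PROVIDER_PATH,
                    "jceks://file/tmp/credentials.jceks");

            if (CredentialProviderHelper.isProviderAvailable()) {
                // Goes through Configuration#getPassword via reflection, so the same
                // jar still loads against pre-2.6.0 Hadoop where the API is absent.
                String secret = CredentialProviderHelper.resolveAlias(conf, "db.password");
                System.out.println("Resolved a " + secret.length() + "-character secret");
            } else {
                System.out.println("Credential provider API not on this Hadoop version");
            }
        }
    }
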
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/security/CurrentUser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/CurrentUser.java b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
deleted file mode 100644
index e7c1594..0000000
--- a/common/src/main/java/org/apache/falcon/security/CurrentUser.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.security;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.service.ProxyUserService;
-import org.apache.falcon.service.Services;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Current authenticated user via REST. Also captures the proxy user from the authorized entity
- * and doles out proxied UserGroupInformation. Caches proxied users.
- */
-public final class CurrentUser {
-
-    private static final Logger LOG = LoggerFactory.getLogger(CurrentUser.class);
-    private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT");
-
-    private final String authenticatedUser;
-    private String proxyUser;
-
-    private CurrentUser(String authenticatedUser) {
-        this.authenticatedUser = authenticatedUser;
-        this.proxyUser = authenticatedUser;
-    }
-
-    private static final ThreadLocal<CurrentUser> CURRENT_USER = new ThreadLocal<CurrentUser>();
-
-    /**
-     * Captures the authenticated user.
-     *
-     * @param user   authenticated user
-     */
-    public static void authenticate(final String user) {
-        if (StringUtils.isEmpty(user)) {
-            throw new IllegalStateException("Bad user name sent for authentication");
-        }
-
-        LOG.info("Logging in {}", user);
-        CurrentUser currentUser = new CurrentUser(user);
-        CURRENT_USER.set(currentUser);
-    }
-
-    /**
-     * Proxies doAs user.
-     *
-     * @param doAsUser doAs user
-     * @param proxyHost proxy host
-     * @throws IOException
-     */
-    public static void proxyDoAsUser(final String doAsUser,
-                                     final String proxyHost) throws IOException {
-        if (!isAuthenticated()) {
-            throw new IllegalStateException("Authentication not done");
-        }
-
-        String currentUser = CURRENT_USER.get().authenticatedUser;
-        if (StringUtils.isNotEmpty(doAsUser) && !doAsUser.equalsIgnoreCase(currentUser)) {
-            if (StringUtils.isEmpty(proxyHost)) {
-                throw new IllegalArgumentException("proxy host cannot be null or empty");
-            }
-            ProxyUserService proxyUserService = Services.get().getService(ProxyUserService.SERVICE_NAME);
-            try {
-                proxyUserService.validate(currentUser, proxyHost, doAsUser);
-            } catch (IOException ex) {
-                throw new RuntimeException(ex);
-            }
-
-            CurrentUser user = CURRENT_USER.get();
-            LOG.info("Authenticated user {} is proxying doAs user {} from host {}",
-                    user.authenticatedUser, doAsUser, proxyHost);
-            AUDIT.info("Authenticated user {} is proxying doAs user {} from host {}",
-                    user.authenticatedUser, doAsUser, proxyHost);
-            user.proxyUser = doAsUser;
-        }
-    }
-
-    /**
-     * Captures the entity owner if authenticated user is a super user.
-     *
-     * @param aclOwner entity acl owner
-     * @param aclGroup entity acl group
-     * @throws IOException
-     */
-    public static void proxy(final String aclOwner,
-                             final String aclGroup) throws IOException {
-        if (!isAuthenticated() || StringUtils.isEmpty(aclOwner)) {
-            throw new IllegalStateException("Authentication not done or Bad user name");
-        }
-
-        CurrentUser user = CURRENT_USER.get();
-        LOG.info("Authenticated user {} is proxying entity owner {}/{}",
-                user.authenticatedUser, aclOwner, aclGroup);
-        AUDIT.info("Authenticated user {} is proxying entity owner {}/{}",
-                user.authenticatedUser, aclOwner, aclGroup);
-        user.proxyUser = aclOwner;
-    }
-
-    /**
-     * Clears the context.
-     */
-    public static void clear() {
-        CURRENT_USER.remove();
-    }
-
-    /**
-     * Checks if the authenticate method is already called.
-     *
-     * @return true if authenticated user is set else false
-     */
-    public static boolean isAuthenticated() {
-        CurrentUser user = CURRENT_USER.get();
-        return user != null && user.authenticatedUser != null;
-    }
-
-    /**
-     * Returns authenticated user.
-     *
-     * @return logged in authenticated user.
-     */
-    public static String getAuthenticatedUser() {
-        CurrentUser user = CURRENT_USER.get();
-        if (user == null || user.authenticatedUser == null) {
-            throw new IllegalStateException("No user logged into the system");
-        } else {
-            return user.authenticatedUser;
-        }
-    }
-
-    /**
-     * Doles out a UGI object for the currently authenticated user if one is set,
-     * else returns the current user.
-     *
-     * @return UGI object
-     * @throws java.io.IOException
-     */
-    public static UserGroupInformation getAuthenticatedUGI() throws IOException {
-        return CurrentUser.isAuthenticated()
-            ? createProxyUGI(getAuthenticatedUser()) : UserGroupInformation.getCurrentUser();
-    }
-
-    /**
-     * Returns the proxy user.
-     *
-     * @return proxy user
-     */
-    public static String getUser() {
-        CurrentUser user = CURRENT_USER.get();
-        if (user == null || user.proxyUser == null) {
-            throw new IllegalStateException("No user logged into the system");
-        } else {
-            return user.proxyUser;
-        }
-    }
-
-    private static ConcurrentMap<String, UserGroupInformation> userUgiMap =
-            new ConcurrentHashMap<String, UserGroupInformation>();
-
-    /**
-     * Create a proxy UGI object for the proxy user.
-     *
-     * @param proxyUser logged in user
-     * @return UGI object
-     * @throws IOException
-     */
-    public static UserGroupInformation createProxyUGI(String proxyUser) throws IOException {
-        UserGroupInformation proxyUgi = userUgiMap.get(proxyUser);
-        if (proxyUgi == null) {
-            // benign race: if another thread cached a UGI first, ours is simply not stored
-            proxyUgi = UserGroupInformation.createProxyUser(
-                    proxyUser, UserGroupInformation.getLoginUser());
-            userUgiMap.putIfAbsent(proxyUser, proxyUgi);
-        }
-
-        return proxyUgi;
-    }
-
-    /**
-     * Doles out a proxy UGI object for the currently authenticated user if one is set,
-     * else returns the current user.
-     *
-     * @return UGI object
-     * @throws java.io.IOException
-     */
-    public static UserGroupInformation getProxyUGI() throws IOException {
-        return CurrentUser.isAuthenticated()
-            ? createProxyUGI(getUser()) : UserGroupInformation.getCurrentUser();
-    }
-
-    /**
-     * Gets a collection of group names the proxy user belongs to.
-     *
-     * @return group names
-     * @throws IOException
-     */
-    public static Set<String> getGroupNames() throws IOException {
-        HashSet<String> s = new HashSet<String>(Arrays.asList(getProxyUGI().getGroupNames()));
-        return Collections.unmodifiableSet(s);
-    }
-
-    /**
-     * Returns the primary group name for the proxy user.
-     *
-     * @return primary group name for the proxy user
-     */
-    public static String getPrimaryGroupName() {
-        try {
-            String[] groups = getProxyUGI().getGroupNames();
-            if (groups.length > 0) {
-                return groups[0];
-            }
-        } catch (IOException ignore) {
-            // ignored
-        }
-
-        return "unknown"; // this can only happen in tests
-    }
-}

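The expected per-request lifecycle is: authenticate, optionally proxy, hand out a
UGI, then clear the ThreadLocal. A minimal sketch; the user and ACL names are
illustrative, and the doAs variant proxyDoAsUser (not shown) would additionally
require a running ProxyUserService:

    import org.apache.falcon.security.CurrentUser;
    import org.apache.hadoop.security.UserGroupInformation;

    import java.io.IOException;

    public class RequestLifecycleSketch {
        public static void handleRequest() throws IOException {
            try {
                // 1. Record the authenticated caller on this request thread.
                CurrentUser.authenticate("alice");            // illustrative user

                // 2. Super-user path: act as the entity's ACL owner for this request.
                CurrentUser.proxy("etl-owner", "etl-group");  // illustrative ACL

                // 3. Hadoop calls now run under the proxied user's UGI.
                UserGroupInformation ugi = CurrentUser.getProxyUGI();
                System.out.println("Acting as " + ugi.getUserName());
            } finally {
                // 4. Always clear, or pooled request threads leak the identity.
                CurrentUser.clear();
            }
        }
    }

The finally/clear() pairing matters because CURRENT_USER is a ThreadLocal: on a
servlet container's pooled threads, a missed clear() would bleed one caller's
identity into the next request.
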
