falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject [28/51] [partial] falcon git commit: FALCON-1830 Removed code source directories and updated pom
Date Tue, 01 Mar 2016 08:26:14 GMT
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
deleted file mode 100644
index 29f933d..0000000
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ /dev/null
@@ -1,1228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.metadata;
-
-import com.tinkerpop.blueprints.Direction;
-import com.tinkerpop.blueprints.Edge;
-import com.tinkerpop.blueprints.Graph;
-import com.tinkerpop.blueprints.GraphQuery;
-import com.tinkerpop.blueprints.Vertex;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.CatalogTable;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.Locations;
-import org.apache.falcon.entity.v0.process.EngineType;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Inputs;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Outputs;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.retention.EvictedInstanceSerDe;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.service.Services;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.falcon.workflow.WorkflowExecutionArgs;
-import org.apache.falcon.workflow.WorkflowExecutionContext;
-import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import static org.apache.falcon.workflow.WorkflowExecutionContext.EntityOperations;
-
-/**
- * Test for Metadata relationship mapping service.
- */
-public class MetadataMappingServiceTest {
-
-    public static final String FALCON_USER = "falcon-user";
-    private static final String LOGS_DIR = "jail://global:00/falcon/staging/feed/logs";
-    private static final String NOMINAL_TIME = "2014-01-01-01-00";
-
-    public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
-    public static final String BCP_CLUSTER_ENTITY_NAME = "bcp-cluster";
-    public static final String PROCESS_ENTITY_NAME = "sample-process";
-    public static final String COLO_NAME = "west-coast";
-    public static final String GENERATE_WORKFLOW_NAME = "imp-click-join-workflow";
-    public static final String REPLICATION_WORKFLOW_NAME = "replication-policy-workflow";
-    private static final String EVICTION_WORKFLOW_NAME = "eviction-policy-workflow";
-    public static final String WORKFLOW_VERSION = "1.0.9";
-
-    public static final String INPUT_FEED_NAMES = "impression-feed#clicks-feed";
-    public static final String INPUT_INSTANCE_PATHS =
-        "jail://global:00/falcon/impression-feed/2014/01/01,jail://global:00/falcon/impression-feed/2014/01/02"
-                + "#jail://global:00/falcon/clicks-feed/2014-01-01";
-    public static final String INPUT_INSTANCE_PATHS_NO_DATE =
-            "jail://global:00/falcon/impression-feed,jail://global:00/falcon/impression-feed"
-                    + "#jail://global:00/falcon/clicks-feed";
-
-    public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
-    public static final String OUTPUT_INSTANCE_PATHS =
-        "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
-    private static final String REPLICATED_FEED = "raw-click";
-    private static final String EVICTED_FEED = "imp-click-join1";
-    private static final String EVICTED_INSTANCE_PATHS =
-            "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join1/20140102";
-    public static final String OUTPUT_INSTANCE_PATHS_NO_DATE =
-            "jail://global:00/falcon/imp-click-join1,jail://global:00/falcon/imp-click-join2";
-    public static final String COUNTERS = "TIMETAKEN:36956,COPY:30,BYTESCOPIED:1000";
-
-    public static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory";
-
-    private ConfigurationStore configStore;
-    private MetadataMappingService service;
-
-    private Cluster clusterEntity;
-    private Cluster anotherCluster;
-    private List<Feed> inputFeeds = new ArrayList<>();
-    private List<Feed> outputFeeds = new ArrayList<>();
-    private Process processEntity;
-
-    @BeforeClass
-    public void setUp() throws Exception {
-        CurrentUser.authenticate(FALCON_USER);
-
-        configStore = ConfigurationStore.get();
-
-        Services.get().register(new WorkflowJobEndNotificationService());
-        StartupProperties.get().setProperty("falcon.graph.storage.directory",
-                "target/graphdb-" + System.currentTimeMillis());
-        StartupProperties.get().setProperty("falcon.graph.preserve.history", "true");
-        service = new MetadataMappingService();
-        service.init();
-
-        Set<String> vertexPropertyKeys = service.getVertexIndexedKeys();
-        System.out.println("Got vertex property keys: " + vertexPropertyKeys);
-
-        Set<String> edgePropertyKeys = service.getEdgeIndexedKeys();
-        System.out.println("Got edge property keys: " + edgePropertyKeys);
-    }
-
-    @AfterClass
-    public void tearDown() throws Exception {
-        GraphUtils.dump(service.getGraph(), System.out);
-
-        cleanUp();
-        StartupProperties.get().setProperty("falcon.graph.preserve.history", "false");
-    }
-
-    @AfterMethod
-    public void printGraph() throws Exception {
-        GraphUtils.dump(service.getGraph());
-    }
-
-    private GraphQuery getQuery() {
-        return service.getGraph().query();
-    }
-
-    @Test
-    public void testGetName() throws Exception {
-        Assert.assertEquals(service.getName(), MetadataMappingService.SERVICE_NAME);
-    }
-
-    @Test
-    public void testOnAddClusterEntity() throws Exception {
-        // Get the before vertices and edges
-        long beforeVerticesCount = getVerticesCount(service.getGraph());
-        long beforeEdgesCount = getEdgesCount(service.getGraph());
-        clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
-                "classification=production");
-
-        verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY);
-        verifyClusterEntityEdges();
-
-        // +4 = cluster, colo, tag, user
-        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 4);
-        // +3 = cluster to colo, user and tag
-        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 3);
-    }
-
-    @Test (dependsOnMethods = "testOnAddClusterEntity")
-    public void testOnAddFeedEntity() throws Exception {
-        // Get the before vertices and edges
-        long beforeVerticesCount = getVerticesCount(service.getGraph());
-        long beforeEdgesCount = getEdgesCount(service.getGraph());
-
-        Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity,
-                "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
-                "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
-        inputFeeds.add(impressionsFeed);
-        verifyEntityWasAddedToGraph(impressionsFeed.getName(), RelationshipType.FEED_ENTITY);
-        verifyFeedEntityEdges(impressionsFeed.getName(), "Secure", "analytics");
-        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 3); // +3 = feed, tag, group,
-        // user
-        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 4); // +4 = cluster, tag, group, user
-
-        // Get the before vertices and edges
-        beforeVerticesCount = getVerticesCount(service.getGraph());
-        beforeEdgesCount = getEdgesCount(service.getGraph());
-        Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity,
-                "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
-                "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
-        inputFeeds.add(clicksFeed);
-        verifyEntityWasAddedToGraph(clicksFeed.getName(), RelationshipType.FEED_ENTITY);
-        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 2); // feed and financial vertex
-        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 5); // +5 = cluster + user + 2Group
-        // + Tag
-
-        // Get the before vertices and edges
-        beforeVerticesCount = getVerticesCount(service.getGraph());
-        beforeEdgesCount = getEdgesCount(service.getGraph());
-        Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
-                "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
-                "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
-        outputFeeds.add(join1Feed);
-        verifyEntityWasAddedToGraph(join1Feed.getName(), RelationshipType.FEED_ENTITY);
-        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 3); // + 3 = 1 feed and 2
-        // groups
-        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 5); // +5 = cluster + user +
-        // Group + 2Tags
-
-        // Get the before vertices and edges
-        beforeVerticesCount = getVerticesCount(service.getGraph());
-        beforeEdgesCount = getEdgesCount(service.getGraph());
-        Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity,
-                "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
-                "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
-        outputFeeds.add(join2Feed);
-        verifyEntityWasAddedToGraph(join2Feed.getName(), RelationshipType.FEED_ENTITY);
-
-        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 1); // +1 feed
-        // +6 = user + 2tags + 2Groups + Cluster
-        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 6);
-    }
-
-    @Test (dependsOnMethods = "testOnAddFeedEntity")
-    public void testOnAddProcessEntity() throws Exception {
-        // Get the before vertices and edges
-        long beforeVerticesCount = getVerticesCount(service.getGraph());
-        long beforeEdgesCount = getEdgesCount(service.getGraph());
-
-        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
-                "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
-                WORKFLOW_VERSION, inputFeeds, outputFeeds);
-
-        verifyEntityWasAddedToGraph(processEntity.getName(), RelationshipType.PROCESS_ENTITY);
-        verifyProcessEntityEdges();
-
-        // +4 = 1 process + 1 tag + 2 pipeline
-        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 4);
-        // +9 = user,tag,cluster, 2 inputs,2 outputs, 2 pipelines
-        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 9);
-    }
-
-    @Test (dependsOnMethods = "testOnAddProcessEntity")
-    public void testOnAdd() throws Exception {
-        verifyEntityGraph(RelationshipType.FEED_ENTITY, "Secure");
-    }
-
-    @Test
-    public void testMapLineage() throws Exception {
-        setup();
-
-        // Get the before vertices and edges
-        long beforeVerticesCount = getVerticesCount(service.getGraph());
-        long beforeEdgesCount = getEdgesCount(service.getGraph());
-        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
-                EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null, null, null, null)
-                , WorkflowExecutionContext.Type.POST_PROCESSING);
-        service.onSuccess(context);
-
-        debug(service.getGraph());
-        GraphUtils.dump(service.getGraph());
-        verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName());
-
-        // +6 = 1 process, 2 inputs = 3 instances,2 outputs
-        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 6);
-        //+40 = +26 for feed instances + 8 for process instance + 6 for second feed instance
-        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 40);
-    }
-
-    @Test
-    public void testLineageForNoDateInFeedPath() throws Exception {
-        setupForNoDateInFeedPath();
-
-        // Get the before vertices and edges
-        long beforeVerticesCount = getVerticesCount(service.getGraph());
-        long beforeEdgesCount = getEdgesCount(service.getGraph());
-        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
-                        EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null,
-                        OUTPUT_INSTANCE_PATHS_NO_DATE, INPUT_INSTANCE_PATHS_NO_DATE, null),
-                WorkflowExecutionContext.Type.POST_PROCESSING);
-        service.onSuccess(context);
-
-        debug(service.getGraph());
-        GraphUtils.dump(service.getGraph());
-
-        // Verify if instance name has nominal time
-        List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(
-                RelationshipType.FEED_INSTANCE.getName());
-        List<String> expected = Arrays.asList("impression-feed/2014-01-01T01:00Z", "clicks-feed/2014-01-01T01:00Z",
-                "imp-click-join1/2014-01-01T01:00Z", "imp-click-join2/2014-01-01T01:00Z");
-        Assert.assertTrue(feedNamesOwnedByUser.containsAll(expected));
-
-        // +5 = 1 process, 2 inputs, 2 outputs
-        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 5);
-        //+34 = +26 for feed instances + 8 for process instance
-        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 34);
-    }
-
-    @Test
-    public void testLineageForReplication() throws Exception {
-        setupForLineageReplication();
-
-        // Get the before vertices and edges
-        // +7 [primary, bcp cluster] = cluster, colo, tag, user
-        // +3 [input feed] = feed, tag, group
-        // +4 [output feed] = 1 feed + 1 tag + 2 groups
-        // +4 [process] = 1 process + 1 tag + 2 pipeline
-        // +3 = 1 process, 1 input, 1 output
-        long beforeVerticesCount = getVerticesCount(service.getGraph());
-
-        // +4 [cluster] = cluster to colo and tag [primary and bcp],
-        // +4 [input feed] = cluster, tag, group, user
-        // +5 [output feed] = cluster + user + Group + 2Tags
-        // +7 = user,tag,cluster, 1 input,1 output, 2 pipelines
-        // +19 = +6 for output feed instances + 7 for process instance + 6 for input feed instance
-        long beforeEdgesCount = getEdgesCount(service.getGraph());
-
-        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
-                        EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_FEED,
-                        "jail://global:00/falcon/raw-click/bcp/20140101",
-                        "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_FEED),
-                WorkflowExecutionContext.Type.POST_PROCESSING);
-        service.onSuccess(context);
-
-        debug(service.getGraph());
-        GraphUtils.dump(service.getGraph());
-
-        verifyLineageGraphForReplicationOrEviction(REPLICATED_FEED,
-                "jail://global:00/falcon/raw-click/bcp/20140101", context,
-                RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
-
-        // No new vertex added after replication
-        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 0);
-
-        // +1 for replicated-to edge to target cluster for each output feed instance
-        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 1);
-    }
-
-    @Test
-    public void testLineageForReplicationForNonGeneratedInstances() throws Exception {
-        cleanUp();
-        service.init();
-        addClusterAndFeedForReplication(inputFeeds);
-        // Get the vertices before running replication WF
-        long beforeVerticesCount = getVerticesCount(service.getGraph());
-        long beforeEdgesCount = getEdgesCount(service.getGraph());
-
-        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
-                        EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_FEED,
-                        "jail://global:00/falcon/raw-click/bcp/20140101",
-                        "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_FEED),
-                WorkflowExecutionContext.Type.POST_PROCESSING);
-        service.onSuccess(context);
-
-        debug(service.getGraph());
-        GraphUtils.dump(service.getGraph());
-
-        verifyFeedEntityEdges(REPLICATED_FEED, "Secure", "analytics");
-        verifyLineageGraphForReplicationOrEviction(REPLICATED_FEED,
-                "jail://global:00/falcon/raw-click/bcp/20140101", context,
-                RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
-
-        // +1 for the new instance vertex added
-        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 1);
-        // +6 = instance-of, stored-in, owned-by, classification, group, replicated-to
-        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 6);
-    }
-
-    @Test
-    public void testLineageForRetention() throws Exception {
-        setupForLineageEviction();
-        // Get the before vertices and edges
-        long beforeVerticesCount = getVerticesCount(service.getGraph());
-        long beforeEdgesCount = getEdgesCount(service.getGraph());
-
-        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
-                        EntityOperations.DELETE, EVICTION_WORKFLOW_NAME,
-                        EVICTED_FEED, EVICTED_INSTANCE_PATHS, "IGNORE", EVICTED_FEED),
-                WorkflowExecutionContext.Type.POST_PROCESSING);
-
-        service.onSuccess(context);
-
-        debug(service.getGraph());
-        GraphUtils.dump(service.getGraph());
-        List<String> expectedFeeds = Arrays.asList("impression-feed/2014-01-01T00:00Z", "clicks-feed/2014-01-01T00:00Z",
-                "imp-click-join1/2014-01-01T00:00Z", "imp-click-join1/2014-01-02T00:00Z");
-        List<String> secureFeeds = Arrays.asList("impression-feed/2014-01-01T00:00Z",
-                "clicks-feed/2014-01-01T00:00Z");
-        List<String> ownedAndSecureFeeds = Arrays.asList("clicks-feed/2014-01-01T00:00Z",
-                "imp-click-join1/2014-01-01T00:00Z", "imp-click-join1/2014-01-02T00:00Z");
-        verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName(), expectedFeeds, secureFeeds, ownedAndSecureFeeds);
-        String[] paths = EVICTED_INSTANCE_PATHS.split(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
-        for (String feedInstanceDataPath : paths) {
-            verifyLineageGraphForReplicationOrEviction(EVICTED_FEED, feedInstanceDataPath, context,
-                    RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE);
-        }
-
-        // No new vertices added
-        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 0);
-        // +2 for evicted-from edge from Feed Instance vertex to cluster
-        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 2);
-    }
-
-    @Test
-    public void testLineageForRetentionWithNoFeedsEvicted() throws Exception {
-        cleanUp();
-        service.init();
-        long beforeVerticesCount = getVerticesCount(service.getGraph());
-        long beforeEdgesCount = getEdgesCount(service.getGraph());
-        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
-                        EntityOperations.DELETE, EVICTION_WORKFLOW_NAME,
-                        EVICTED_FEED, "IGNORE", "IGNORE", EVICTED_FEED),
-                WorkflowExecutionContext.Type.POST_PROCESSING);
-
-        service.onSuccess(context);
-
-        debug(service.getGraph());
-        GraphUtils.dump(service.getGraph());
-        // No new vertices added
-        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount);
-        // No new edges added
-        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount);
-    }
-
-    @Test (dependsOnMethods = "testOnAdd")
-    public void testOnChange() throws Exception {
-        // shutdown the graph and resurrect for testing
-        service.destroy();
-        service.init();
-
-        long beforeVerticesCount = getVerticesCount(service.getGraph());
-        long beforeEdgesCount = getEdgesCount(service.getGraph());
-
-        // cannot modify cluster, adding a new cluster
-        anotherCluster = addClusterEntity("another-cluster", "east-coast",
-                "classification=another");
-        verifyEntityWasAddedToGraph("another-cluster", RelationshipType.CLUSTER_ENTITY);
-
-        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 3); // +3 = cluster, colo, tag
-        // +3 edges to user, colo and new tag
-        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 3);
-    }
-
-    @Test(dependsOnMethods = "testOnChange")
-    public void testOnFeedEntityChange() throws Exception {
-        Feed oldFeed = inputFeeds.get(0);
-        Feed newFeed = EntityBuilderTestUtil.buildFeed(oldFeed.getName(), clusterEntity,
-                "classified-as=Secured,source=data-warehouse", "reporting");
-        addStorage(newFeed, Storage.TYPE.FILESYSTEM,
-                "jail://global:00/falcon/impression-feed/20140101");
-
-        long beforeVerticesCount = 0;
-        long beforeEdgesCount = 0;
-
-        try {
-            configStore.initiateUpdate(newFeed);
-
-            beforeVerticesCount = getVerticesCount(service.getGraph());
-            beforeEdgesCount = getEdgesCount(service.getGraph());
-
-            // add cluster
-            org.apache.falcon.entity.v0.feed.Cluster feedCluster =
-                    new org.apache.falcon.entity.v0.feed.Cluster();
-            feedCluster.setName(anotherCluster.getName());
-            newFeed.getClusters().getClusters().add(feedCluster);
-
-            configStore.update(EntityType.FEED, newFeed);
-        } finally {
-            configStore.cleanupUpdateInit();
-        }
-
-        verifyUpdatedEdges(newFeed);
-        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 2); //+2 = 2 new tags
-        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 2); // +2 = 1 new cluster, 1 new tag
-    }
-
-    @Test
-    public void testLineageForTransactionFailure() throws Exception {
-        cleanUp();
-        service.init();
-        clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
-                "classification=production");
-        verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY);
-        verifyClusterEntityEdges();
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 4); // +3 = cluster, colo, user, tag
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 3); // +2 = cluster to colo, user and tag
-
-        Feed feed = EntityBuilderTestUtil.buildFeed("feed-name", new Cluster[]{clusterEntity}, null, null);
-        inputFeeds.add(feed);
-        outputFeeds.add(feed);
-
-        try {
-            processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
-                    "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
-                    WORKFLOW_VERSION, inputFeeds, outputFeeds);
-            Assert.fail();
-        } catch (FalconException e) {
-            Assert.assertEquals(getVerticesCount(service.getGraph()), 4);
-            Assert.assertEquals(getEdgesCount(service.getGraph()), 3);
-        }
-
-    }
-
-    private void verifyUpdatedEdges(Feed newFeed) {
-        Vertex feedVertex = getEntityVertex(newFeed.getName(), RelationshipType.FEED_ENTITY);
-
-        // groups
-        Edge edge = feedVertex.getEdges(Direction.OUT, RelationshipLabel.GROUPS.getName()).iterator().next();
-        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), "reporting");
-
-        // tags
-        edge = feedVertex.getEdges(Direction.OUT, "classified-as").iterator().next();
-        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), "Secured");
-        edge = feedVertex.getEdges(Direction.OUT, "source").iterator().next();
-        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), "data-warehouse");
-
-        // new cluster
-        List<String> actual = new ArrayList<>();
-        for (Edge clusterEdge : feedVertex.getEdges(Direction.OUT, RelationshipLabel.FEED_CLUSTER_EDGE.getName())) {
-            actual.add(clusterEdge.getVertex(Direction.IN).<String>getProperty("name"));
-        }
-        Assert.assertTrue(actual.containsAll(Arrays.asList("primary-cluster", "another-cluster")),
-                "Actual does not contain expected: " + actual);
-    }
-
-    @Test(dependsOnMethods = "testOnFeedEntityChange")
-    public void testOnProcessEntityChange() throws Exception {
-        long beforeVerticesCount = getVerticesCount(service.getGraph());
-        long beforeEdgesCount = getEdgesCount(service.getGraph());
-
-        Process oldProcess = processEntity;
-        Process newProcess = EntityBuilderTestUtil.buildProcess(oldProcess.getName(), anotherCluster,
-                null, null);
-        EntityBuilderTestUtil.addProcessWorkflow(newProcess, GENERATE_WORKFLOW_NAME, "2.0.0");
-        EntityBuilderTestUtil.addInput(newProcess, inputFeeds.get(0));
-
-        try {
-            configStore.initiateUpdate(newProcess);
-            configStore.update(EntityType.PROCESS, newProcess);
-        } finally {
-            configStore.cleanupUpdateInit();
-        }
-
-        verifyUpdatedEdges(newProcess);
-        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 0); // +0, no net new
-        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount - 6); // -6 = -2 outputs, -1 tag,
-        // -1 cluster, -2 pipelines
-    }
-
-    @Test(dependsOnMethods = "testOnProcessEntityChange")
-    public void testAreSame() throws Exception {
-
-        Inputs inputs1 = new Inputs();
-        Inputs inputs2 = new Inputs();
-        Outputs outputs1 = new Outputs();
-        Outputs outputs2 = new Outputs();
-        // return true when both are null
-        Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(inputs1, inputs2));
-        Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2));
-
-        Input i1 = new Input();
-        i1.setName("input1");
-        Input i2 = new Input();
-        i2.setName("input2");
-        Output o1 = new Output();
-        o1.setName("output1");
-        Output o2 = new Output();
-        o2.setName("output2");
-
-        inputs1.getInputs().add(i1);
-        Assert.assertFalse(EntityRelationshipGraphBuilder.areSame(inputs1, inputs2));
-        outputs1.getOutputs().add(o1);
-        Assert.assertFalse(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2));
-
-        inputs2.getInputs().add(i1);
-        Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(inputs1, inputs2));
-        outputs2.getOutputs().add(o1);
-        Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2));
-    }
-
-    @Test
-    public void testLineageForJobCounter() throws Exception {
-        setupForJobCounters();
-        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
-                        EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "IGNORE", "IGNORE", "IGNORE", "NONE"),
-                WorkflowExecutionContext.Type.POST_PROCESSING);
-        service.onSuccess(context);
-        debug(service.getGraph());
-        GraphUtils.dump(service.getGraph());
-        Graph graph = service.getGraph();
-
-        Vertex vertex = graph.getVertices("name", "sample-process/2014-01-01T01:00Z").iterator().next();
-        Assert.assertEquals(vertex.getProperty("TIMETAKEN"), 36956L);
-        Assert.assertEquals(vertex.getProperty("COPY"), 30L);
-        Assert.assertEquals(vertex.getProperty("BYTESCOPIED"), 1000L);
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 9);
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 14);
-        verifyLineageGraphForJobCounters(context);
-    }
-
-    private void verifyUpdatedEdges(Process newProcess) {
-        Vertex processVertex = getEntityVertex(newProcess.getName(), RelationshipType.PROCESS_ENTITY);
-
-        // cluster
-        Edge edge = processVertex.getEdges(Direction.OUT,
-                RelationshipLabel.PROCESS_CLUSTER_EDGE.getName()).iterator().next();
-        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), anotherCluster.getName());
-
-        // inputs
-        edge = processVertex.getEdges(Direction.IN, RelationshipLabel.FEED_PROCESS_EDGE.getName()).iterator().next();
-        Assert.assertEquals(edge.getVertex(Direction.OUT).getProperty("name"),
-                newProcess.getInputs().getInputs().get(0).getFeed());
-
-        // outputs
-        for (Edge e : processVertex.getEdges(Direction.OUT, RelationshipLabel.PROCESS_FEED_EDGE.getName())) {
-            Assert.fail("there should not be any edges to output feeds" + e);
-        }
-    }
-
-    /** Dumps every vertex and edge of the given graph to stdout for debugging. */
-    public static void debug(final Graph graph) {
-        System.out.println("*****Vertices of " + graph);
-        for (Vertex currentVertex : graph.getVertices()) {
-            System.out.println(GraphUtils.vertexString(currentVertex));
-        }
-
-        System.out.println("*****Edges of " + graph);
-        for (Edge currentEdge : graph.getEdges()) {
-            System.out.println(GraphUtils.edgeString(currentEdge));
-        }
-    }
-
-    private Cluster addClusterEntity(String name, String colo, String tags) throws Exception {
-        Cluster cluster = EntityBuilderTestUtil.buildCluster(name, colo, tags);
-        configStore.publish(EntityType.CLUSTER, cluster);
-        return cluster;
-    }
-
-    /** Convenience overload: adds a feed that lives on a single cluster. */
-    private Feed addFeedEntity(String feedName, Cluster cluster, String tags, String groups,
-                               Storage.TYPE storageType, String uriTemplate) throws Exception {
-        Cluster[] singleCluster = {cluster};
-        return addFeedEntity(feedName, singleCluster, tags, groups, storageType, uriTemplate);
-    }
-
-    private Feed addFeedEntity(String feedName, Cluster[] clusters, String tags, String groups,
-                               Storage.TYPE storageType, String uriTemplate) throws Exception {
-        Feed feed = EntityBuilderTestUtil.buildFeed(feedName, clusters,
-                tags, groups);
-        addStorage(feed, storageType, uriTemplate);
-        for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
-            if (feedCluster.getName().equals(BCP_CLUSTER_ENTITY_NAME)) {
-                feedCluster.setType(ClusterType.TARGET);
-            }
-        }
-        configStore.publish(EntityType.FEED, feed);
-        return feed;
-    }
-
-    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
-    /**
-     * Builds a process with the given workflow, wires the supplied input and
-     * output feeds, publishes it to the config store and returns it.
-     */
-    public Process addProcessEntity(String processName, Cluster cluster,
-                                    String tags, String pipelineTags, String workflowName,
-                                    String version, List<Feed> inFeeds, List<Feed> outFeeds) throws Exception {
-        Process process = EntityBuilderTestUtil.buildProcess(processName, cluster,
-                tags, pipelineTags);
-        EntityBuilderTestUtil.addProcessWorkflow(process, workflowName, version);
-
-        for (Feed inputFeed : inFeeds) {
-            EntityBuilderTestUtil.addInput(process, inputFeed);
-        }
-
-        for (Feed outputFeed : outFeeds) {
-            EntityBuilderTestUtil.addOutput(process, outputFeed);
-        }
-
-        configStore.publish(EntityType.PROCESS, process);
-        return process;
-    }
-    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
-
-    /** Attaches either a filesystem DATA location or a catalog table to the feed. */
-    private static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) {
-        if (storageType != Storage.TYPE.FILESYSTEM) {
-            CatalogTable table = new CatalogTable();
-            table.setUri(uriTemplate);
-            feed.setTable(table);
-            return;
-        }
-
-        Location dataLocation = new Location();
-        dataLocation.setType(LocationType.DATA);
-        dataLocation.setPath(uriTemplate);
-        feed.setLocations(new Locations());
-        feed.getLocations().getLocations().add(dataLocation);
-    }
-
-    /**
-     * Attaches per-cluster storage: a filesystem DATA location or catalog table
-     * on the given feed-cluster element.
-     * NOTE(review): in the FILESYSTEM branch feed.setLocations(new Locations())
-     * resets the feed-level locations on every call while the Location built
-     * here is only added to the cluster — presumably intentional to force
-     * cluster-level overrides, but worth confirming.
-     */
-    private static void addStorage(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed,
-                                   Storage.TYPE storageType, String uriTemplate) {
-        if (storageType == Storage.TYPE.FILESYSTEM) {
-            Locations locations = new Locations();
-            feed.setLocations(locations);
-
-            Location location = new Location();
-            location.setType(LocationType.DATA);
-            location.setPath(uriTemplate);
-            cluster.setLocations(new Locations());
-            cluster.getLocations().getLocations().add(location);
-        } else {
-            CatalogTable table = new CatalogTable();
-            table.setUri(uriTemplate);
-            cluster.setTable(table);
-        }
-    }
-
-    private void verifyEntityWasAddedToGraph(String entityName, RelationshipType entityType) {
-        Vertex entityVertex = getEntityVertex(entityName, entityType);
-        Assert.assertNotNull(entityVertex);
-        verifyEntityProperties(entityVertex, entityName, entityType);
-    }
-
-    /**
-     * Asserts the name, type and timestamp properties on an entity vertex.
-     * Arguments are passed as (actual, expected) to match TestNG's
-     * Assert.assertEquals contract and the convention used by the rest of this
-     * class (failure messages would otherwise be inverted).
-     */
-    private void verifyEntityProperties(Vertex entityVertex, String entityName, RelationshipType entityType) {
-        Assert.assertEquals(entityVertex.getProperty(RelationshipProperty.NAME.getName()), entityName);
-        Assert.assertEquals(entityVertex.getProperty(RelationshipProperty.TYPE.getName()), entityType.getName());
-        Assert.assertNotNull(entityVertex.getProperty(RelationshipProperty.TIMESTAMP.getName()));
-    }
-
-    /** Verifies the cluster entity vertex links out to its user, colo and tag vertices. */
-    private void verifyClusterEntityEdges() {
-        Vertex clusterVertex = getEntityVertex(CLUSTER_ENTITY_NAME,
-                RelationshipType.CLUSTER_ENTITY);
-
-        // verify edge to user vertex
-        verifyVertexForEdge(clusterVertex, Direction.OUT, RelationshipLabel.USER.getName(),
-                FALCON_USER, RelationshipType.USER.getName());
-        // verify edge to colo vertex
-        verifyVertexForEdge(clusterVertex, Direction.OUT, RelationshipLabel.CLUSTER_COLO.getName(),
-                COLO_NAME, RelationshipType.COLO.getName());
-        // verify edge to tags vertex ("classification=production" from the test cluster)
-        verifyVertexForEdge(clusterVertex, Direction.OUT, "classification",
-                "production", RelationshipType.TAGS.getName());
-    }
-
-    /**
-     * Verifies a feed entity vertex links out to its cluster, owning user,
-     * classification tag and group vertices.
-     *
-     * @param feedName feed entity to look up
-     * @param tag      expected "classified-as" tag value
-     * @param group    expected group name
-     */
-    private void verifyFeedEntityEdges(String feedName, String tag, String group) {
-        Vertex feedVertex = getEntityVertex(feedName, RelationshipType.FEED_ENTITY);
-
-        // verify edge to cluster vertex
-        verifyVertexForEdge(feedVertex, Direction.OUT, RelationshipLabel.FEED_CLUSTER_EDGE.getName(),
-                CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY.getName());
-        // verify edge to user vertex
-        verifyVertexForEdge(feedVertex, Direction.OUT, RelationshipLabel.USER.getName(),
-                FALCON_USER, RelationshipType.USER.getName());
-
-        // verify edge to tags vertex
-        verifyVertexForEdge(feedVertex, Direction.OUT, "classified-as",
-                tag, RelationshipType.TAGS.getName());
-        // verify edge to group vertex
-        verifyVertexForEdge(feedVertex, Direction.OUT, RelationshipLabel.GROUPS.getName(),
-                group, RelationshipType.GROUPS.getName());
-    }
-
-    /**
-     * Verifies the process entity vertex: outgoing edges to cluster/user/tag,
-     * incoming edges from both input feeds, and outgoing edges to both output feeds.
-     */
-    private void verifyProcessEntityEdges() {
-        Vertex processVertex = getEntityVertex(PROCESS_ENTITY_NAME,
-                RelationshipType.PROCESS_ENTITY);
-
-        // verify edge to cluster vertex
-        verifyVertexForEdge(processVertex, Direction.OUT, RelationshipLabel.PROCESS_CLUSTER_EDGE.getName(),
-                CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY.getName());
-        // verify edge to user vertex
-        verifyVertexForEdge(processVertex, Direction.OUT, RelationshipLabel.USER.getName(),
-                FALCON_USER, RelationshipType.USER.getName());
-        // verify edge to tags vertex
-        verifyVertexForEdge(processVertex, Direction.OUT, "classified-as",
-                "Critical", RelationshipType.TAGS.getName());
-
-        // verify edges to inputs (feed -> process, hence Direction.IN on the process)
-        List<String> actual = new ArrayList<>();
-        for (Edge edge : processVertex.getEdges(Direction.IN,
-                RelationshipLabel.FEED_PROCESS_EDGE.getName())) {
-            Vertex outVertex = edge.getVertex(Direction.OUT);
-            Assert.assertEquals(RelationshipType.FEED_ENTITY.getName(),
-                    outVertex.getProperty(RelationshipProperty.TYPE.getName()));
-            actual.add(outVertex.<String>getProperty(RelationshipProperty.NAME.getName()));
-        }
-
-        Assert.assertTrue(actual.containsAll(Arrays.asList("impression-feed", "clicks-feed")),
-                "Actual does not contain expected: " + actual);
-
-        actual.clear();
-        // verify edges to outputs (process -> feed)
-        for (Edge edge : processVertex.getEdges(Direction.OUT,
-                RelationshipLabel.PROCESS_FEED_EDGE.getName())) {
-            Vertex outVertex = edge.getVertex(Direction.IN);
-            Assert.assertEquals(RelationshipType.FEED_ENTITY.getName(),
-                    outVertex.getProperty(RelationshipProperty.TYPE.getName()));
-            actual.add(outVertex.<String>getProperty(RelationshipProperty.NAME.getName()));
-        }
-        Assert.assertTrue(actual.containsAll(Arrays.asList("imp-click-join1", "imp-click-join2")),
-                "Actual does not contain expected: " + actual);
-    }
-
-    private Vertex getEntityVertex(String entityName, RelationshipType entityType) {
-        GraphQuery entityQuery = getQuery()
-                .has(RelationshipProperty.NAME.getName(), entityName)
-                .has(RelationshipProperty.TYPE.getName(), entityType.getName());
-        Iterator<Vertex> iterator = entityQuery.vertices().iterator();
-        Assert.assertTrue(iterator.hasNext());
-
-        Vertex entityVertex = iterator.next();
-        Assert.assertNotNull(entityVertex);
-
-        return entityVertex;
-    }
-
-    /**
-     * Follows every edge with the given label from {@code fromVertex} and asserts
-     * each target vertex carries the expected name and type; fails if no such
-     * edge exists. Replaces the old double-negated assertFalse(!found) with the
-     * direct assertTrue(found).
-     * NOTE(review): the target is always read via Direction.IN, which is only
-     * meaningful for OUT edges — all current callers pass Direction.OUT.
-     */
-    private void verifyVertexForEdge(Vertex fromVertex, Direction direction, String label,
-                                     String expectedName, String expectedType) {
-        boolean found = false;
-        for (Edge edge : fromVertex.getEdges(direction, label)) {
-            found = true;
-            Vertex outVertex = edge.getVertex(Direction.IN);
-            Assert.assertEquals(
-                    outVertex.getProperty(RelationshipProperty.NAME.getName()), expectedName);
-            Assert.assertEquals(
-                    outVertex.getProperty(RelationshipProperty.TYPE.getName()), expectedType);
-        }
-        Assert.assertTrue(found, "Edge not found");
-    }
-
-    /**
-     * Verifies entity-level lineage: feeds owned by the falcon user, feeds
-     * classified as Secure, and the intersection of ownership + classification.
-     * NOTE(review): the first assertEquals compares against a fixed-order list,
-     * which assumes deterministic graph iteration order — confirm this holds for
-     * the graph backend in use.
-     */
-    private void verifyEntityGraph(RelationshipType feedType, String classification) {
-        // feeds owned by a user
-        List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(feedType.getName());
-        Assert.assertEquals(feedNamesOwnedByUser,
-                Arrays.asList("impression-feed", "clicks-feed", "imp-click-join1",
-                        "imp-click-join2")
-        );
-
-        // feeds classified as secure
-        verifyFeedsClassifiedAsSecure(feedType.getName(),
-                Arrays.asList("impression-feed", "clicks-feed", "imp-click-join2"));
-
-        // feeds owned by a user and classified as secure
-        verifyFeedsOwnedByUserAndClassification(feedType.getName(), classification,
-                Arrays.asList("impression-feed", "clicks-feed", "imp-click-join2"));
-    }
-
-    /** Returns the names of all vertices of {@code feedType} owned by FALCON_USER. */
-    private List<String> getFeedsOwnedByAUser(String feedType) {
-        GraphQuery userQuery = getQuery()
-                .has(RelationshipProperty.NAME.getName(), FALCON_USER)
-                .has(RelationshipProperty.TYPE.getName(), RelationshipType.USER.getName());
-
-        List<String> ownedFeedNames = new ArrayList<>();
-        for (Vertex userVertex : userQuery.vertices()) {
-            for (Vertex owned : userVertex.getVertices(Direction.IN, RelationshipLabel.USER.getName())) {
-                if (!owned.getProperty(RelationshipProperty.TYPE.getName()).equals(feedType)) {
-                    continue;
-                }
-                System.out.println(FALCON_USER + " owns -> " + GraphUtils.vertexString(owned));
-                ownedFeedNames.add(owned.<String>getProperty(RelationshipProperty.NAME.getName()));
-            }
-        }
-
-        return ownedFeedNames;
-    }
-
-    /**
-     * Collects all vertices of {@code feedType} attached to the "Secure" tag
-     * vertex via "classified-as" edges and asserts they cover {@code expected}.
-     */
-    private void verifyFeedsClassifiedAsSecure(String feedType, List<String> expected) {
-        GraphQuery secureTagQuery = getQuery()
-                .has(RelationshipProperty.NAME.getName(), "Secure")
-                .has(RelationshipProperty.TYPE.getName(), RelationshipType.TAGS.getName());
-
-        List<String> classified = new ArrayList<>();
-        for (Vertex tagVertex : secureTagQuery.vertices()) {
-            for (Vertex candidate : tagVertex.getVertices(Direction.BOTH, "classified-as")) {
-                if (!candidate.getProperty(RelationshipProperty.TYPE.getName()).equals(feedType)) {
-                    continue;
-                }
-                System.out.println(" Secure classification -> " + GraphUtils.vertexString(candidate));
-                classified.add(candidate.<String>getProperty(RelationshipProperty.NAME.getName()));
-            }
-        }
-
-        Assert.assertTrue(classified.containsAll(expected), "Actual does not contain expected: " + classified);
-    }
-
-    /**
-     * Collects feeds of {@code feedType} that are both owned by FALCON_USER and
-     * carry the given classification tag, and asserts they cover {@code expected}.
-     */
-    private void verifyFeedsOwnedByUserAndClassification(String feedType, String classification,
-                                                         List<String> expected) {
-        List<String> actual = new ArrayList<>();
-        Vertex userVertex = getEntityVertex(FALCON_USER, RelationshipType.USER);
-        for (Vertex feed : userVertex.getVertices(Direction.IN, RelationshipLabel.USER.getName())) {
-            if (feed.getProperty(RelationshipProperty.TYPE.getName()).equals(feedType)) {
-                for (Vertex classVertex : feed.getVertices(Direction.OUT, "classified-as")) {
-                    if (classVertex.getProperty(RelationshipProperty.NAME.getName())
-                            .equals(classification)) {
-                        actual.add(feed.<String>getProperty(RelationshipProperty.NAME.getName()));
-                        System.out.println(classification + " feed owned by falcon-user -> "
-                                + GraphUtils.vertexString(feed));
-                    }
-                }
-            }
-        }
-        Assert.assertTrue(actual.containsAll(expected),
-                "Actual does not contain expected: " + actual);
-    }
-
-    /** Counts vertices by exhausting the iterable (Blueprints exposes no size API). */
-    public long getVerticesCount(final Graph graph) {
-        long total = 0L;
-        Iterator<Vertex> it = graph.getVertices().iterator();
-        while (it.hasNext()) {
-            it.next();
-            total++;
-        }
-        return total;
-    }
-
-    /** Counts edges by exhausting the iterable (Blueprints exposes no size API). */
-    public long getEdgesCount(final Graph graph) {
-        long total = 0L;
-        Iterator<Edge> it = graph.getEdges().iterator();
-        while (it.hasNext()) {
-            it.next();
-            total++;
-        }
-        return total;
-    }
-
-    /** Verifies the default GENERATE lineage using the standard 2014-01-01 instance names. */
-    private void verifyLineageGraph(String feedType) {
-        verifyLineageGraph(feedType,
-                Arrays.asList("impression-feed/2014-01-01T00:00Z", "clicks-feed/2014-01-01T00:00Z",
-                        "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z"),
-                Arrays.asList("impression-feed/2014-01-01T00:00Z",
-                        "clicks-feed/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z"),
-                Arrays.asList("clicks-feed/2014-01-01T00:00Z",
-                        "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z"));
-    }
-
-    /**
-     * Verifies instance-level lineage: user ownership of feed instances,
-     * vertex lookup by id round-trips, Secure classification, and the
-     * ownership + Financial classification intersection.
-     */
-    private void verifyLineageGraph(String feedType, List<String> expectedFeeds,
-                                    List<String> secureFeeds, List<String> ownedAndSecureFeeds) {
-        // feeds owned by a user
-        List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(feedType);
-        Assert.assertTrue(feedNamesOwnedByUser.containsAll(expectedFeeds));
-
-        Graph graph = service.getGraph();
-
-        Iterator<Vertex> vertices = graph.getVertices("name", "impression-feed/2014-01-01T00:00Z").iterator();
-        Assert.assertTrue(vertices.hasNext());
-        Vertex feedInstanceVertex = vertices.next();
-        Assert.assertEquals(feedInstanceVertex.getProperty(RelationshipProperty.TYPE.getName()),
-                RelationshipType.FEED_INSTANCE.getName());
-
-        // looking the vertex up again by its id must yield the same vertex
-        Object vertexId = feedInstanceVertex.getId();
-        Vertex vertexById = graph.getVertex(vertexId);
-        Assert.assertEquals(vertexById, feedInstanceVertex);
-
-        // feeds classified as secure
-        verifyFeedsClassifiedAsSecure(feedType, secureFeeds);
-
-        // feeds owned by a user and classified as secure
-        verifyFeedsOwnedByUserAndClassification(feedType, "Financial", ownedAndSecureFeeds);
-    }
-
-    /**
-     * Verifies a replication/eviction edge: the feed instance vertex has an
-     * outgoing edge with the given label, stamped with the workflow timestamp,
-     * pointing at the cluster named in the execution context.
-     */
-    private void verifyLineageGraphForReplicationOrEviction(String feedName, String feedInstanceDataPath,
-                                                            WorkflowExecutionContext context,
-                                                            RelationshipLabel edgeLabel) throws Exception {
-        String feedInstanceName = InstanceRelationshipGraphBuilder.getFeedInstanceName(feedName
-                , context.getClusterName(), feedInstanceDataPath, context.getNominalTimeAsISO8601());
-        Vertex feedVertex = getEntityVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
-
-        Edge edge = feedVertex.getEdges(Direction.OUT, edgeLabel.getName())
-                .iterator().next();
-        Assert.assertNotNull(edge);
-        Assert.assertEquals(edge.getProperty(RelationshipProperty.TIMESTAMP.getName())
-                , context.getTimeStampAsISO8601());
-
-        Vertex clusterVertex = edge.getVertex(Direction.IN);
-        Assert.assertEquals(clusterVertex.getProperty(RelationshipProperty.NAME.getName()), context.getClusterName());
-    }
-
-    /** Asserts the process vertex exists and the context carries a non-empty counters string. */
-    private void verifyLineageGraphForJobCounters(WorkflowExecutionContext context) throws Exception {
-        Vertex processVertex = getEntityVertex(PROCESS_ENTITY_NAME,
-                RelationshipType.PROCESS_ENTITY);
-        Assert.assertEquals(processVertex.getProperty("name"), PROCESS_ENTITY_NAME);
-        Assert.assertFalse(context.getCounters().isEmpty());
-    }
-
-    /**
-     * Builds the command-line argument array that WorkflowExecutionContext
-     * parses. Any null feed-name/path parameter falls back to the class-level
-     * default constant. For REPLICATE the cluster name encodes both the BCP
-     * (source-side) and primary cluster, separated by CLUSTER_NAME_SEPARATOR.
-     */
-    private static String[] getTestMessageArgs(EntityOperations operation, String wfName, String outputFeedNames,
-                                               String feedInstancePaths, String falconInputPaths,
-                                               String falconInputFeeds) {
-        String cluster;
-        if (EntityOperations.REPLICATE == operation) {
-            cluster = BCP_CLUSTER_ENTITY_NAME + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + CLUSTER_ENTITY_NAME;
-        } else {
-            cluster = CLUSTER_ENTITY_NAME;
-        }
-
-        return new String[]{
-            "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster,
-            "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), ("process"),
-            "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), PROCESS_ENTITY_NAME,
-            "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME,
-            "-" + WorkflowExecutionArgs.OPERATION.getName(), operation.toString(),
-
-            "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(),
-            (falconInputFeeds != null ? falconInputFeeds : INPUT_FEED_NAMES),
-            "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(),
-            (falconInputPaths != null ? falconInputPaths : INPUT_INSTANCE_PATHS),
-
-            "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(),
-            (outputFeedNames != null ? outputFeedNames : OUTPUT_FEED_NAMES),
-            "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
-            (feedInstancePaths != null ? feedInstancePaths : OUTPUT_INSTANCE_PATHS),
-
-            "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
-            "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,
-            "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
-            "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
-            "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), NOMINAL_TIME,
-
-            "-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie",
-            "-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id",
-            "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), wfName,
-            "-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), WORKFLOW_VERSION,
-            "-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), EngineType.PIG.name(),
-
-            "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER,
-            "-" + WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
-            "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER,
-            "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
-            "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000",
-
-            "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
-        };
-    }
-
-    /**
-     * Resets graph + config store, writes the counters file, and registers a
-     * cluster plus a process with no input/output feeds for counter tests.
-     */
-    private void setupForJobCounters() throws Exception {
-        cleanUp();
-        service.init();
-        // Add cluster
-        clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
-                "classification=production");
-        // intentionally empty: counter tests need a process but no feeds
-        List<Feed> inFeeds = new ArrayList<>();
-        List<Feed> outFeeds = new ArrayList<>();
-
-        createJobCountersFileForTest();
-        // Add process
-        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
-                "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
-                WORKFLOW_VERSION, inFeeds, outFeeds);
-    }
-
-    /**
-     * Writes the COUNTERS test payload to LOGS_DIR/counter.txt on the proxied
-     * file system so counter-parsing tests have something to read.
-     * Uses try-with-resources: the previous explicit finally block called
-     * out.close() unconditionally and would NPE (masking the real failure) if
-     * the FileSystem lookup or fs.create threw before 'out' was assigned.
-     */
-    private void createJobCountersFileForTest() throws Exception {
-        Path counterFile = new Path(LOGS_DIR, "counter.txt");
-        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                new Path(LOGS_DIR).toUri());
-        try (OutputStream out = fs.create(counterFile)) {
-            out.write(COUNTERS.getBytes());
-            out.flush();
-        }
-    }
-
-    /** Resets graph + config store and registers the standard cluster, feeds and process. */
-    private void setup() throws Exception {
-        cleanUp();
-        service.init();
-
-        // Add cluster
-        clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
-                "classification=production");
-
-        addFeedsAndProcess(clusterEntity);
-    }
-
-    /**
-     * Registers the two standard input feeds (impression/clicks), the two join
-     * output feeds, and the process that connects them, all on {@code cluster}.
-     * NOTE(review): the process is created on the {@code clusterEntity} field
-     * rather than the {@code cluster} parameter — identical for current callers,
-     * but confirm before passing a different cluster.
-     */
-    private void addFeedsAndProcess(Cluster cluster) throws Exception  {
-        // Add input and output feeds
-        Feed impressionsFeed = addFeedEntity("impression-feed", cluster,
-                "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
-                "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
-        List<Feed> inFeeds = new ArrayList<>();
-        List<Feed> outFeeds = new ArrayList<>();
-        inFeeds.add(impressionsFeed);
-        Feed clicksFeed = addFeedEntity("clicks-feed", cluster,
-                "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
-                "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
-        inFeeds.add(clicksFeed);
-        Feed join1Feed = addFeedEntity("imp-click-join1", cluster,
-                "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
-                "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
-        outFeeds.add(join1Feed);
-        Feed join2Feed = addFeedEntity("imp-click-join2", cluster,
-                "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
-                "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
-        outFeeds.add(join2Feed);
-        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
-                "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
-                WORKFLOW_VERSION, inFeeds, outFeeds);
-    }
-
-    /**
-     * Prepares replication-lineage state: primary + BCP clusters, the replicated
-     * input feed, an output feed and the process, then simulates a successful
-     * GENERATE run so all instance vertices exist before the replication test.
-     */
-    private void setupForLineageReplication() throws Exception {
-        cleanUp();
-        service.init();
-
-        List<Feed> inFeeds = new ArrayList<>();
-        List<Feed> outFeeds = new ArrayList<>();
-
-        addClusterAndFeedForReplication(inFeeds);
-
-        // Add output feed
-        Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
-                "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
-                "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
-        outFeeds.add(join1Feed);
-
-        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
-                "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
-                WORKFLOW_VERSION, inFeeds, outFeeds);
-
-        // GENERATE WF should have run before this to create all instance related vertices
-        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
-                EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "imp-click-join1",
-                "jail://global:00/falcon/imp-click-join1/20140101",
-                "jail://global:00/falcon/raw-click/primary/20140101",
-                REPLICATED_FEED), WorkflowExecutionContext.Type.POST_PROCESSING);
-        service.onSuccess(context);
-    }
-
-    /**
-     * Registers the primary + BCP clusters and the replicated feed, assigns each
-     * cluster its own storage URI, pushes the updated feed through the config
-     * store's update cycle, and appends the feed to {@code inFeeds}.
-     */
-    private void addClusterAndFeedForReplication(List<Feed> inFeeds) throws Exception {
-        // Add cluster
-        clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
-                "classification=production");
-        // Add backup cluster
-        Cluster bcpCluster = addClusterEntity(BCP_CLUSTER_ENTITY_NAME, "east-coast", "classification=bcp");
-
-        Cluster[] clusters = {clusterEntity, bcpCluster};
-
-        // Add feed
-        Feed rawFeed = addFeedEntity(REPLICATED_FEED, clusters,
-                "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
-                "/falcon/raw-click/${YEAR}/${MONTH}/${DAY}");
-        // Add uri template for each cluster: primary gets .../primary/..., BCP gets .../bcp/...
-        for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : rawFeed.getClusters().getClusters()) {
-            if (feedCluster.getName().equals(CLUSTER_ENTITY_NAME)) {
-                addStorage(feedCluster, rawFeed, Storage.TYPE.FILESYSTEM,
-                        "/falcon/raw-click/primary/${YEAR}/${MONTH}/${DAY}");
-            } else {
-                addStorage(feedCluster, rawFeed, Storage.TYPE.FILESYSTEM,
-                        "/falcon/raw-click/bcp/${YEAR}/${MONTH}/${DAY}");
-            }
-        }
-
-        // update config store; cleanupUpdateInit must run even if update fails
-        try {
-            configStore.initiateUpdate(rawFeed);
-            configStore.update(EntityType.FEED, rawFeed);
-        } finally {
-            configStore.cleanupUpdateInit();
-        }
-        inFeeds.add(rawFeed);
-    }
-
-    /**
-     * Runs the standard setup, then simulates a successful GENERATE run using
-     * the evicted-instance paths so eviction lineage can be asserted.
-     * NOTE(review): the output feed name "imp-click-join1" is listed twice —
-     * possibly intended to be "imp-click-join1,imp-click-join2"; confirm
-     * against EVICTED_INSTANCE_PATHS before changing.
-     */
-    private void setupForLineageEviction() throws Exception {
-        setup();
-
-        // GENERATE WF should have run before this to create all instance related vertices
-        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
-                        EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME,
-                        "imp-click-join1,imp-click-join1", EVICTED_INSTANCE_PATHS, null, null),
-                WorkflowExecutionContext.Type.POST_PROCESSING);
-        service.onSuccess(context);
-    }
-
-    /**
-     * Same entity topology as {@link #setup()}, but every feed path lacks the
-     * ${YEAR}/${MONTH}/${DAY} date pattern — used to test lineage when no
-     * instance date can be derived from the path.
-     */
-    private void setupForNoDateInFeedPath() throws Exception {
-        cleanUp();
-        service.init();
-
-        // Add cluster
-        clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
-                "classification=production");
-        List<Feed> inFeeds = new ArrayList<>();
-        List<Feed> outFeeds = new ArrayList<>();
-        // Add input and output feeds
-        Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity,
-                "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
-                "/falcon/impression-feed");
-        inFeeds.add(impressionsFeed);
-        Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity,
-                "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
-                "/falcon/clicks-feed");
-        inFeeds.add(clicksFeed);
-        Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
-                "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
-                "/falcon/imp-click-join1");
-        outFeeds.add(join1Feed);
-        Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity,
-                "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
-                "/falcon/imp-click-join2");
-        outFeeds.add(join2Feed);
-        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
-                "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
-                WORKFLOW_VERSION, inFeeds, outFeeds);
-
-    }
-
-    /** Tears down test state: graph data first, then config store, then the service itself. */
-    private void cleanUp() throws Exception {
-        cleanupGraphStore(service.getGraph());
-        cleanupConfigurationStore(configStore);
-        service.destroy();
-    }
-
-    /**
-     * Empties the graph and shuts it down. Edges are removed before vertices so
-     * vertex removal never touches a dangling edge; shutdown must come last.
-     */
-    private void cleanupGraphStore(Graph graph) {
-        for (Edge edge : graph.getEdges()) {
-            graph.removeEdge(edge);
-        }
-
-        for (Vertex vertex : graph.getVertices()) {
-            graph.removeVertex(vertex);
-        }
-
-        graph.shutdown();
-    }
-
-    /** Removes every registered entity of every type from the configuration store. */
-    private static void cleanupConfigurationStore(ConfigurationStore store) throws Exception {
-        for (EntityType type : EntityType.values()) {
-            for (String entityName : store.getEntities(type)) {
-                store.remove(type, entityName);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/retention/EvictedInstanceSerDeTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/retention/EvictedInstanceSerDeTest.java b/common/src/test/java/org/apache/falcon/retention/EvictedInstanceSerDeTest.java
deleted file mode 100644
index 0f2ee7b..0000000
--- a/common/src/test/java/org/apache/falcon/retention/EvictedInstanceSerDeTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.retention;
-
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Unit test for EvictedInstanceSerDe.
- */
-public class EvictedInstanceSerDeTest {
-
-    // mini HDFS cluster backing the test file system
-    private EmbeddedCluster cluster;
-    private FileSystem fs;
-    // location of the serialized instance-paths CSV under the staging dir
-    private Path csvFilePath;
-    // two catalog-partition URIs joined by the SerDe's separator
-    private StringBuffer evictedInstancePaths = new StringBuffer(
-            "thrift://falcon-distcp-1.cs1cloud.internal:9083/default/retention_hours_7/year=2010")
-            .append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR)
-            .append("thrift://falcon-distcp-1.cs1cloud.internal:9083/default/retention_hours_7/year=2011");
-
-    /** Spins up the embedded cluster and derives the CSV path from its HDFS URL. */
-    @BeforeClass
-    public void start() throws Exception {
-        cluster = EmbeddedCluster.newCluster("test");
-        String hdfsUrl = cluster.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY);
-
-        fs = FileSystem.get(cluster.getConf());
-        csvFilePath = new Path(hdfsUrl + "/falcon/staging/feed/instancePaths-2014-10-01-01-00.csv");
-    }
-
-    @AfterClass
-    public void close() throws Exception {
-        cluster.shutdown();
-    }
-
-    /** An empty buffer serializes to just the instance-path prefix marker. */
-    @Test
-    public void testSerializeEvictedInstancePathsForNoEviction() throws Exception {
-        EvictedInstanceSerDe.serializeEvictedInstancePaths(fs, csvFilePath, new StringBuffer());
-
-        Assert.assertEquals(readLogFile(csvFilePath),
-                EvictedInstanceSerDe.INSTANCEPATH_PREFIX);
-    }
-
-    /** A populated buffer serializes verbatim (prefix included by the SerDe). */
-    @Test
-    public void testSerializeEvictedInstancePathsWithEviction() throws Exception {
-        EvictedInstanceSerDe.serializeEvictedInstancePaths(fs, csvFilePath, evictedInstancePaths);
-        Assert.assertEquals(readLogFile(csvFilePath), evictedInstancePaths.toString());
-    }
-
-    /** Round-trip of the empty serialization yields zero instance paths. */
-    @Test(dependsOnMethods = "testSerializeEvictedInstancePathsForNoEviction")
-    public void testDeserializeEvictedInstancePathsForNoEviction() throws Exception {
-        String[] instancePaths = EvictedInstanceSerDe.deserializeEvictedInstancePaths(fs, csvFilePath);
-        Assert.assertEquals(instancePaths.length, 0);
-    }
-
-    /** Round-trip of the populated serialization yields both URIs in order. */
-    @Test(dependsOnMethods = "testSerializeEvictedInstancePathsWithEviction")
-    public void testDeserializeEvictedInstancePathsWithEviction() throws Exception {
-        String[] instancePaths = EvictedInstanceSerDe.deserializeEvictedInstancePaths(fs, csvFilePath);
-        Assert.assertEquals(instancePaths.length, 2);
-        Assert.assertTrue(instancePaths[0].equals(
-                "thrift://falcon-distcp-1.cs1cloud.internal:9083/default/retention_hours_7/year=2010"));
-        Assert.assertTrue(instancePaths[1].equals(
-                "thrift://falcon-distcp-1.cs1cloud.internal:9083/default/retention_hours_7/year=2011"));
-
-    }
-
-    /** Reads the whole file into a String; the stream is closed by copyBytes. */
-    private String readLogFile(Path logFile) throws IOException {
-        ByteArrayOutputStream writer = new ByteArrayOutputStream();
-        // variable named 'date' — presumably meant 'data'; left as-is (doc-only pass)
-        InputStream date = fs.open(logFile);
-        IOUtils.copyBytes(date, writer, 4096, true);
-        return writer.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/security/AuthenticationInitializationServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/security/AuthenticationInitializationServiceTest.java b/common/src/test/java/org/apache/falcon/security/AuthenticationInitializationServiceTest.java
deleted file mode 100644
index 7979fe0..0000000
--- a/common/src/test/java/org/apache/falcon/security/AuthenticationInitializationServiceTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.security;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.util.FalconTestUtil;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
-import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import java.io.File;
-
-
-/**
- * Unit test for AuthenticationInitializationService that employs mocks.
- */
-public class AuthenticationInitializationServiceTest {
-
-    private AuthenticationInitializationService authenticationService;
-
-    @Mock
-    private UserGroupInformation mockLoginUser;
-
-    @BeforeClass
-    public void setUp() {
-        MockitoAnnotations.initMocks(this);
-
-        authenticationService = new AuthenticationInitializationService();
-    }
-
-    @Test
-    public void testGetName() {
-        Assert.assertEquals("Authentication initialization service",
-                authenticationService.getName());
-    }
-
-    @Test
-    public void testInitForSimpleAuthenticationMethod() {
-        try {
-            StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE,
-                    PseudoAuthenticationHandler.TYPE);
-            authenticationService.init();
-
-            UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
-            Assert.assertFalse(loginUser.isFromKeytab());
-            Assert.assertEquals(loginUser.getAuthenticationMethod().name().toLowerCase(),
-                    PseudoAuthenticationHandler.TYPE);
-            Assert.assertEquals(System.getProperty("user.name"), loginUser.getUserName());
-        } catch (Exception e) {
-            Assert.fail("AuthenticationInitializationService init failed.", e);
-        }
-    }
-
-    @Test
-    public void testKerberosAuthenticationWithKeytabFileDoesNotExist() {
-        try {
-            StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE,
-                    KerberosAuthenticationHandler.TYPE);
-            StartupProperties.get().setProperty(AuthenticationInitializationService.KERBEROS_KEYTAB, "/blah/blah");
-            authenticationService.init();
-            Assert.fail("The keytab file does not exist! must have been thrown.");
-        } catch (Exception e) {
-            Assert.assertEquals(e.getCause().getClass(), IllegalArgumentException.class);
-        }
-    }
-
-    @Test
-    public void testKerberosAuthenticationWithKeytabFileIsADirectory() {
-        try {
-            StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE,
-                    KerberosAuthenticationHandler.TYPE);
-            StartupProperties.get().setProperty(AuthenticationInitializationService.KERBEROS_KEYTAB, "/tmp/");
-            authenticationService.init();
-            Assert.fail("The keytab file cannot be a directory! must have been thrown.");
-        } catch (Exception e) {
-            Assert.assertEquals(e.getCause().getClass(), IllegalArgumentException.class);
-        }
-    }
-
-    @Test
-    public void testKerberosAuthenticationWithKeytabFileNotReadable() {
-        File tempFile = new File(".keytabFile");
-        try {
-            assert tempFile.createNewFile();
-            assert tempFile.setReadable(false);
-
-            StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE,
-                    KerberosAuthenticationHandler.TYPE);
-            StartupProperties.get().setProperty(
-                    AuthenticationInitializationService.KERBEROS_KEYTAB, tempFile.toString());
-            authenticationService.init();
-            Assert.fail("The keytab file is not readable! must have been thrown.");
-        } catch (Exception e) {
-            Assert.assertEquals(e.getCause().getClass(), IllegalArgumentException.class);
-        } finally {
-            assert tempFile.delete();
-        }
-    }
-
-    @Test (enabled = false)
-    public void testInitForKerberosAuthenticationMethod() throws FalconException {
-        Mockito.when(mockLoginUser.getAuthenticationMethod())
-                .thenReturn(UserGroupInformation.AuthenticationMethod.KERBEROS);
-        Mockito.when(mockLoginUser.getUserName()).thenReturn(FalconTestUtil.TEST_USER_1);
-        Mockito.when(mockLoginUser.isFromKeytab()).thenReturn(Boolean.TRUE);
-
-        StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE,
-                KerberosAuthenticationHandler.TYPE);
-        StartupProperties.get().setProperty(
-                AuthenticationInitializationService.KERBEROS_KEYTAB, "falcon.kerberos.keytab");
-        StartupProperties.get().setProperty(AuthenticationInitializationService.KERBEROS_PRINCIPAL,
-                FalconTestUtil.TEST_USER_1);
-
-        authenticationService.init();
-
-        Assert.assertTrue(mockLoginUser.isFromKeytab());
-        Assert.assertEquals(mockLoginUser.getAuthenticationMethod().name(),
-                KerberosAuthenticationHandler.TYPE);
-        Assert.assertEquals(FalconTestUtil.TEST_USER_1, mockLoginUser.getUserName());
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java b/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
deleted file mode 100644
index 5cc6c70..0000000
--- a/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.security;
-
-import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
-import org.apache.falcon.service.GroupsService;
-import org.apache.falcon.service.ProxyUserService;
-import org.apache.falcon.service.Services;
-import org.apache.falcon.util.RuntimeProperties;
-import org.apache.falcon.util.FalconTestUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-/**
- * Test for current user's thread safety.
- */
-public class CurrentUserTest {
-    private ProxyUserService proxyUserService;
-    private GroupsService groupsService;
-
-    @BeforeClass
-    public void setUp() throws Exception {
-        Services.get().register(new ProxyUserService());
-        Services.get().register(new GroupsService());
-        groupsService = Services.get().getService(GroupsService.SERVICE_NAME);
-        proxyUserService = Services.get().getService(ProxyUserService.SERVICE_NAME);
-        groupsService.init();
-
-        RuntimeProperties.get().setProperty("falcon.service.ProxyUserService.proxyuser.foo.hosts", "*");
-        RuntimeProperties.get().setProperty("falcon.service.ProxyUserService.proxyuser.foo.groups", "*");
-        proxyUserService.init();
-    }
-
-    @AfterClass
-    public void tearDown() throws Exception {
-        proxyUserService.destroy();
-        groupsService.destroy();
-        Services.get().reset();
-    }
-
-    @AfterMethod
-    public void cleanUp() {
-        CurrentUser.clear();
-    }
-
-    @Test(threadPoolSize = 10, invocationCount = 10, timeOut = 10000)
-    public void testGetUser() throws Exception {
-        String id = Long.toString(System.nanoTime());
-        CurrentUser.authenticate(id);
-        Assert.assertEquals(CurrentUser.getAuthenticatedUser(), id);
-        Assert.assertEquals(CurrentUser.getUser(), id);
-    }
-
-    @Test (expectedExceptions = IllegalStateException.class)
-    public void testAuthenticateBadUser() throws Exception {
-        CurrentUser.authenticate("");
-    }
-
-    @Test (expectedExceptions = IllegalStateException.class)
-    public void testGetAuthenticatedUserInvalid() throws Exception {
-        CurrentUser.getAuthenticatedUser();
-    }
-
-    @Test (expectedExceptions = IllegalStateException.class)
-    public void testGetUserInvalid() throws Exception {
-        CurrentUser.getUser();
-    }
-
-    @Test (expectedExceptions = IllegalStateException.class)
-    public void testProxyBadUser() throws Exception {
-        CurrentUser.authenticate(FalconTestUtil.TEST_USER_1);
-        CurrentUser.proxy("", "");
-    }
-
-    @Test (expectedExceptions = IllegalStateException.class)
-    public void testProxyWithNoAuth() throws Exception {
-        CurrentUser.proxy(FalconTestUtil.TEST_USER_1, "falcon");
-    }
-
-    @Test
-    public void testGetProxyUserForAuthenticatedUser() throws Exception {
-        CurrentUser.authenticate("proxy");
-        UserGroupInformation proxyUgi = CurrentUser.getProxyUGI();
-        Assert.assertNotNull(proxyUgi);
-        Assert.assertEquals(proxyUgi.getUserName(), "proxy");
-    }
-
-    @Test
-    public void testProxy() throws Exception {
-        CurrentUser.authenticate("real");
-
-        CurrentUser.proxy(EntityBuilderTestUtil.USER, "users");
-        UserGroupInformation proxyUgi = CurrentUser.getProxyUGI();
-        Assert.assertNotNull(proxyUgi);
-        Assert.assertEquals(proxyUgi.getUserName(), EntityBuilderTestUtil.USER);
-
-        Assert.assertEquals(CurrentUser.getAuthenticatedUser(), "real");
-        Assert.assertEquals(CurrentUser.getUser(), EntityBuilderTestUtil.USER);
-    }
-
-    @Test
-    public void testProxySameUser() throws Exception {
-        CurrentUser.authenticate(FalconTestUtil.TEST_USER_1);
-
-        CurrentUser.proxy(FalconTestUtil.TEST_USER_1, "users");
-        UserGroupInformation proxyUgi = CurrentUser.getProxyUGI();
-        Assert.assertNotNull(proxyUgi);
-        Assert.assertEquals(proxyUgi.getUserName(), FalconTestUtil.TEST_USER_1);
-
-        Assert.assertEquals(CurrentUser.getAuthenticatedUser(), FalconTestUtil.TEST_USER_1);
-        Assert.assertEquals(CurrentUser.getUser(), FalconTestUtil.TEST_USER_1);
-    }
-
-    @Test
-    public void testSuperUser() throws Exception {
-        CurrentUser.authenticate(EntityBuilderTestUtil.USER);
-        CurrentUser.proxy("proxy", "users");
-
-        UserGroupInformation proxyUgi = CurrentUser.getProxyUGI();
-        Assert.assertNotNull(proxyUgi);
-        Assert.assertEquals(proxyUgi.getUserName(), "proxy");
-
-        Assert.assertEquals(CurrentUser.getAuthenticatedUser(), EntityBuilderTestUtil.USER);
-        Assert.assertEquals(CurrentUser.getUser(), "proxy");
-    }
-
-    @Test(expectedExceptions = IllegalStateException.class)
-    public void testProxyDoAsUserWithNoAuth() throws Exception {
-        CurrentUser.proxyDoAsUser("falcon", "localhost");
-    }
-
-    @Test
-    public void testProxyDoAsUser() throws Exception {
-        CurrentUser.authenticate("foo");
-
-        CurrentUser.proxyDoAsUser(EntityBuilderTestUtil.USER, "localhost");
-        UserGroupInformation proxyUgi = CurrentUser.getProxyUGI();
-        Assert.assertNotNull(proxyUgi);
-        Assert.assertEquals(proxyUgi.getUserName(), EntityBuilderTestUtil.USER);
-
-        Assert.assertEquals(CurrentUser.getAuthenticatedUser(), "foo");
-        Assert.assertEquals(CurrentUser.getUser(), EntityBuilderTestUtil.USER);
-    }
-
-    @Test
-    public void testProxyDoAsSameUser() throws Exception {
-        CurrentUser.authenticate("foo");
-
-        CurrentUser.proxyDoAsUser("foo", "localhost");
-        UserGroupInformation proxyUgi = CurrentUser.getProxyUGI();
-        Assert.assertNotNull(proxyUgi);
-        Assert.assertEquals(proxyUgi.getUserName(), "foo");
-
-        Assert.assertEquals(CurrentUser.getAuthenticatedUser(), "foo");
-        Assert.assertEquals(CurrentUser.getUser(), "foo");
-    }
-}


Mime
View raw message