falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject [1/2] falcon git commit: FALCON-1101 Cluster submission in falcon does not create an owned-by edge. Contributed by Sowmya Ramesh
Date Mon, 15 Jun 2015 05:23:27 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 7ffe1a33b -> e093668a8


http://git-wip-us.apache.org/repos/asf/falcon/blob/e093668a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java.orig
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java.orig b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java.orig
new file mode 100644
index 0000000..30eeaa4
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java.orig
@@ -0,0 +1,1107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.metadata;
+
+import com.tinkerpop.blueprints.Direction;
+import com.tinkerpop.blueprints.Edge;
+import com.tinkerpop.blueprints.Graph;
+import com.tinkerpop.blueprints.GraphQuery;
+import com.tinkerpop.blueprints.Vertex;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.process.EngineType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Inputs;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Outputs;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.retention.EvictedInstanceSerDe;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import static org.apache.falcon.workflow.WorkflowExecutionContext.EntityOperations;
+import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Test for Metadata relationship mapping service.
+ */
+public class MetadataMappingServiceTest {
+
+    public static final String FALCON_USER = "falcon-user";
+    private static final String LOGS_DIR = "/falcon/staging/feed/logs";
+    private static final String NOMINAL_TIME = "2014-01-01-01-00";
+
+    public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
+    public static final String BCP_CLUSTER_ENTITY_NAME = "bcp-cluster";
+    public static final String PROCESS_ENTITY_NAME = "sample-process";
+    public static final String COLO_NAME = "west-coast";
+    public static final String GENERATE_WORKFLOW_NAME = "imp-click-join-workflow";
+    public static final String REPLICATION_WORKFLOW_NAME = "replication-policy-workflow";
+    private static final String EVICTION_WORKFLOW_NAME = "eviction-policy-workflow";
+    public static final String WORKFLOW_VERSION = "1.0.9";
+
+    public static final String INPUT_FEED_NAMES = "impression-feed#clicks-feed";
+    public static final String INPUT_INSTANCE_PATHS =
+        "jail://global:00/falcon/impression-feed/2014/01/01,jail://global:00/falcon/impression-feed/2014/01/02"
+                + "#jail://global:00/falcon/clicks-feed/2014-01-01";
+    public static final String INPUT_INSTANCE_PATHS_NO_DATE =
+            "jail://global:00/falcon/impression-feed,jail://global:00/falcon/impression-feed"
+                    + "#jail://global:00/falcon/clicks-feed";
+
+    public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
+    public static final String OUTPUT_INSTANCE_PATHS =
+        "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
+    private static final String REPLICATED_FEED = "raw-click";
+    private static final String EVICTED_FEED = "imp-click-join1";
+    private static final String EVICTED_INSTANCE_PATHS =
+            "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join1/20140102";
+    public static final String OUTPUT_INSTANCE_PATHS_NO_DATE =
+            "jail://global:00/falcon/imp-click-join1,jail://global:00/falcon/imp-click-join2";
+
+    public static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory";
+
+    private ConfigurationStore configStore;
+    private MetadataMappingService service;
+
+    private Cluster clusterEntity;
+    private Cluster anotherCluster;
+    private List<Feed> inputFeeds = new ArrayList<Feed>();
+    private List<Feed> outputFeeds = new ArrayList<Feed>();
+    private Process processEntity;
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        CurrentUser.authenticate(FALCON_USER);
+
+        configStore = ConfigurationStore.get();
+
+        Services.get().register(new WorkflowJobEndNotificationService());
+        StartupProperties.get().setProperty("falcon.graph.storage.directory",
+                "target/graphdb-" + System.currentTimeMillis());
+        StartupProperties.get().setProperty("falcon.graph.preserve.history", "true");
+        service = new MetadataMappingService();
+        service.init();
+
+        Set<String> vertexPropertyKeys = service.getVertexIndexedKeys();
+        System.out.println("Got vertex property keys: " + vertexPropertyKeys);
+
+        Set<String> edgePropertyKeys = service.getEdgeIndexedKeys();
+        System.out.println("Got edge property keys: " + edgePropertyKeys);
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        GraphUtils.dump(service.getGraph(), System.out);
+
+        cleanUp();
+        StartupProperties.get().setProperty("falcon.graph.preserve.history", "false");
+    }
+
+    @AfterMethod
+    public void printGraph() throws Exception {
+        GraphUtils.dump(service.getGraph());
+    }
+
+    private GraphQuery getQuery() {
+        return service.getGraph().query();
+    }
+
+    @Test
+    public void testGetName() throws Exception {
+        Assert.assertEquals(service.getName(), MetadataMappingService.SERVICE_NAME);
+    }
+
+    @Test
+    public void testOnAddClusterEntity() throws Exception {
+        clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
+                "classification=production");
+
+        verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY);
+        verifyClusterEntityEdges();
+
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 3); // +3 = cluster, colo, tag
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 2); // +2 = cluster to colo and tag
+    }
+
+    @Test (dependsOnMethods = "testOnAddClusterEntity")
+    public void testOnAddFeedEntity() throws Exception {
+        Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity,
+                "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
+                "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
+        inputFeeds.add(impressionsFeed);
+        verifyEntityWasAddedToGraph(impressionsFeed.getName(), RelationshipType.FEED_ENTITY);
+        verifyFeedEntityEdges(impressionsFeed.getName(), "Secure", "analytics");
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 7); // +4 = feed, tag, group, user
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 6); // +4 = cluster, tag, group, user
+
+        Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity,
+                "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
+                "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
+        inputFeeds.add(clicksFeed);
+        verifyEntityWasAddedToGraph(clicksFeed.getName(), RelationshipType.FEED_ENTITY);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 9); // +2 = feed and Financial tag vertex
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 11); // +5 = cluster + user + Group + 2Tags
+
+        Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
+                "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+                "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
+        outputFeeds.add(join1Feed);
+        verifyEntityWasAddedToGraph(join1Feed.getName(), RelationshipType.FEED_ENTITY);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 12); // + 3 = 1 feed and 2 groups
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 16); // +5 = cluster + user +
+        // 2Groups + Tag
+
+        Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity,
+                "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+                "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
+        outputFeeds.add(join2Feed);
+        verifyEntityWasAddedToGraph(join2Feed.getName(), RelationshipType.FEED_ENTITY);
+
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 13); // +1 feed
+        // +6 = user + 2tags + 2Groups + Cluster
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 22);
+    }
+
+    @Test (dependsOnMethods = "testOnAddFeedEntity")
+    public void testOnAddProcessEntity() throws Exception {
+        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+                "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+                WORKFLOW_VERSION, inputFeeds, outputFeeds);
+
+        verifyEntityWasAddedToGraph(processEntity.getName(), RelationshipType.PROCESS_ENTITY);
+        verifyProcessEntityEdges();
+
+        // +4 = 1 process + 1 tag + 2 pipeline
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 17);
+        // +9 = user,tag,cluster, 2 inputs,2 outputs, 2 pipelines
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 31);
+    }
+
+    @Test (dependsOnMethods = "testOnAddProcessEntity")
+    public void testOnAdd() throws Exception {
+        verifyEntityGraph(RelationshipType.FEED_ENTITY, "Secure");
+    }
+
+    @Test
+    public void testMapLineage() throws Exception {
+        setup();
+
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+                EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null, null, null, null)
+                , WorkflowExecutionContext.Type.POST_PROCESSING);
+        service.onSuccess(context);
+
+        debug(service.getGraph());
+        GraphUtils.dump(service.getGraph());
+        verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName());
+
+        // +6 = 1 process, 2 inputs = 3 instances,2 outputs
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 23);
+        //+40 = +26 for feed instances + 8 for process instance + 6 for second feed instance
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 71);
+    }
+
+    @Test
+    public void testLineageForNoDateInFeedPath() throws Exception {
+        setupForNoDateInFeedPath();
+
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+                        EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null,
+                        OUTPUT_INSTANCE_PATHS_NO_DATE, INPUT_INSTANCE_PATHS_NO_DATE, null),
+                WorkflowExecutionContext.Type.POST_PROCESSING);
+        service.onSuccess(context);
+
+        debug(service.getGraph());
+        GraphUtils.dump(service.getGraph());
+
+        // Verify if instance name has nominal time
+        List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(
+                RelationshipType.FEED_INSTANCE.getName());
+        List<String> expected = Arrays.asList("impression-feed/2014-01-01T01:00Z", "clicks-feed/2014-01-01T01:00Z",
+                "imp-click-join1/2014-01-01T01:00Z", "imp-click-join2/2014-01-01T01:00Z");
+        Assert.assertTrue(feedNamesOwnedByUser.containsAll(expected));
+
+        // +5 = 1 process, 2 inputs, 2 outputs
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 22);
+        //+34 = +26 for feed instances + 8 for process instance
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 65);
+    }
+
+    @Test
+    public void testLineageForReplication() throws Exception {
+        setupForLineageReplication();
+
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+                        EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_FEED,
+                        "jail://global:00/falcon/raw-click/bcp/20140101",
+                        "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_FEED),
+                WorkflowExecutionContext.Type.POST_PROCESSING);
+        service.onSuccess(context);
+
+        debug(service.getGraph());
+        GraphUtils.dump(service.getGraph());
+
+        verifyLineageGraphForReplicationOrEviction(REPLICATED_FEED,
+                "jail://global:00/falcon/raw-click/bcp/20140101", context,
+                RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
+
+        // +6 [primary, bcp cluster] = cluster, colo, tag,
+        // +4 [input feed] = feed, tag, group, user
+        // +4 [output feed] = 1 feed + 1 tag + 2 groups
+        // +4 [process] = 1 process + 1 tag + 2 pipeline
+        // +3 = 1 process, 1 input, 1 output
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 21);
+        // +4 [cluster] = cluster to colo and tag [primary and bcp],
+        // +4 [input feed] = cluster, tag, group, user
+        // +5 [output feed] = cluster + user + Group + 2Tags
+        // +7 = user,tag,cluster, 1 input,1 output, 2 pipelines
+        // +19 = +6 for output feed instances + 7 for process instance + 6 for input feed instance
+        // +1 for replicated-to edge to target cluster for each output feed instance
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 40);
+    }
+
+    @Test
+    public void testLineageForReplicationForNonGeneratedInstances() throws Exception {
+        cleanUp();
+        service.init();
+        addClusterAndFeedForReplication(inputFeeds);
+        // Capture the vertex and edge counts before running the replication workflow
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
+
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+                        EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_FEED,
+                        "jail://global:00/falcon/raw-click/bcp/20140101",
+                        "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_FEED),
+                WorkflowExecutionContext.Type.POST_PROCESSING);
+        service.onSuccess(context);
+
+        debug(service.getGraph());
+        GraphUtils.dump(service.getGraph());
+
+        verifyFeedEntityEdges(REPLICATED_FEED, "Secure", "analytics");
+        verifyLineageGraphForReplicationOrEviction(REPLICATED_FEED,
+                "jail://global:00/falcon/raw-click/bcp/20140101", context,
+                RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
+
+        // +1 for the new instance vertex added
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 1);
+        // +6 = instance-of, stored-in, owned-by, classification, group, replicated-to
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 6);
+    }
+
+    @Test
+    public void testLineageForRetention() throws Exception {
+        setupForLineageEviction();
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+                        EntityOperations.DELETE, EVICTION_WORKFLOW_NAME,
+                        EVICTED_FEED, EVICTED_INSTANCE_PATHS, "IGNORE", EVICTED_FEED),
+                WorkflowExecutionContext.Type.POST_PROCESSING);
+
+        service.onSuccess(context);
+
+        debug(service.getGraph());
+        GraphUtils.dump(service.getGraph());
+        List<String> expectedFeeds = Arrays.asList("impression-feed/2014-01-01T00:00Z", "clicks-feed/2014-01-01T00:00Z",
+                "imp-click-join1/2014-01-01T00:00Z", "imp-click-join1/2014-01-02T00:00Z");
+        List<String> secureFeeds = Arrays.asList("impression-feed/2014-01-01T00:00Z",
+                "clicks-feed/2014-01-01T00:00Z");
+        List<String> ownedAndSecureFeeds = Arrays.asList("clicks-feed/2014-01-01T00:00Z",
+                "imp-click-join1/2014-01-01T00:00Z", "imp-click-join1/2014-01-02T00:00Z");
+        verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName(), expectedFeeds, secureFeeds, ownedAndSecureFeeds);
+        String[] paths = EVICTED_INSTANCE_PATHS.split(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
+        for (String feedInstanceDataPath : paths) {
+            verifyLineageGraphForReplicationOrEviction(EVICTED_FEED, feedInstanceDataPath, context,
+                    RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE);
+        }
+
+        // No new vertices added
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 23);
+        // +1 = +2 for the evicted-from edges from the Feed Instance vertices to the cluster,
+        // -1 because imp-click-join1 is added twice instead of imp-click-join2, so there is one fewer edge:
+        // there is no classified-as -> Secure edge.
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 72);
+    }
+
+    @Test
+    public void testLineageForRetentionWithNoFeedsEvicted() throws Exception {
+        cleanUp();
+        service.init();
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+                        EntityOperations.DELETE, EVICTION_WORKFLOW_NAME,
+                        EVICTED_FEED, "IGNORE", "IGNORE", EVICTED_FEED),
+                WorkflowExecutionContext.Type.POST_PROCESSING);
+
+        service.onSuccess(context);
+
+        debug(service.getGraph());
+        GraphUtils.dump(service.getGraph());
+        // No new vertices added
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount);
+        // No new edges added
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount);
+    }
+
+    @Test (dependsOnMethods = "testOnAdd")
+    public void testOnChange() throws Exception {
+        // shutdown the graph and resurrect for testing
+        service.destroy();
+        service.init();
+
+        // cannot modify cluster, adding a new cluster
+        anotherCluster = addClusterEntity("another-cluster", "east-coast",
+                "classification=another");
+        verifyEntityWasAddedToGraph("another-cluster", RelationshipType.CLUSTER_ENTITY);
+
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 20); // +3 = cluster, colo, tag
+        // +2 = edges from the new cluster to its colo and to its new tag (no user edge)
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 33);
+    }
+
+    @Test(dependsOnMethods = "testOnChange")
+    public void testOnFeedEntityChange() throws Exception {
+        Feed oldFeed = inputFeeds.get(0);
+        Feed newFeed = EntityBuilderTestUtil.buildFeed(oldFeed.getName(), clusterEntity,
+                "classified-as=Secured,source=data-warehouse", "reporting");
+        addStorage(newFeed, Storage.TYPE.FILESYSTEM,
+                "jail://global:00/falcon/impression-feed/20140101");
+
+        try {
+            configStore.initiateUpdate(newFeed);
+
+            // add cluster
+            org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                    new org.apache.falcon.entity.v0.feed.Cluster();
+            feedCluster.setName(anotherCluster.getName());
+            newFeed.getClusters().getClusters().add(feedCluster);
+
+            configStore.update(EntityType.FEED, newFeed);
+        } finally {
+            configStore.cleanupUpdateInit();
+        }
+
+        verifyUpdatedEdges(newFeed);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 22); //+2 = 2 new tags
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 35); // +2 = 1 new cluster, 1 new tag
+    }
+
+    @Test
+    public void testLineageForTransactionFailure() throws Exception {
+        clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
+                "classification=production");
+        verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY);
+        verifyClusterEntityEdges();
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 3); // +3 = cluster, colo, tag
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 2); // +2 = cluster to colo and tag
+
+        Feed feed = EntityBuilderTestUtil.buildFeed("feed-name", new Cluster[]{clusterEntity}, null, null);
+        inputFeeds.add(feed);
+        outputFeeds.add(feed);
+
+        try {
+            processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+                    "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+                    WORKFLOW_VERSION, inputFeeds, outputFeeds);
+            Assert.fail();
+        } catch (FalconException e) {
+            Assert.assertEquals(getVerticesCount(service.getGraph()), 3);
+            Assert.assertEquals(getEdgesCount(service.getGraph()), 2);
+        }
+
+    }
+
+    private void verifyUpdatedEdges(Feed newFeed) {
+        Vertex feedVertex = getEntityVertex(newFeed.getName(), RelationshipType.FEED_ENTITY);
+
+        // groups
+        Edge edge = feedVertex.getEdges(Direction.OUT, RelationshipLabel.GROUPS.getName()).iterator().next();
+        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), "reporting");
+
+        // tags
+        edge = feedVertex.getEdges(Direction.OUT, "classified-as").iterator().next();
+        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), "Secured");
+        edge = feedVertex.getEdges(Direction.OUT, "source").iterator().next();
+        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), "data-warehouse");
+
+        // new cluster
+        List<String> actual = new ArrayList<String>();
+        for (Edge clusterEdge : feedVertex.getEdges(Direction.OUT, RelationshipLabel.FEED_CLUSTER_EDGE.getName())) {
+            actual.add(clusterEdge.getVertex(Direction.IN).<String>getProperty("name"));
+        }
+        Assert.assertTrue(actual.containsAll(Arrays.asList("primary-cluster", "another-cluster")),
+                "Actual does not contain expected: " + actual);
+    }
+
+    @Test(dependsOnMethods = "testOnFeedEntityChange")
+    public void testOnProcessEntityChange() throws Exception {
+        Process oldProcess = processEntity;
+        Process newProcess = EntityBuilderTestUtil.buildProcess(oldProcess.getName(), anotherCluster,
+                null, null);
+        EntityBuilderTestUtil.addProcessWorkflow(newProcess, GENERATE_WORKFLOW_NAME, "2.0.0");
+        EntityBuilderTestUtil.addInput(newProcess, inputFeeds.get(0));
+
+        try {
+            configStore.initiateUpdate(newProcess);
+            configStore.update(EntityType.PROCESS, newProcess);
+        } finally {
+            configStore.cleanupUpdateInit();
+        }
+
+        verifyUpdatedEdges(newProcess);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 22); // +0, no net new
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 29); // -6 = -2 outputs, -1 tag, -1 cluster, -2 pipelines
+    }
+
+    @Test(dependsOnMethods = "testOnProcessEntityChange")
+    public void testAreSame() throws Exception {
+
+        Inputs inputs1 = new Inputs();
+        Inputs inputs2 = new Inputs();
+        Outputs outputs1 = new Outputs();
+        Outputs outputs2 = new Outputs();
+        // return true when both are empty (nothing has been added to either side yet)
+        Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(inputs1, inputs2));
+        Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2));
+
+        Input i1 = new Input();
+        i1.setName("input1");
+        Input i2 = new Input();
+        i2.setName("input2");
+        Output o1 = new Output();
+        o1.setName("output1");
+        Output o2 = new Output();
+        o2.setName("output2");
+
+        inputs1.getInputs().add(i1);
+        Assert.assertFalse(EntityRelationshipGraphBuilder.areSame(inputs1, inputs2));
+        outputs1.getOutputs().add(o1);
+        Assert.assertFalse(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2));
+
+        inputs2.getInputs().add(i1);
+        Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(inputs1, inputs2));
+        outputs2.getOutputs().add(o1);
+        Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2));
+    }
+
+    private void verifyUpdatedEdges(Process newProcess) {
+        Vertex processVertex = getEntityVertex(newProcess.getName(), RelationshipType.PROCESS_ENTITY);
+
+        // cluster
+        Edge edge = processVertex.getEdges(Direction.OUT,
+                RelationshipLabel.PROCESS_CLUSTER_EDGE.getName()).iterator().next();
+        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), anotherCluster.getName());
+
+        // inputs
+        edge = processVertex.getEdges(Direction.IN, RelationshipLabel.FEED_PROCESS_EDGE.getName()).iterator().next();
+        Assert.assertEquals(edge.getVertex(Direction.OUT).getProperty("name"),
+                newProcess.getInputs().getInputs().get(0).getFeed());
+
+        // outputs
+        for (Edge e : processVertex.getEdges(Direction.OUT, RelationshipLabel.PROCESS_FEED_EDGE.getName())) {
+            Assert.fail("there should not be any edges to output feeds" + e);
+        }
+    }
+
+    public static void debug(final Graph graph) {
+        System.out.println("*****Vertices of " + graph);
+        for (Vertex vertex : graph.getVertices()) {
+            System.out.println(GraphUtils.vertexString(vertex));
+        }
+
+        System.out.println("*****Edges of " + graph);
+        for (Edge edge : graph.getEdges()) {
+            System.out.println(GraphUtils.edgeString(edge));
+        }
+    }
+
+    private Cluster addClusterEntity(String name, String colo, String tags) throws Exception {
+        Cluster cluster = EntityBuilderTestUtil.buildCluster(name, colo, tags);
+        configStore.publish(EntityType.CLUSTER, cluster);
+        return cluster;
+    }
+
+    private Feed addFeedEntity(String feedName, Cluster cluster, String tags, String groups,
+                               Storage.TYPE storageType, String uriTemplate) throws Exception {
+        return addFeedEntity(feedName, new Cluster[]{cluster}, tags, groups, storageType, uriTemplate);
+    }
+
+    private Feed addFeedEntity(String feedName, Cluster[] clusters, String tags, String groups,
+                               Storage.TYPE storageType, String uriTemplate) throws Exception {
+        Feed feed = EntityBuilderTestUtil.buildFeed(feedName, clusters,
+                tags, groups);
+        addStorage(feed, storageType, uriTemplate);
+        for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
+            if (feedCluster.getName().equals(BCP_CLUSTER_ENTITY_NAME)) {
+                feedCluster.setType(ClusterType.TARGET);
+            }
+        }
+        configStore.publish(EntityType.FEED, feed);
+        return feed;
+    }
+
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+    public Process addProcessEntity(String processName, Cluster cluster,
+                                    String tags, String pipelineTags, String workflowName,
+                                    String version, List<Feed> inFeeds, List<Feed> outFeeds) throws Exception {
+        Process process = EntityBuilderTestUtil.buildProcess(processName, cluster,
+                tags, pipelineTags);
+        EntityBuilderTestUtil.addProcessWorkflow(process, workflowName, version);
+
+        for (Feed inputFeed : inFeeds) {
+            EntityBuilderTestUtil.addInput(process, inputFeed);
+        }
+
+        for (Feed outputFeed : outFeeds) {
+            EntityBuilderTestUtil.addOutput(process, outputFeed);
+        }
+
+        configStore.publish(EntityType.PROCESS, process);
+        return process;
+    }
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+    private static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) {
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            Locations locations = new Locations();
+            feed.setLocations(locations);
+
+            Location location = new Location();
+            location.setType(LocationType.DATA);
+            location.setPath(uriTemplate);
+            feed.getLocations().getLocations().add(location);
+        } else {
+            CatalogTable table = new CatalogTable();
+            table.setUri(uriTemplate);
+            feed.setTable(table);
+        }
+    }
+
+    private static void addStorage(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed,
+                                   Storage.TYPE storageType, String uriTemplate) {
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            Locations locations = new Locations();
+            feed.setLocations(locations);
+
+            Location location = new Location();
+            location.setType(LocationType.DATA);
+            location.setPath(uriTemplate);
+            cluster.setLocations(new Locations());
+            cluster.getLocations().getLocations().add(location);
+        } else {
+            CatalogTable table = new CatalogTable();
+            table.setUri(uriTemplate);
+            cluster.setTable(table);
+        }
+    }
+
+    /**
+     * Asserts that a vertex with the given name and type exists in the graph and that it
+     * carries the expected entity properties.
+     *
+     * @param entityName name the vertex must have
+     * @param entityType relationship type the vertex must have
+     */
+    private void verifyEntityWasAddedToGraph(String entityName, RelationshipType entityType) {
+        final Vertex vertex = getEntityVertex(entityName, entityType);
+        Assert.assertNotNull(vertex);
+
+        verifyEntityProperties(vertex, entityName, entityType);
+    }
+
+    /**
+     * Asserts that a vertex carries the expected name and type and has a timestamp set.
+     *
+     * @param entityVertex vertex under inspection
+     * @param entityName   expected value of the name property
+     * @param entityType   relationship type whose name is the expected value of the type property
+     */
+    private void verifyEntityProperties(Vertex entityVertex, String entityName, RelationshipType entityType) {
+        // arguments are passed as (actual, expected), matching the other assertEquals calls
+        // in this class, so that a failure message reports the two values the right way round
+        Assert.assertEquals(
+                entityVertex.getProperty(RelationshipProperty.NAME.getName()), entityName);
+        Assert.assertEquals(
+                entityVertex.getProperty(RelationshipProperty.TYPE.getName()), entityType.getName());
+        Assert.assertNotNull(entityVertex.getProperty(RelationshipProperty.TIMESTAMP.getName()));
+    }
+
+    /**
+     * Checks the outgoing edges of the primary cluster vertex: the user edge, the colo edge
+     * and the "classification" tag edge.
+     */
+    private void verifyClusterEntityEdges() {
+        final Vertex cluster = getEntityVertex(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY);
+
+        // edge to the user vertex
+        final String userLabel = RelationshipLabel.USER.getName();
+        verifyVertexForEdge(cluster, Direction.OUT, userLabel,
+                FALCON_USER, RelationshipType.USER.getName());
+
+        // edge to the colo vertex
+        final String coloLabel = RelationshipLabel.CLUSTER_COLO.getName();
+        verifyVertexForEdge(cluster, Direction.OUT, coloLabel,
+                COLO_NAME, RelationshipType.COLO.getName());
+
+        // edge to the tags vertex
+        verifyVertexForEdge(cluster, Direction.OUT, "classification",
+                "production", RelationshipType.TAGS.getName());
+    }
+
+    /**
+     * Checks the outgoing edges of a feed vertex: cluster, user, "classified-as" tag and group.
+     *
+     * @param feedName name of the feed entity vertex to inspect
+     * @param tag      expected name of the tag vertex
+     * @param group    expected name of the group vertex
+     */
+    private void verifyFeedEntityEdges(String feedName, String tag, String group) {
+        final Vertex feed = getEntityVertex(feedName, RelationshipType.FEED_ENTITY);
+
+        // edge to the cluster vertex
+        final String clusterLabel = RelationshipLabel.FEED_CLUSTER_EDGE.getName();
+        verifyVertexForEdge(feed, Direction.OUT, clusterLabel,
+                CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY.getName());
+
+        // edge to the user vertex
+        final String userLabel = RelationshipLabel.USER.getName();
+        verifyVertexForEdge(feed, Direction.OUT, userLabel,
+                FALCON_USER, RelationshipType.USER.getName());
+
+        // edge to the tags vertex
+        verifyVertexForEdge(feed, Direction.OUT, "classified-as",
+                tag, RelationshipType.TAGS.getName());
+
+        // edge to the group vertex
+        final String groupLabel = RelationshipLabel.GROUPS.getName();
+        verifyVertexForEdge(feed, Direction.OUT, groupLabel,
+                group, RelationshipType.GROUPS.getName());
+    }
+
+    /**
+     * Checks the edges of the process entity vertex: cluster, user and "classified-as" tag
+     * edges going out, the feeds connected through incoming FEED_PROCESS_EDGE edges (inputs)
+     * and the feeds connected through outgoing PROCESS_FEED_EDGE edges (outputs).
+     *
+     * NOTE(review): the cluster check below looks up edges labelled FEED_CLUSTER_EDGE on a
+     * process vertex. verifyVertexForEdge only inspects edges that exist, so confirm that this
+     * label really matches the process-to-cluster edge.
+     */
+    private void verifyProcessEntityEdges() {
+        Vertex processVertex = getEntityVertex(PROCESS_ENTITY_NAME,
+                RelationshipType.PROCESS_ENTITY);
+
+        // verify edge to cluster vertex
+        verifyVertexForEdge(processVertex, Direction.OUT, RelationshipLabel.FEED_CLUSTER_EDGE.getName(),
+                CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY.getName());
+        // verify edge to user vertex
+        verifyVertexForEdge(processVertex, Direction.OUT, RelationshipLabel.USER.getName(),
+                FALCON_USER, RelationshipType.USER.getName());
+        // verify edge to tags vertex
+        verifyVertexForEdge(processVertex, Direction.OUT, "classified-as",
+                "Critical", RelationshipType.TAGS.getName());
+
+        // verify edges to inputs: input feeds sit on the OUT side of edges coming into the process
+        List<String> actual = new ArrayList<String>();
+        for (Edge edge : processVertex.getEdges(Direction.IN,
+                RelationshipLabel.FEED_PROCESS_EDGE.getName())) {
+            Vertex inputFeedVertex = edge.getVertex(Direction.OUT);
+            // (actual, expected) order, as in the other assertEquals calls of this class
+            Assert.assertEquals(inputFeedVertex.getProperty(RelationshipProperty.TYPE.getName()),
+                    RelationshipType.FEED_ENTITY.getName());
+            actual.add(inputFeedVertex.<String>getProperty(RelationshipProperty.NAME.getName()));
+        }
+
+        Assert.assertTrue(actual.containsAll(Arrays.asList("impression-feed", "clicks-feed")),
+                "Actual does not contain expected: " + actual);
+
+        actual.clear();
+        // verify edges to outputs: output feeds sit on the IN side of edges leaving the process
+        for (Edge edge : processVertex.getEdges(Direction.OUT,
+                RelationshipLabel.PROCESS_FEED_EDGE.getName())) {
+            Vertex outputFeedVertex = edge.getVertex(Direction.IN);
+            Assert.assertEquals(outputFeedVertex.getProperty(RelationshipProperty.TYPE.getName()),
+                    RelationshipType.FEED_ENTITY.getName());
+            actual.add(outputFeedVertex.<String>getProperty(RelationshipProperty.NAME.getName()));
+        }
+        Assert.assertTrue(actual.containsAll(Arrays.asList("imp-click-join1", "imp-click-join2")),
+                "Actual does not contain expected: " + actual);
+    }
+
+    /**
+     * Looks up the first vertex whose name and type properties match the arguments.
+     * Fails the test when the query yields no vertex.
+     *
+     * @param entityName value the name property must have
+     * @param entityType relationship type whose name the type property must have
+     * @return the first matching vertex
+     */
+    private Vertex getEntityVertex(String entityName, RelationshipType entityType) {
+        final String nameKey = RelationshipProperty.NAME.getName();
+        final String typeKey = RelationshipProperty.TYPE.getName();
+
+        GraphQuery query = getQuery()
+                .has(nameKey, entityName)
+                .has(typeKey, entityType.getName());
+
+        Iterator<Vertex> matches = query.vertices().iterator();
+        Assert.assertTrue(matches.hasNext());
+
+        Vertex firstMatch = matches.next();
+        Assert.assertNotNull(firstMatch);
+        return firstMatch;
+    }
+
+    /**
+     * Asserts that every edge with the given label and direction on a vertex leads to a
+     * vertex with the expected name and type.
+     *
+     * The check passes without asserting anything when the vertex has no such edge.
+     *
+     * @param fromVertex   vertex whose edges are inspected
+     * @param direction    direction of the edges relative to fromVertex
+     * @param label        edge label to select
+     * @param expectedName expected name property of the vertex at the other end
+     * @param expectedType expected type property of the vertex at the other end
+     */
+    private void verifyVertexForEdge(Vertex fromVertex, Direction direction, String label,
+                                     String expectedName, String expectedType) {
+        // For an incoming edge the other end is its OUT vertex; picking IN there would hand
+        // back fromVertex itself. OUT and BOTH keep reading the IN vertex as before.
+        final Direction otherEnd = (direction == Direction.IN) ? Direction.OUT : Direction.IN;
+
+        for (Edge edge : fromVertex.getEdges(direction, label)) {
+            Vertex adjacentVertex = edge.getVertex(otherEnd);
+            Assert.assertEquals(
+                    adjacentVertex.getProperty(RelationshipProperty.NAME.getName()), expectedName);
+            Assert.assertEquals(
+                    adjacentVertex.getProperty(RelationshipProperty.TYPE.getName()), expectedType);
+        }
+    }
+
+    /**
+     * Verifies ownership and classification of the four test feeds in the entity graph.
+     *
+     * @param feedType       relationship type of the feed vertices to consider
+     * @param classification tag name used for the combined ownership and classification check
+     */
+    private void verifyEntityGraph(RelationshipType feedType, String classification) {
+        final String feedTypeName = feedType.getName();
+
+        // feeds owned by a user
+        List<String> allFeeds = Arrays.asList("impression-feed", "clicks-feed", "imp-click-join1",
+                "imp-click-join2");
+        Assert.assertEquals(getFeedsOwnedByAUser(feedTypeName), allFeeds);
+
+        // feeds classified as secure
+        List<String> secureFeeds = Arrays.asList("impression-feed", "clicks-feed", "imp-click-join2");
+        verifyFeedsClassifiedAsSecure(feedTypeName, secureFeeds);
+
+        // feeds owned by a user and classified as secure
+        verifyFeedsOwnedByUserAndClassification(feedTypeName, classification, secureFeeds);
+    }
+
+    /**
+     * Collects the names of all vertices of the given type that have a user edge pointing
+     * at the FALCON_USER vertex. Each match is also printed to standard output.
+     *
+     * @param feedType value the type property of a vertex must equal to be included
+     * @return names of the matching vertices, in traversal order
+     */
+    private List<String> getFeedsOwnedByAUser(String feedType) {
+        final String nameKey = RelationshipProperty.NAME.getName();
+        final String typeKey = RelationshipProperty.TYPE.getName();
+
+        GraphQuery ownerQuery = getQuery()
+                .has(nameKey, FALCON_USER)
+                .has(typeKey, RelationshipType.USER.getName());
+
+        List<String> ownedNames = new ArrayList<String>();
+        for (Vertex owner : ownerQuery.vertices()) {
+            for (Vertex owned : owner.getVertices(Direction.IN, RelationshipLabel.USER.getName())) {
+                if (!owned.getProperty(typeKey).equals(feedType)) {
+                    continue;
+                }
+
+                System.out.println(FALCON_USER + " owns -> " + GraphUtils.vertexString(owned));
+                ownedNames.add(owned.<String>getProperty(nameKey));
+            }
+        }
+
+        return ownedNames;
+    }
+
+    /**
+     * Asserts that every expected feed is connected to the "Secure" tag vertex through a
+     * "classified-as" edge. Each matching feed is also printed to standard output.
+     *
+     * @param feedType value the type property of a vertex must equal to be considered
+     * @param expected feed names that must all be found
+     */
+    private void verifyFeedsClassifiedAsSecure(String feedType, List<String> expected) {
+        final String nameKey = RelationshipProperty.NAME.getName();
+        final String typeKey = RelationshipProperty.TYPE.getName();
+
+        GraphQuery secureTagQuery = getQuery()
+                .has(nameKey, "Secure")
+                .has(typeKey, RelationshipType.TAGS.getName());
+
+        List<String> actual = new ArrayList<String>();
+        for (Vertex tagVertex : secureTagQuery.vertices()) {
+            for (Vertex tagged : tagVertex.getVertices(Direction.BOTH, "classified-as")) {
+                if (!tagged.getProperty(typeKey).equals(feedType)) {
+                    continue;
+                }
+
+                System.out.println(" Secure classification -> " + GraphUtils.vertexString(tagged));
+                actual.add(tagged.<String>getProperty(nameKey));
+            }
+        }
+
+        Assert.assertTrue(actual.containsAll(expected), "Actual does not contain expected: " + actual);
+    }
+
+    /**
+     * Asserts that every expected feed is both connected to the FALCON_USER vertex through a
+     * user edge and tagged with the given classification through a "classified-as" edge.
+     * Each matching feed is also printed to standard output.
+     *
+     * @param feedType       value the type property of a vertex must equal to be considered
+     * @param classification name of the tag vertex a feed must point to
+     * @param expected       feed names that must all be found
+     */
+    private void verifyFeedsOwnedByUserAndClassification(String feedType, String classification,
+                                                         List<String> expected) {
+        final String nameKey = RelationshipProperty.NAME.getName();
+        final String typeKey = RelationshipProperty.TYPE.getName();
+
+        List<String> actual = new ArrayList<String>();
+        Vertex owner = getEntityVertex(FALCON_USER, RelationshipType.USER);
+        for (Vertex owned : owner.getVertices(Direction.IN, RelationshipLabel.USER.getName())) {
+            if (!owned.getProperty(typeKey).equals(feedType)) {
+                continue;
+            }
+
+            for (Vertex tagVertex : owned.getVertices(Direction.OUT, "classified-as")) {
+                if (!tagVertex.getProperty(nameKey).equals(classification)) {
+                    continue;
+                }
+
+                actual.add(owned.<String>getProperty(nameKey));
+                System.out.println(classification + " feed owned by falcon-user -> "
+                        + GraphUtils.vertexString(owned));
+            }
+        }
+
+        Assert.assertTrue(actual.containsAll(expected),
+                "Actual does not contain expected: " + actual);
+    }
+
+    /**
+     * Counts the vertices of a graph by walking all of them.
+     *
+     * @param graph graph to inspect
+     * @return number of vertices returned by the graph
+     */
+    public long getVerticesCount(final Graph graph) {
+        long total = 0;
+        Iterator<Vertex> cursor = graph.getVertices().iterator();
+        while (cursor.hasNext()) {
+            cursor.next();
+            total++;
+        }
+
+        return total;
+    }
+
+    /**
+     * Counts the edges of a graph by walking all of them.
+     *
+     * @param graph graph to inspect
+     * @return number of edges returned by the graph
+     */
+    public long getEdgesCount(final Graph graph) {
+        long total = 0;
+        Iterator<Edge> cursor = graph.getEdges().iterator();
+        while (cursor.hasNext()) {
+            cursor.next();
+            total++;
+        }
+
+        return total;
+    }
+
+    /**
+     * Verifies the lineage graph against the default feed instances dated 2014-01-01T00:00Z.
+     *
+     * @param feedType value the type property of the instance vertices must equal
+     */
+    private void verifyLineageGraph(String feedType) {
+        // every instance expected to be owned by the user
+        List<String> allInstances = Arrays.asList(
+                "impression-feed/2014-01-01T00:00Z", "clicks-feed/2014-01-01T00:00Z",
+                "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z");
+        // instances expected to carry the Secure classification
+        List<String> secureInstances = Arrays.asList(
+                "impression-feed/2014-01-01T00:00Z", "clicks-feed/2014-01-01T00:00Z",
+                "imp-click-join2/2014-01-01T00:00Z");
+        // instances expected for the combined ownership and classification check
+        List<String> ownedAndClassifiedInstances = Arrays.asList(
+                "clicks-feed/2014-01-01T00:00Z", "imp-click-join1/2014-01-01T00:00Z",
+                "imp-click-join2/2014-01-01T00:00Z");
+
+        verifyLineageGraph(feedType, allInstances, secureInstances, ownedAndClassifiedInstances);
+    }
+
+    /**
+     * Verifies the lineage graph: ownership of the expected feeds, presence and identity of
+     * the impression feed instance vertex, Secure classification, and combined ownership
+     * with the "Financial" classification.
+     *
+     * @param feedType            value the type property of the vertices must equal
+     * @param expectedFeeds       names that must all be owned by the user
+     * @param secureFeeds         names that must all be classified as Secure
+     * @param ownedAndSecureFeeds names that must all be owned by the user and classified as Financial
+     */
+    private void verifyLineageGraph(String feedType, List<String> expectedFeeds,
+                                    List<String> secureFeeds, List<String> ownedAndSecureFeeds) {
+        // feeds owned by a user
+        List<String> ownedByUser = getFeedsOwnedByAUser(feedType);
+        Assert.assertTrue(ownedByUser.containsAll(expectedFeeds));
+
+        Graph lineageGraph = service.getGraph();
+
+        // the impression feed instance must exist and be typed as a feed instance
+        Iterator<Vertex> byName = lineageGraph.getVertices("name", "impression-feed/2014-01-01T00:00Z").iterator();
+        Assert.assertTrue(byName.hasNext());
+        Vertex instanceVertex = byName.next();
+        Assert.assertEquals(instanceVertex.getProperty(RelationshipProperty.TYPE.getName()),
+                RelationshipType.FEED_INSTANCE.getName());
+
+        // looking the vertex up by its id must return the same vertex
+        Vertex sameVertexById = lineageGraph.getVertex(instanceVertex.getId());
+        Assert.assertEquals(sameVertexById, instanceVertex);
+
+        // feeds classified as secure
+        verifyFeedsClassifiedAsSecure(feedType, secureFeeds);
+
+        // feeds owned by a user and classified as secure
+        verifyFeedsOwnedByUserAndClassification(feedType, "Financial", ownedAndSecureFeeds);
+    }
+
+    /**
+     * Verifies that a feed instance vertex has an outgoing edge with the given label whose
+     * timestamp matches the workflow context and which ends at the context's cluster.
+     *
+     * @param feedName             name of the feed the instance belongs to
+     * @param feedInstanceDataPath data path used to derive the instance name
+     * @param context              workflow execution context supplying cluster, nominal time and timestamp
+     * @param edgeLabel            label of the edge to look for
+     * @throws Exception if the instance name cannot be derived
+     */
+    private void verifyLineageGraphForReplicationOrEviction(String feedName, String feedInstanceDataPath,
+                                                            WorkflowExecutionContext context,
+                                                            RelationshipLabel edgeLabel) throws Exception {
+        String instanceName = InstanceRelationshipGraphBuilder.getFeedInstanceName(
+                feedName, context.getClusterName(), feedInstanceDataPath, context.getNominalTimeAsISO8601());
+        Vertex instanceVertex = getEntityVertex(instanceName, RelationshipType.FEED_INSTANCE);
+
+        // only the first edge with this label is inspected
+        Iterator<Edge> labelledEdges = instanceVertex.getEdges(Direction.OUT, edgeLabel.getName()).iterator();
+        Edge firstEdge = labelledEdges.next();
+        Assert.assertNotNull(firstEdge);
+        Assert.assertEquals(firstEdge.getProperty(RelationshipProperty.TIMESTAMP.getName()),
+                context.getTimeStampAsISO8601());
+
+        Vertex targetCluster = firstEdge.getVertex(Direction.IN);
+        Assert.assertEquals(targetCluster.getProperty(RelationshipProperty.NAME.getName()), context.getClusterName());
+    }
+
+    /**
+     * Builds the command line style argument array for a workflow execution context.
+     *
+     * Each option is emitted as "-" followed by the option name and then its value. The
+     * cluster value names both the backup and the primary cluster for REPLICATE operations
+     * and only the primary cluster otherwise. Feed names and paths fall back to the class
+     * level defaults when the corresponding argument is null.
+     *
+     * @param operation         operation the workflow performed
+     * @param wfName            user workflow name
+     * @param outputFeedNames   output feed names, or null for the default
+     * @param feedInstancePaths output feed instance paths, or null for the default
+     * @param falconInputPaths  input feed instance paths, or null for the default
+     * @param falconInputFeeds  input feed names, or null for the default
+     * @return alternating option and value entries
+     */
+    private static String[] getTestMessageArgs(EntityOperations operation, String wfName, String outputFeedNames,
+                                               String feedInstancePaths, String falconInputPaths,
+                                               String falconInputFeeds) {
+        final String clusterArg = (EntityOperations.REPLICATE == operation)
+                ? BCP_CLUSTER_ENTITY_NAME + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + CLUSTER_ENTITY_NAME
+                : CLUSTER_ENTITY_NAME;
+
+        // fall back to the defaults for everything the caller left unspecified
+        final String inputFeedNamesArg = (falconInputFeeds == null) ? INPUT_FEED_NAMES : falconInputFeeds;
+        final String inputFeedPathsArg = (falconInputPaths == null) ? INPUT_INSTANCE_PATHS : falconInputPaths;
+        final String outputFeedNamesArg = (outputFeedNames == null) ? OUTPUT_FEED_NAMES : outputFeedNames;
+        final String outputFeedPathsArg = (feedInstancePaths == null) ? OUTPUT_INSTANCE_PATHS : feedInstancePaths;
+
+        return new String[]{
+            "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), clusterArg,
+            "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), "process",
+            "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), PROCESS_ENTITY_NAME,
+            "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME,
+            "-" + WorkflowExecutionArgs.OPERATION.getName(), operation.toString(),
+
+            "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), inputFeedNamesArg,
+            "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), inputFeedPathsArg,
+
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), outputFeedNamesArg,
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), outputFeedPathsArg,
+
+            "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
+            "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,
+            "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
+            "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
+            "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), NOMINAL_TIME,
+
+            "-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie",
+            "-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id",
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), wfName,
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), WORKFLOW_VERSION,
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), EngineType.PIG.name(),
+
+            "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER,
+            "-" + WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
+            "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER,
+            "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
+            "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000",
+
+            "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
+        };
+    }
+
+    /**
+     * Builds the default fixture: clears any previous state, re-initializes the service,
+     * adds the primary cluster (kept in the clusterEntity field) and then adds the feeds and
+     * the process on that cluster.
+     *
+     * @throws Exception if clean-up, service initialization or adding an entity fails
+     */
+    private void setup() throws Exception {
+        // start from an empty graph and configuration store
+        cleanUp();
+        service.init();
+
+        // Add cluster
+        clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
+                "classification=production");
+
+        addFeedsAndProcess(clusterEntity);
+    }
+
+    /**
+     * Adds the standard test feeds and the process that reads and writes them.
+     *
+     * Two input feeds (impression-feed, clicks-feed) and two output feeds (imp-click-join1,
+     * imp-click-join2) are added on the given cluster. The process entity is then added on
+     * that same cluster and kept in the processEntity field.
+     *
+     * @param cluster cluster all feeds and the process are attached to
+     * @throws Exception if adding any of the entities fails
+     */
+    private void addFeedsAndProcess(Cluster cluster) throws Exception  {
+        List<Feed> inFeeds = new ArrayList<>();
+        List<Feed> outFeeds = new ArrayList<>();
+
+        // Add input feeds
+        Feed impressionsFeed = addFeedEntity("impression-feed", cluster,
+                "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
+                "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
+        inFeeds.add(impressionsFeed);
+        Feed clicksFeed = addFeedEntity("clicks-feed", cluster,
+                "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
+                "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
+        inFeeds.add(clicksFeed);
+
+        // Add output feeds
+        Feed join1Feed = addFeedEntity("imp-click-join1", cluster,
+                "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+                "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
+        outFeeds.add(join1Feed);
+        Feed join2Feed = addFeedEntity("imp-click-join2", cluster,
+                "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+                "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
+        outFeeds.add(join2Feed);
+
+        // the process runs on the cluster passed in, the same one its feeds were added on,
+        // rather than on whatever the clusterEntity field currently holds
+        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, cluster,
+                "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+                WORKFLOW_VERSION, inFeeds, outFeeds);
+    }
+
+    /**
+     * Builds the fixture for the replication lineage tests.
+     *
+     * After resetting state and re-initializing the service it adds the clusters and the
+     * replicated input feed, one output feed (imp-click-join1) and the process entity, and
+     * finally reports a successful GENERATE workflow to the service.
+     *
+     * @throws Exception if clean-up, initialization, adding an entity or the service callback fails
+     */
+    private void setupForLineageReplication() throws Exception {
+        cleanUp();
+        service.init();
+
+        List<Feed> inFeeds = new ArrayList<>();
+        List<Feed> outFeeds = new ArrayList<>();
+
+        // adds both clusters and appends the replicated feed to inFeeds
+        addClusterAndFeedForReplication(inFeeds);
+
+        // Add output feed
+        Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
+                "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+                "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
+        outFeeds.add(join1Feed);
+
+        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+                "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+                WORKFLOW_VERSION, inFeeds, outFeeds);
+
+        // GENERATE WF should have run before this to create all instance related vertices
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+                EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "imp-click-join1",
+                "jail://global:00/falcon/imp-click-join1/20140101",
+                "jail://global:00/falcon/raw-click/primary/20140101",
+                REPLICATED_FEED), WorkflowExecutionContext.Type.POST_PROCESSING);
+        service.onSuccess(context);
+    }
+
+    /**
+     * Adds the primary and the backup cluster plus a feed defined on both of them, gives
+     * each feed cluster its own file system location and pushes the changed feed to the
+     * configuration store.
+     *
+     * The primary cluster is kept in the clusterEntity field.
+     *
+     * @param inFeeds list the replicated feed is appended to
+     * @throws Exception if adding an entity or updating the configuration store fails
+     */
+    private void addClusterAndFeedForReplication(List<Feed> inFeeds) throws Exception {
+        // Add cluster
+        clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
+                "classification=production");
+        // Add backup cluster
+        Cluster backupCluster = addClusterEntity(BCP_CLUSTER_ENTITY_NAME, "east-coast", "classification=bcp");
+
+        // Add feed spanning both clusters
+        Feed replicatedFeed = addFeedEntity(REPLICATED_FEED, new Cluster[]{clusterEntity, backupCluster},
+                "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
+                "/falcon/raw-click/${YEAR}/${MONTH}/${DAY}");
+
+        // Add uri template for each cluster: the primary cluster and everything else differ in path
+        for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : replicatedFeed.getClusters().getClusters()) {
+            String uriTemplate = feedCluster.getName().equals(CLUSTER_ENTITY_NAME)
+                    ? "/falcon/raw-click/primary/${YEAR}/${MONTH}/${DAY}"
+                    : "/falcon/raw-click/bcp/${YEAR}/${MONTH}/${DAY}";
+            addStorage(feedCluster, replicatedFeed, Storage.TYPE.FILESYSTEM, uriTemplate);
+        }
+
+        // update config store; the update initialization is cleaned up even if the update fails
+        try {
+            configStore.initiateUpdate(replicatedFeed);
+            configStore.update(EntityType.FEED, replicatedFeed);
+        } finally {
+            configStore.cleanupUpdateInit();
+        }
+
+        inFeeds.add(replicatedFeed);
+    }
+
+    /**
+     * Builds the fixture for the eviction lineage tests: the default fixture followed by a
+     * successful GENERATE workflow reported to the service for the evicted instance paths.
+     *
+     * @throws Exception if building the default fixture or the service callback fails
+     */
+    private void setupForLineageEviction() throws Exception {
+        setup();
+
+        // GENERATE WF should have run before this to create all instance related vertices
+        // (input feed names and paths are passed as null, so the defaults are used)
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+                        EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME,
+                        "imp-click-join1,imp-click-join1", EVICTED_INSTANCE_PATHS, null, null),
+                WorkflowExecutionContext.Type.POST_PROCESSING);
+        service.onSuccess(context);
+    }
+
+    /**
+     * Builds a fixture whose feed paths contain no date variables.
+     *
+     * After resetting state and re-initializing the service it adds the primary cluster, two
+     * input feeds, two output feeds and the process entity, all on that cluster.
+     *
+     * @throws Exception if clean-up, initialization or adding an entity fails
+     */
+    private void setupForNoDateInFeedPath() throws Exception {
+        cleanUp();
+        service.init();
+
+        // Add cluster
+        clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
+                "classification=production");
+        List<Feed> inFeeds = new ArrayList<>();
+        List<Feed> outFeeds = new ArrayList<>();
+        // Add input and output feeds
+        Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity,
+                "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
+                "/falcon/impression-feed");
+        inFeeds.add(impressionsFeed);
+        Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity,
+                "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
+                "/falcon/clicks-feed");
+        inFeeds.add(clicksFeed);
+        Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
+                "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+                "/falcon/imp-click-join1");
+        outFeeds.add(join1Feed);
+        Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity,
+                "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+                "/falcon/imp-click-join2");
+        outFeeds.add(join2Feed);
+        // the process reads the two input feeds and writes the two output feeds
+        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+                "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+                WORKFLOW_VERSION, inFeeds, outFeeds);
+
+    }
+
+    /**
+     * Tears the fixture down: empties and shuts down the service's graph, removes every
+     * entity from the configuration store and finally destroys the service.
+     *
+     * @throws Exception if emptying the configuration store or destroying the service fails
+     */
+    private void cleanUp() throws Exception {
+        cleanupGraphStore(service.getGraph());
+        cleanupConfigurationStore(configStore);
+        service.destroy();
+    }
+
+    /**
+     * Removes every edge and then every vertex from the graph and shuts the graph down.
+     *
+     * @param graph graph to empty and shut down
+     */
+    private void cleanupGraphStore(Graph graph) {
+        // edges go first, vertices second
+        Iterator<Edge> edges = graph.getEdges().iterator();
+        while (edges.hasNext()) {
+            graph.removeEdge(edges.next());
+        }
+
+        Iterator<Vertex> vertices = graph.getVertices().iterator();
+        while (vertices.hasNext()) {
+            graph.removeVertex(vertices.next());
+        }
+
+        graph.shutdown();
+    }
+
+    /**
+     * Removes every entity of every entity type from the configuration store.
+     *
+     * @param store configuration store to empty
+     * @throws Exception if removing an entity fails
+     */
+    private static void cleanupConfigurationStore(ConfigurationStore store) throws Exception {
+        for (EntityType entityType : EntityType.values()) {
+            for (String entityName : store.getEntities(entityType)) {
+                store.remove(entityType, entityName);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/e093668a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResourceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResourceTest.java b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResourceTest.java
index 7095785..14f6e73 100644
--- a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResourceTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResourceTest.java
@@ -211,7 +211,7 @@ public class MetadataDiscoveryResourceTest {
         Assert.assertEquals(inVertices.size(), 10);
         Assert.assertNotNull(((Map) inVertices.get(0)).get("name"));
         List outVertices = (List) results.get("outVertices");
-        Assert.assertEquals(outVertices.size(), 2);
+        Assert.assertEquals(outVertices.size(), 3);
         Assert.assertNotNull(((Map) outVertices.get(0)).get("name"));
     }
 
@@ -251,7 +251,7 @@ public class MetadataDiscoveryResourceTest {
         Assert.assertEquals(results.get("type"), RelationshipType.USER.toString());
         Assert.assertEquals(results.get("name"), "falcon-user");
         List inVertices = (List) results.get("inVertices");
-        Assert.assertEquals(inVertices.size(), 10);
+        Assert.assertEquals(inVertices.size(), 11);
         Assert.assertNotNull(((Map) inVertices.get(0)).get("name"));
         List outVertices = (List) results.get("outVertices");
         Assert.assertEquals(outVertices.size(), 0);


Mime
View raw message