Return-Path: X-Original-To: apmail-falcon-commits-archive@minotaur.apache.org Delivered-To: apmail-falcon-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2DE2E116F5 for ; Thu, 28 Aug 2014 22:55:26 +0000 (UTC) Received: (qmail 30125 invoked by uid 500); 28 Aug 2014 22:55:25 -0000 Delivered-To: apmail-falcon-commits-archive@falcon.apache.org Received: (qmail 30091 invoked by uid 500); 28 Aug 2014 22:55:25 -0000 Mailing-List: contact commits-help@falcon.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@falcon.incubator.apache.org Delivered-To: mailing list commits@falcon.incubator.apache.org Received: (qmail 30082 invoked by uid 99); 28 Aug 2014 22:55:25 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Aug 2014 22:55:25 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 28 Aug 2014 22:54:58 +0000 Received: (qmail 21698 invoked by uid 99); 28 Aug 2014 22:54:55 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Aug 2014 22:54:55 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 334BF82CD83; Thu, 28 Aug 2014 22:54:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: venkatesh@apache.org To: commits@falcon.incubator.apache.org Message-Id: <9335a2167ba34714a0ddb99daf7a72a2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: FALCON-325 Process lineage information for Replication policies. 
Contributed by Sowmya Ramesh Date: Thu, 28 Aug 2014 22:54:55 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-falcon Updated Branches: refs/heads/master 305feb0b4 -> 23eed9f6e FALCON-325 Process lineage information for Replication policies. Contributed by Sowmya Ramesh Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/23eed9f6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/23eed9f6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/23eed9f6 Branch: refs/heads/master Commit: 23eed9f6e43c0b5b028e14130ab16afd5ac5179c Parents: 305feb0 Author: Venkatesh Seetharam Authored: Thu Aug 28 15:53:58 2014 -0700 Committer: Venkatesh Seetharam Committed: Thu Aug 28 15:54:45 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../InstanceRelationshipGraphBuilder.java | 44 +++- .../falcon/metadata/MetadataMappingService.java | 4 +- .../metadata/RelationshipGraphBuilder.java | 13 +- .../falcon/metadata/RelationshipLabel.java | 5 +- .../workflow/WorkflowExecutionContext.java | 23 +- .../metadata/MetadataMappingServiceTest.java | 258 ++++++++++++++----- .../feed/FeedReplicationCoordinatorBuilder.java | 4 + .../feed/OozieFeedWorkflowBuilderTest.java | 6 +- .../falcon/oozie/process/AbstractTestBase.java | 13 +- 10 files changed, 290 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a358be4..075fe7e 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -27,6 +27,9 @@ Trunk (Unreleased) FALCON-263 API to get workflow parameters. 
(pavan kumar kolamuri via Shwetha GS) IMPROVEMENTS + FALCON-325 Process lineage information for Replication policies + (Sowmya Ramesh via Venkatesh Seetharam) + FALCON-474 Add Bulk APIs to drive the dashboard needs (Balu Vellanki via Venkatesh Seetharam) http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java index 735f87a..452872e 100644 --- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java +++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java @@ -114,6 +114,12 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder { public void addInstanceToEntity(Vertex instanceVertex, String entityName, RelationshipType entityType, RelationshipLabel edgeLabel) { + addInstanceToEntity(instanceVertex, entityName, entityType, edgeLabel, null); + } + + public void addInstanceToEntity(Vertex instanceVertex, String entityName, + RelationshipType entityType, RelationshipLabel edgeLabel, + String timestamp) { Vertex entityVertex = findVertex(entityName, entityType); LOG.info("Vertex exists? 
name={}, type={}, v={}", entityName, entityType, entityVertex); if (entityVertex == null) { @@ -122,7 +128,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder { return; } - addEdge(instanceVertex, entityVertex, edgeLabel.getName()); + addEdge(instanceVertex, entityVertex, edgeLabel.getName(), timestamp); } public void addOutputFeedInstances(WorkflowExecutionContext context, @@ -166,6 +172,36 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder { } } + public void addReplicatedInstance(WorkflowExecutionContext context) throws FalconException { + String outputFeedNamesArg = context.getOutputFeedNames(); + if ("NONE".equals(outputFeedNamesArg)) { + return; // there are no output feeds + } + + String[] outputFeedNames = context.getOutputFeedNamesList(); + String[] outputFeedInstancePaths = context.getOutputFeedInstancePathsList(); + String targetClusterName = context.getClusterName(); + String srcClusterName = context.getSrcClusterName(); + + // For replication there will be only one output feed name + String feedName = outputFeedNames[0]; + String feedInstanceDataPath = outputFeedInstancePaths[0]; + + LOG.info("Computing feed instance for : name=" + feedName + ", path= " + + feedInstanceDataPath + ", in cluster: " + srcClusterName); + RelationshipType vertexType = RelationshipType.FEED_INSTANCE; + String feedInstanceName = getFeedInstanceName(feedName, srcClusterName, feedInstanceDataPath); + Vertex feedInstanceVertex = findVertex(feedInstanceName, vertexType); + + LOG.info("Vertex exists? 
name={}, type={}, v={}", feedInstanceName, vertexType, feedInstanceVertex); + if (feedInstanceVertex == null) { + throw new IllegalStateException(vertexType + " instance vertex must exist " + feedInstanceName); + } + + addInstanceToEntity(feedInstanceVertex, targetClusterName, RelationshipType.CLUSTER_ENTITY, + RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE, context.getTimeStampAsISO8601()); + } + private void addFeedInstance(Vertex processInstance, RelationshipLabel edgeLabel, WorkflowExecutionContext context, String feedName, String feedInstanceDataPath) throws FalconException { @@ -193,7 +229,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder { } } - public String getFeedInstanceName(String feedName, String clusterName, + public static String getFeedInstanceName(String feedName, String clusterName, String feedInstancePath) throws FalconException { try { Feed feed = ConfigurationStore.get().get(EntityType.FEED, feedName); @@ -209,14 +245,14 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder { } } - private String getTableFeedInstanceName(Feed feed, String feedInstancePath, + private static String getTableFeedInstanceName(Feed feed, String feedInstancePath, Storage.TYPE storageType) throws URISyntaxException { CatalogStorage instanceStorage = (CatalogStorage) FeedHelper.createStorage( storageType.name(), feedInstancePath); return feed.getName() + "/" + instanceStorage.toPartitionAsPath(); } - private String getFileSystemFeedInstanceName(String feedInstancePath, Feed feed, + private static String getFileSystemFeedInstanceName(String feedInstancePath, Feed feed, Cluster cluster) throws FalconException { Storage rawStorage = FeedHelper.createStorage(cluster, feed); String feedPathTemplate = rawStorage.getUriTemplate(LocationType.DATA); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java 
---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java index a501e69..ab82ce1 100644 --- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java +++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java @@ -288,9 +288,9 @@ public class MetadataMappingService instanceGraphBuilder.addInputFeedInstances(context, processInstance); } - private void onFeedInstanceReplicated(WorkflowExecutionContext context) { + private void onFeedInstanceReplicated(WorkflowExecutionContext context) throws FalconException { LOG.info("Adding replicated feed instance: {}", context.getNominalTimeAsISO8601()); - // todo - tbd + instanceGraphBuilder.addReplicatedInstance(context); } private void onFeedInstanceEvicted(WorkflowExecutionContext context) { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java index 898d914..d5685a5 100644 --- a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java +++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java @@ -109,8 +109,19 @@ public abstract class RelationshipGraphBuilder { } protected Edge addEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) { + return addEdge(fromVertex, toVertex, edgeLabel, null); + } + + protected Edge addEdge(Vertex fromVertex, Vertex toVertex, + String edgeLabel, String timestamp) { Edge edge = findEdge(fromVertex, toVertex, edgeLabel); - return edge != null ? 
edge : fromVertex.addEdge(edgeLabel, toVertex); + + Edge edgeToVertex = edge != null ? edge : fromVertex.addEdge(edgeLabel, toVertex); + if (timestamp != null) { + edgeToVertex.setProperty(RelationshipProperty.TIMESTAMP.getName(), timestamp); + } + + return edgeToVertex; } protected void removeEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java index 969640a..acd764f 100644 --- a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java +++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java @@ -36,7 +36,10 @@ public enum RelationshipLabel { CLUSTER_COLO("collocated"), USER("owned-by"), GROUPS("grouped-as"), - PIPELINES("part-of-pipeline"); + PIPELINES("pipeline"), + + // replication labels + FEED_CLUSTER_REPLICATED_EDGE("replicated-to"); private final String name; http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java index f5bb782..c074484 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java @@ -54,7 +54,7 @@ public class WorkflowExecutionContext { public static final String OUTPUT_FEED_SEPARATOR = ","; public static final String INPUT_FEED_SEPARATOR = "#"; - + public static final String CLUSTER_NAME_SEPARATOR 
= ","; /** * Workflow execution status. @@ -161,7 +161,26 @@ public class WorkflowExecutionContext { } public String getClusterName() { - return getValue(WorkflowExecutionArgs.CLUSTER_NAME); + String value = getValue(WorkflowExecutionArgs.CLUSTER_NAME); + if (EntityOperations.REPLICATE != getOperation()) { + return value; + } + + return value.split(CLUSTER_NAME_SEPARATOR)[0]; + } + + public String getSrcClusterName() { + String value = getValue(WorkflowExecutionArgs.CLUSTER_NAME); + if (EntityOperations.REPLICATE != getOperation()) { + return value; + } + + String[] parts = value.split(CLUSTER_NAME_SEPARATOR); + if (parts.length != 2) { + throw new IllegalArgumentException("Replicated cluster pair is missing in " + value); + } + + return parts[1]; } public String getEntityName() { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java index 2b030fd..3f3f539 100644 --- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java +++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java @@ -44,6 +44,7 @@ import org.apache.falcon.service.Services; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.WorkflowExecutionArgs; import org.apache.falcon.workflow.WorkflowExecutionContext; +import static org.apache.falcon.workflow.WorkflowExecutionContext.EntityOperations; import org.apache.falcon.workflow.WorkflowJobEndNotificationService; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -66,12 +67,13 @@ public class MetadataMappingServiceTest { public static final String FALCON_USER = "falcon-user"; private static final String LOGS_DIR = 
"target/log"; private static final String NOMINAL_TIME = "2014-01-01-01-00"; - public static final String OPERATION = "GENERATE"; public static final String CLUSTER_ENTITY_NAME = "primary-cluster"; + public static final String BCP_CLUSTER_ENTITY_NAME = "bcp-cluster"; public static final String PROCESS_ENTITY_NAME = "sample-process"; public static final String COLO_NAME = "west-coast"; - public static final String WORKFLOW_NAME = "imp-click-join-workflow"; + public static final String GENERATE_WORKFLOW_NAME = "imp-click-join-workflow"; + public static final String REPLICATION_WORKFLOW_NAME = "replication-policy-workflow"; public static final String WORKFLOW_VERSION = "1.0.9"; public static final String INPUT_FEED_NAMES = "impression-feed#clicks-feed"; @@ -82,6 +84,8 @@ public class MetadataMappingServiceTest { public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2"; public static final String OUTPUT_INSTANCE_PATHS = "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101"; + private static final String REPLICATED_OUTPUT_INSTANCE_PATHS = + "jail://global:00/falcon/imp-click-join1/20140101"; public static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory"; @@ -89,7 +93,7 @@ public class MetadataMappingServiceTest { private MetadataMappingService service; private Cluster clusterEntity; - private Cluster bcpCluster; + private Cluster anotherCluster; private List inputFeeds = new ArrayList(); private List outputFeeds = new ArrayList(); private Process processEntity; @@ -117,9 +121,7 @@ public class MetadataMappingServiceTest { public void tearDown() throws Exception { GraphUtils.dump(service.getGraph(), System.out); - cleanupGraphStore(service.getGraph()); - cleanupConfigurationStore(configStore); - service.destroy(); + cleanUp(); StartupProperties.get().setProperty("falcon.graph.preserve.history", "false"); } @@ -139,9 +141,8 @@ public class MetadataMappingServiceTest { @Test public 
void testOnAddClusterEntity() throws Exception { - clusterEntity = EntityBuilderTestUtil.buildCluster(CLUSTER_ENTITY_NAME, COLO_NAME, + clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME, "classification=production"); - configStore.publish(EntityType.CLUSTER, clusterEntity); verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY); verifyClusterEntityEdges(); @@ -152,39 +153,35 @@ public class MetadataMappingServiceTest { @Test (dependsOnMethods = "testOnAddClusterEntity") public void testOnAddFeedEntity() throws Exception { - Feed impressionsFeed = EntityBuilderTestUtil.buildFeed("impression-feed", clusterEntity, - "classified-as=Secure", "analytics"); - addStorage(impressionsFeed, Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}"); - configStore.publish(EntityType.FEED, impressionsFeed); + Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity, + "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM, + "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}"); inputFeeds.add(impressionsFeed); verifyEntityWasAddedToGraph(impressionsFeed.getName(), RelationshipType.FEED_ENTITY); verifyFeedEntityEdges(impressionsFeed.getName()); Assert.assertEquals(getVerticesCount(service.getGraph()), 7); // +4 = feed, tag, group, user Assert.assertEquals(getEdgesCount(service.getGraph()), 6); // +4 = cluster, tag, group, user - Feed clicksFeed = EntityBuilderTestUtil.buildFeed("clicks-feed", clusterEntity, - "classified-as=Secure,classified-as=Financial", "analytics"); - addStorage(clicksFeed, Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}"); - configStore.publish(EntityType.FEED, clicksFeed); + Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity, + "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM, + "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}"); inputFeeds.add(clicksFeed); verifyEntityWasAddedToGraph(clicksFeed.getName(), 
RelationshipType.FEED_ENTITY); Assert.assertEquals(getVerticesCount(service.getGraph()), 9); // feed and financial vertex Assert.assertEquals(getEdgesCount(service.getGraph()), 11); // +5 = cluster + user + 2Group + Tag - Feed join1Feed = EntityBuilderTestUtil.buildFeed("imp-click-join1", clusterEntity, - "classified-as=Financial", "reporting,bi"); - addStorage(join1Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}"); - configStore.publish(EntityType.FEED, join1Feed); + Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity, + "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, + "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}"); outputFeeds.add(join1Feed); verifyEntityWasAddedToGraph(join1Feed.getName(), RelationshipType.FEED_ENTITY); Assert.assertEquals(getVerticesCount(service.getGraph()), 12); // + 3 = 1 feed and 2 groups Assert.assertEquals(getEdgesCount(service.getGraph()), 16); // +5 = cluster + user + // Group + 2Tags - Feed join2Feed = EntityBuilderTestUtil.buildFeed("imp-click-join2", clusterEntity, - "classified-as=Secure,classified-as=Financial", "reporting,bi"); - addStorage(join2Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}"); - configStore.publish(EntityType.FEED, join2Feed); + Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity, + "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, + "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}"); outputFeeds.add(join2Feed); verifyEntityWasAddedToGraph(join2Feed.getName(), RelationshipType.FEED_ENTITY); @@ -195,19 +192,9 @@ public class MetadataMappingServiceTest { @Test (dependsOnMethods = "testOnAddFeedEntity") public void testOnAddProcessEntity() throws Exception { - processEntity = EntityBuilderTestUtil.buildProcess(PROCESS_ENTITY_NAME, clusterEntity, - "classified-as=Critical", "testPipeline,dataReplication_Pipeline"); - EntityBuilderTestUtil.addProcessWorkflow(processEntity, 
WORKFLOW_NAME, WORKFLOW_VERSION); - - for (Feed inputFeed : inputFeeds) { - EntityBuilderTestUtil.addInput(processEntity, inputFeed); - } - - for (Feed outputFeed : outputFeeds) { - EntityBuilderTestUtil.addOutput(processEntity, outputFeed); - } - - configStore.publish(EntityType.PROCESS, processEntity); + processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, + "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, + WORKFLOW_VERSION); verifyEntityWasAddedToGraph(processEntity.getName(), RelationshipType.PROCESS_ENTITY); verifyProcessEntityEdges(); @@ -223,14 +210,13 @@ public class MetadataMappingServiceTest { verifyEntityGraph(RelationshipType.FEED_ENTITY, "Secure"); } - @Test(dependsOnMethods = "testOnAdd") + @Test public void testMapLineage() throws Exception { - // shutdown the graph and resurrect for testing - service.destroy(); - service.init(); + setup(); - WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(), - WorkflowExecutionContext.Type.POST_PROCESSING); + WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( + EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null, null, null, null) + , WorkflowExecutionContext.Type.POST_PROCESSING); service.onSuccess(context); debug(service.getGraph()); @@ -243,21 +229,44 @@ public class MetadataMappingServiceTest { Assert.assertEquals(getEdgesCount(service.getGraph()), 71); } - @Test (dependsOnMethods = "testMapLineage") + @Test + public void testLineageForReplication() throws Exception { + setupForLineageReplication(); + + String feedName = "imp-click-join1"; + WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( + EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, feedName, + REPLICATED_OUTPUT_INSTANCE_PATHS, null, null), WorkflowExecutionContext.Type.POST_PROCESSING); + service.onSuccess(context); + + debug(service.getGraph()); + 
GraphUtils.dump(service.getGraph()); + verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName()); + + verifyLineageGraphForReplicationOrEviction(feedName, REPLICATED_OUTPUT_INSTANCE_PATHS, context, + RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE); + + // +3 = cluster, colo, tag + Assert.assertEquals(getVerticesCount(service.getGraph()), 26); + // +3 = +2 edges for bcp cluster, no user but only to colo and new tag + 1 for replicated-to edge to target + // cluster for each output feed instance + Assert.assertEquals(getEdgesCount(service.getGraph()), 74); + } + + @Test (dependsOnMethods = "testOnAdd") public void testOnChange() throws Exception { // shutdown the graph and resurrect for testing service.destroy(); service.init(); // cannot modify cluster, adding a new cluster - bcpCluster = EntityBuilderTestUtil.buildCluster("bcp-cluster", "east-coast", - "classification=bcp"); - configStore.publish(EntityType.CLUSTER, bcpCluster); - verifyEntityWasAddedToGraph("bcp-cluster", RelationshipType.CLUSTER_ENTITY); + anotherCluster = addClusterEntity("another-cluster", "east-coast", + "classification=another"); + verifyEntityWasAddedToGraph("another-cluster", RelationshipType.CLUSTER_ENTITY); - Assert.assertEquals(getVerticesCount(service.getGraph()), 26); // +3 = cluster, colo, tag, 2 pipelines - // +4 edges to above, no user but only to colo, new tag, and 2 new pipelines - Assert.assertEquals(getEdgesCount(service.getGraph()), 73); + Assert.assertEquals(getVerticesCount(service.getGraph()), 20); // +3 = cluster, colo, tag + // +2 edges to above, no user but only to colo and new tag + Assert.assertEquals(getEdgesCount(service.getGraph()), 33); } @Test(dependsOnMethods = "testOnChange") @@ -274,7 +283,7 @@ public class MetadataMappingServiceTest { // add cluster org.apache.falcon.entity.v0.feed.Cluster feedCluster = new org.apache.falcon.entity.v0.feed.Cluster(); - feedCluster.setName(bcpCluster.getName()); + feedCluster.setName(anotherCluster.getName()); 
newFeed.getClusters().getClusters().add(feedCluster); configStore.update(EntityType.FEED, newFeed); @@ -283,8 +292,8 @@ public class MetadataMappingServiceTest { } verifyUpdatedEdges(newFeed); - Assert.assertEquals(getVerticesCount(service.getGraph()), 28); //+2 = 2 new tags - Assert.assertEquals(getEdgesCount(service.getGraph()), 75); // +2 = 1 new cluster, 1 new tag + Assert.assertEquals(getVerticesCount(service.getGraph()), 22); //+2 = 2 new tags + Assert.assertEquals(getEdgesCount(service.getGraph()), 35); // +2 = 1 new cluster, 1 new tag } private void verifyUpdatedEdges(Feed newFeed) { @@ -305,16 +314,16 @@ public class MetadataMappingServiceTest { for (Edge clusterEdge : feedVertex.getEdges(Direction.OUT, RelationshipLabel.FEED_CLUSTER_EDGE.getName())) { actual.add(clusterEdge.getVertex(Direction.IN).getProperty("name")); } - Assert.assertTrue(actual.containsAll(Arrays.asList("primary-cluster", "bcp-cluster")), + Assert.assertTrue(actual.containsAll(Arrays.asList("primary-cluster", "another-cluster")), "Actual does not contain expected: " + actual); } @Test(dependsOnMethods = "testOnFeedEntityChange") public void testOnProcessEntityChange() throws Exception { Process oldProcess = processEntity; - Process newProcess = EntityBuilderTestUtil.buildProcess(oldProcess.getName(), bcpCluster, + Process newProcess = EntityBuilderTestUtil.buildProcess(oldProcess.getName(), anotherCluster, null, null); - EntityBuilderTestUtil.addProcessWorkflow(newProcess, WORKFLOW_NAME, "2.0.0"); + EntityBuilderTestUtil.addProcessWorkflow(newProcess, GENERATE_WORKFLOW_NAME, "2.0.0"); EntityBuilderTestUtil.addInput(newProcess, inputFeeds.get(0)); try { @@ -325,8 +334,8 @@ public class MetadataMappingServiceTest { } verifyUpdatedEdges(newProcess); - Assert.assertEquals(getVerticesCount(service.getGraph()), 28); // +0, no net new - Assert.assertEquals(getEdgesCount(service.getGraph()), 69); // -6 = -2 outputs, -1 tag, -1 cluster, -2 pipelines + 
Assert.assertEquals(getVerticesCount(service.getGraph()), 22); // +0, no net new + Assert.assertEquals(getEdgesCount(service.getGraph()), 29); // -6 = -2 outputs, -1 tag, -1 cluster, -2 pipelines } @Test(dependsOnMethods = "testOnProcessEntityChange") @@ -366,7 +375,7 @@ public class MetadataMappingServiceTest { // cluster Edge edge = processVertex.getEdges(Direction.OUT, RelationshipLabel.PROCESS_CLUSTER_EDGE.getName()).iterator().next(); - Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), bcpCluster.getName()); + Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), anotherCluster.getName()); // inputs edge = processVertex.getEdges(Direction.IN, RelationshipLabel.FEED_PROCESS_EDGE.getName()).iterator().next(); @@ -391,6 +400,40 @@ public class MetadataMappingServiceTest { } } + private Cluster addClusterEntity(String name, String colo, String tags) throws Exception { + Cluster cluster = EntityBuilderTestUtil.buildCluster(name, colo, tags); + configStore.publish(EntityType.CLUSTER, cluster); + return cluster; + } + + private Feed addFeedEntity(String feedName, Cluster cluster, String tags, String groups, + Storage.TYPE storageType, String uriTemplate) throws Exception { + Feed feed = EntityBuilderTestUtil.buildFeed(feedName, cluster, + tags, groups); + addStorage(feed, storageType, uriTemplate); + configStore.publish(EntityType.FEED, feed); + return feed; + } + + public Process addProcessEntity(String processName, Cluster cluster, + String tags, String pipelineTags, String workflowName, + String version) throws Exception { + Process process = EntityBuilderTestUtil.buildProcess(processName, cluster, + tags, pipelineTags); + EntityBuilderTestUtil.addProcessWorkflow(process, workflowName, version); + + for (Feed inputFeed : inputFeeds) { + EntityBuilderTestUtil.addInput(process, inputFeed); + } + + for (Feed outputFeed : outputFeeds) { + EntityBuilderTestUtil.addOutput(process, outputFeed); + } + + 
configStore.publish(EntityType.PROCESS, process); + return process; + } + private static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) { if (storageType == Storage.TYPE.FILESYSTEM) { Locations locations = new Locations(); @@ -633,19 +676,49 @@ public class MetadataMappingServiceTest { "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z")); } - private static String[] getTestMessageArgs() { + private void verifyLineageGraphForReplicationOrEviction(String feedName, String feedInstanceDataPath, + WorkflowExecutionContext context, + RelationshipLabel edgeLabel) throws Exception { + String feedInstanceName = InstanceRelationshipGraphBuilder.getFeedInstanceName(feedName + , context.getSrcClusterName(), feedInstanceDataPath); + Vertex feedVertex = getEntityVertex(feedInstanceName, RelationshipType.FEED_INSTANCE); + + Edge edge = feedVertex.getEdges(Direction.OUT, edgeLabel.getName()) + .iterator().next(); + Assert.assertNotNull(edge); + Assert.assertEquals(edge.getProperty(RelationshipProperty.TIMESTAMP.getName()) + , context.getTimeStampAsISO8601()); + + Vertex clusterVertex = edge.getVertex(Direction.IN); + Assert.assertEquals(clusterVertex.getProperty(RelationshipProperty.NAME.getName()), context.getClusterName()); + } + + private static String[] getTestMessageArgs(EntityOperations operation, String wfName, String outputFeedNames, + String feedInstancePaths, String falconInputPaths, + String falconInputFeeds) { + String cluster; + if (EntityOperations.REPLICATE == operation) { + cluster = BCP_CLUSTER_ENTITY_NAME + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + CLUSTER_ENTITY_NAME; + } else { + cluster = CLUSTER_ENTITY_NAME; + } + return new String[]{ - "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), CLUSTER_ENTITY_NAME, + "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster, "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), ("process"), "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), PROCESS_ENTITY_NAME, 
"-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME, - "-" + WorkflowExecutionArgs.OPERATION.getName(), OPERATION, + "-" + WorkflowExecutionArgs.OPERATION.getName(), operation.toString(), - "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES, - "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS, + "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), + (falconInputFeeds != null ? falconInputFeeds : INPUT_FEED_NAMES), + "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), + (falconInputPaths != null ? falconInputPaths : INPUT_INSTANCE_PATHS), - "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), OUTPUT_FEED_NAMES, - "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OUTPUT_INSTANCE_PATHS, + "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), + (outputFeedNames != null ? outputFeedNames : OUTPUT_FEED_NAMES), + "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), + (feedInstancePaths != null ? feedInstancePaths : OUTPUT_INSTANCE_PATHS), "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00", "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER, @@ -655,11 +728,10 @@ public class MetadataMappingServiceTest { "-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie", "-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id", - "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), WORKFLOW_NAME, + "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), wfName, "-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), WORKFLOW_VERSION, "-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), EngineType.PIG.name(), - "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER, "-" + WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true", "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER, @@ -671,6 +743,54 @@ public class MetadataMappingServiceTest { }; } + private void 
setup() throws Exception { + cleanUp(); + service.init(); + + // Add cluster + clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME, + "classification=production"); + + // Add input and output feeds + Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity, + "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM, + "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}"); + inputFeeds.add(impressionsFeed); + Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity, + "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM, + "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}"); + inputFeeds.add(clicksFeed); + Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity, + "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, + "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}"); + outputFeeds.add(join1Feed); + Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity, + "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, + "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}"); + outputFeeds.add(join2Feed); + processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, + "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, + WORKFLOW_VERSION); + + } + + private void setupForLineageReplication() throws Exception { + setup(); + // GENERATE WF should have run before this to create all instance related vertices + WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( + EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null, null, null, null) + , WorkflowExecutionContext.Type.POST_PROCESSING); + service.onSuccess(context); + // Add backup cluster + addClusterEntity(BCP_CLUSTER_ENTITY_NAME, "east-coast", "classification=bcp"); + } + + private void cleanUp() throws Exception { + cleanupGraphStore(service.getGraph()); + cleanupConfigurationStore(configStore); + service.destroy(); + } + private 
void cleanupGraphStore(Graph graph) { for (Edge edge : graph.getEdges()) { graph.removeEdge(edge); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java index 966f90e..5697eb6 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java @@ -159,6 +159,10 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder props = getCoordProperties(coord); - verifyEntityProperties(feed, trgCluster, + verifyEntityProperties(feed, trgCluster, srcCluster, WorkflowExecutionContext.EntityOperations.REPLICATE, props); verifyBrokerProperties(trgCluster, props); @@ -332,7 +332,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(props.get("maxMaps"), "33"); Assert.assertEquals(props.get("mapBandwidthKB"), "2048"); - verifyEntityProperties(aFeed, aCluster, + verifyEntityProperties(aFeed, aCluster, srcCluster, WorkflowExecutionContext.EntityOperations.REPLICATE, props); verifyBrokerProperties(trgCluster, props); } @@ -456,7 +456,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord), wfPath.toString()); - verifyEntityProperties(tableFeed, trgCluster, + verifyEntityProperties(tableFeed, trgCluster, srcCluster, WorkflowExecutionContext.EntityOperations.REPLICATE, props); verifyBrokerProperties(trgCluster, props); } 
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23eed9f6/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java index b547c31..4e260e9 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java @@ -217,11 +217,22 @@ public class AbstractTestBase { protected void verifyEntityProperties(Entity entity, Cluster cluster, WorkflowExecutionContext.EntityOperations operation, HashMap<String, String> props) throws Exception { + verifyEntityProperties(entity, cluster, null, operation, props); + } + + protected void verifyEntityProperties(Entity entity, Cluster cluster, Cluster srcCluster, + WorkflowExecutionContext.EntityOperations operation, + HashMap<String, String> props) throws Exception { Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_NAME.getName()), entity.getName()); Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_TYPE.getName()), entity.getEntityType().name()); - Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName()); + if (WorkflowExecutionContext.EntityOperations.REPLICATE == operation) { + Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), + cluster.getName() + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName()); + } else { + Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName()); + } Assert.assertEquals(props.get(WorkflowExecutionArgs.LOG_DIR.getName()), getLogPath(cluster, entity)); Assert.assertEquals(props.get("falconDataOperation"), operation.name()); }