Return-Path: X-Original-To: apmail-falcon-commits-archive@minotaur.apache.org Delivered-To: apmail-falcon-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0B032173DB for ; Wed, 3 Jun 2015 08:57:35 +0000 (UTC) Received: (qmail 89509 invoked by uid 500); 3 Jun 2015 08:57:34 -0000 Delivered-To: apmail-falcon-commits-archive@falcon.apache.org Received: (qmail 89474 invoked by uid 500); 3 Jun 2015 08:57:34 -0000 Mailing-List: contact commits-help@falcon.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@falcon.apache.org Delivered-To: mailing list commits@falcon.apache.org Received: (qmail 89465 invoked by uid 99); 3 Jun 2015 08:57:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Jun 2015 08:57:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AB3E3E0AA1; Wed, 3 Jun 2015 08:57:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ajayyadava@apache.org To: commits@falcon.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: falcon git commit: FALCON-1060 Handle transaction failures in Lineage. Contributed by Pavan Kumar Kolamuri Date: Wed, 3 Jun 2015 08:57:34 +0000 (UTC) Repository: falcon Updated Branches: refs/heads/master aaed4c7a8 -> 2ebf12837 FALCON-1060 Handle transaction failures in Lineage. 
Contributed by Pavan Kumar Kolamuri Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2ebf1283 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2ebf1283 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2ebf1283 Branch: refs/heads/master Commit: 2ebf12837b3fb330abdaf29d85a153ad92f96d84 Parents: aaed4c7 Author: Ajay Yadava Authored: Wed Jun 3 14:27:15 2015 +0530 Committer: Ajay Yadava Committed: Wed Jun 3 14:27:15 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../EntityRelationshipGraphBuilder.java | 53 +++++++-- .../InstanceRelationshipGraphBuilder.java | 3 +- .../falcon/metadata/MetadataMappingService.java | 114 +++++++++++-------- common/src/main/resources/startup.properties | 2 + .../metadata/MetadataMappingServiceTest.java | 77 +++++++++---- pom.xml | 2 +- src/conf/startup.properties | 2 + 8 files changed, 171 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e721841..9c84f85 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,8 @@ Trunk (Unreleased) NEW FEATURES IMPROVEMENTS + FALCON-1060 Handle transaction failures in Lineage(Pavan Kumar Kolamuri via Ajay Yadava) + FALCON-1212 Remove dependency on Gremlin (Ajay Yadava via Suhas Vasu) FALCON-1211 Source tarball are not generated in mvn assembly when profile is distributed http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java 
b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java index d90f7ec..7ae7cd9 100644 --- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java +++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java @@ -21,6 +21,8 @@ package org.apache.falcon.metadata; import com.tinkerpop.blueprints.Graph; import com.tinkerpop.blueprints.Vertex; import org.apache.falcon.entity.ProcessHelper; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.feed.ClusterType; import org.apache.falcon.entity.v0.feed.Feed; @@ -50,6 +52,23 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder { super(graph, preserveHistory); } + public void addEntity(Entity entity) { + EntityType entityType = entity.getEntityType(); + switch (entityType) { + case CLUSTER: + addClusterEntity((Cluster) entity); + break; + case PROCESS: + addProcessEntity((Process) entity); + break; + case FEED: + addFeedEntity((Feed) entity); + break; + default: + throw new IllegalArgumentException("Invalid EntityType " + entityType); + } + } + public void addClusterEntity(Cluster clusterEntity) { LOG.info("Adding cluster entity: {}", clusterEntity.getName()); Vertex clusterVertex = addVertex(clusterEntity.getName(), RelationshipType.CLUSTER_ENTITY); @@ -73,13 +92,31 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder { } } + public void updateEntity(Entity oldEntity, Entity newEntity) { + EntityType entityType = oldEntity.getEntityType(); + switch (entityType) { + case CLUSTER: + // a cluster cannot be updated + break; + case PROCESS: + updateProcessEntity((Process) oldEntity, (Process) newEntity); + break; + case FEED: + updateFeedEntity((Feed) oldEntity, (Feed) newEntity); + break; + default: + throw new IllegalArgumentException("Invalid EntityType " + 
entityType); + } + } + + + public void updateFeedEntity(Feed oldFeed, Feed newFeed) { LOG.info("Updating feed entity: {}", newFeed.getName()); Vertex feedEntityVertex = findVertex(oldFeed.getName(), RelationshipType.FEED_ENTITY); if (feedEntityVertex == null) { - // todo - throw new IllegalStateException(oldFeed.getName() + " entity vertex must exist."); LOG.error("Illegal State: Feed entity vertex must exist for {}", oldFeed.getName()); - return; + throw new IllegalStateException(oldFeed.getName() + " entity vertex must exist."); } updateDataClassification(oldFeed.getTags(), newFeed.getTags(), feedEntityVertex); @@ -110,9 +147,8 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder { LOG.info("Updating process entity: {}", newProcess.getName()); Vertex processEntityVertex = findVertex(oldProcess.getName(), RelationshipType.PROCESS_ENTITY); if (processEntityVertex == null) { - // todo - throw new IllegalStateException(oldProcess.getName() + " entity vertex must exist"); LOG.error("Illegal State: Process entity vertex must exist for {}", oldProcess.getName()); - return; + throw new IllegalStateException(oldProcess.getName() + " entity vertex must exist"); } updateWorkflowProperties(oldProcess.getWorkflow(), newProcess.getWorkflow(), @@ -133,9 +169,8 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder { public void addRelationToCluster(Vertex fromVertex, String clusterName, RelationshipLabel edgeLabel) { Vertex clusterVertex = findVertex(clusterName, RelationshipType.CLUSTER_ENTITY); if (clusterVertex == null) { // cluster must exist before adding other entities - // todo - throw new IllegalStateException("Cluster entity vertex must exist: " + clusterName); LOG.error("Illegal State: Cluster entity vertex must exist for {}", clusterName); - return; + throw new IllegalStateException("Cluster entity vertex must exist: " + clusterName); } addEdge(fromVertex, clusterVertex, edgeLabel.getName()); @@ -164,9 +199,8 @@ 
public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder { public void addProcessFeedEdge(Vertex processVertex, String feedName, RelationshipLabel edgeLabel) { Vertex feedVertex = findVertex(feedName, RelationshipType.FEED_ENTITY); if (feedVertex == null) { - // todo - throw new IllegalStateException("Feed entity vertex must exist: " + feedName); LOG.error("Illegal State: Feed entity vertex must exist for {}", feedName); - return; + throw new IllegalStateException("Feed entity vertex must exist: " + feedName); } addProcessFeedEdge(processVertex, feedVertex, edgeLabel); @@ -405,9 +439,8 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder { public void removeProcessFeedEdge(Vertex processVertex, String feedName, RelationshipLabel edgeLabel) { Vertex feedVertex = findVertex(feedName, RelationshipType.FEED_ENTITY); if (feedVertex == null) { - // todo - throw new IllegalStateException("Feed entity vertex must exist: " + feedName); LOG.error("Illegal State: Feed entity vertex must exist for {}", feedName); - return; + throw new IllegalStateException("Feed entity vertex must exist: " + feedName); } if (edgeLabel == RelationshipLabel.FEED_PROCESS_EDGE) { http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java index 213b020..17bf813 100644 --- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java +++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java @@ -128,9 +128,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder { Vertex entityVertex = findVertex(entityName, entityType); 
LOG.info("Vertex exists? name={}, type={}, v={}", entityName, entityType, entityVertex); if (entityVertex == null) { - // todo - throw new IllegalStateException(entityType + " entity vertex must exist " + entityName); LOG.error("Illegal State: {} vertex must exist for {}", entityType, entityName); - return; + throw new IllegalStateException(entityType + " entity vertex must exist " + entityName); } addEdge(instanceVertex, entityVertex, edgeLabel.getName(), timestamp); http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java index 9137fe0..ef9da45 100644 --- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java +++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java @@ -25,15 +25,14 @@ import com.tinkerpop.blueprints.GraphFactory; import com.tinkerpop.blueprints.KeyIndexableGraph; import com.tinkerpop.blueprints.TransactionalGraph; import com.tinkerpop.blueprints.Vertex; +import com.tinkerpop.blueprints.util.TransactionRetryHelper; +import com.tinkerpop.blueprints.util.TransactionWork; import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; import org.apache.falcon.FalconException; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.service.ConfigurationChangeListener; import org.apache.falcon.service.FalconService; import org.apache.falcon.service.Services; @@ -73,6 +72,9 @@ 
public class MetadataMappingService private EntityRelationshipGraphBuilder entityGraphBuilder; private InstanceRelationshipGraphBuilder instanceGraphBuilder; + private int transactionRetries; + private long transactionRetryDelayInMillis; + @Override public String getName() { return SERVICE_NAME; @@ -99,6 +101,14 @@ public class MetadataMappingService ConfigurationStore.get().registerListener(this); Services.get().getService( WorkflowJobEndNotificationService.SERVICE_NAME).registerListener(this); + try { + transactionRetries = Integer.parseInt(StartupProperties.get().getProperty( + "falcon.graph.transaction.retry.count", "3")); + transactionRetryDelayInMillis = Long.parseLong(StartupProperties.get().getProperty( + "falcon.graph.transaction.retry.delay", "5")); + } catch (NumberFormatException e) { + throw new FalconException("Invalid values for graph transaction retry delay/count " + e); + } } protected Graph initializeGraphDB() { @@ -194,27 +204,23 @@ public class MetadataMappingService } @Override - public void onAdd(Entity entity) throws FalconException { + public void onAdd(final Entity entity) throws FalconException { EntityType entityType = entity.getEntityType(); LOG.info("Adding lineage for entity: {}, type: {}", entity.getName(), entityType); - - switch (entityType) { - case CLUSTER: - entityGraphBuilder.addClusterEntity((Cluster) entity); - getTransactionalGraph().commit(); - break; - - case FEED: - entityGraphBuilder.addFeedEntity((Feed) entity); - getTransactionalGraph().commit(); - break; - - case PROCESS: - entityGraphBuilder.addProcessEntity((Process) entity); - getTransactionalGraph().commit(); - break; - - default: + try { + new TransactionRetryHelper.Builder<Void>(getTransactionalGraph()) + .perform(new TransactionWork<Void>() { + @Override + public Void execute(TransactionalGraph transactionalGraph) throws Exception { + entityGraphBuilder.addEntity(entity); + transactionalGraph.commit(); + return null; + } + }).build().exponentialBackoff(transactionRetries, 
transactionRetryDelayInMillis); + + } catch (Exception e) { + getTransactionalGraph().rollback(); + throw new FalconException(e); } } @@ -225,26 +231,23 @@ public class MetadataMappingService } @Override - public void onChange(Entity oldEntity, Entity newEntity) throws FalconException { + public void onChange(final Entity oldEntity, final Entity newEntity) throws FalconException { EntityType entityType = newEntity.getEntityType(); LOG.info("Updating lineage for entity: {}, type: {}", newEntity.getName(), entityType); - - switch (entityType) { - case CLUSTER: - // a cluster cannot be updated - break; - - case FEED: - entityGraphBuilder.updateFeedEntity((Feed) oldEntity, (Feed) newEntity); - getTransactionalGraph().commit(); - break; - - case PROCESS: - entityGraphBuilder.updateProcessEntity((Process) oldEntity, (Process) newEntity); - getTransactionalGraph().commit(); - break; - - default: + try { + new TransactionRetryHelper.Builder<Void>(getTransactionalGraph()) + .perform(new TransactionWork<Void>() { + @Override + public Void execute(TransactionalGraph transactionalGraph) throws Exception { + entityGraphBuilder.updateEntity(oldEntity, newEntity); + transactionalGraph.commit(); + return null; + } + }).build().exponentialBackoff(transactionRetries, transactionRetryDelayInMillis); + + } catch (Exception e) { + getTransactionalGraph().rollback(); + throw new FalconException(e); } } @@ -254,27 +257,38 @@ public class MetadataMappingService } @Override - public void onSuccess(WorkflowExecutionContext context) throws FalconException { - WorkflowExecutionContext.EntityOperations entityOperation = context.getOperation(); - + public void onSuccess(final WorkflowExecutionContext context) throws FalconException { LOG.info("Adding lineage for context {}", context); + try { + new TransactionRetryHelper.Builder<Void>(getTransactionalGraph()) + .perform(new TransactionWork<Void>() { + @Override + public Void execute(TransactionalGraph transactionalGraph) throws Exception { + 
onSuccessfulExecution(context); + transactionalGraph.commit(); + return null; + } + }).build().exponentialBackoff(transactionRetries, transactionRetryDelayInMillis); + } catch (Exception e) { + getTransactionalGraph().rollback(); + throw new FalconException(e); + } + } + + private void onSuccessfulExecution(final WorkflowExecutionContext context) throws FalconException { + WorkflowExecutionContext.EntityOperations entityOperation = context.getOperation(); switch (entityOperation) { case GENERATE: onProcessInstanceExecuted(context); - getTransactionalGraph().commit(); break; - case REPLICATE: onFeedInstanceReplicated(context); - getTransactionalGraph().commit(); break; - case DELETE: onFeedInstanceEvicted(context); - getTransactionalGraph().commit(); break; - default: + throw new IllegalArgumentException("Invalid EntityOperation" + entityOperation); } } @@ -283,6 +297,8 @@ public class MetadataMappingService // do nothing since lineage is only recorded for successful workflow } + + private void onProcessInstanceExecuted(WorkflowExecutionContext context) throws FalconException { Vertex processInstance = instanceGraphBuilder.addProcessInstance(context); instanceGraphBuilder.addOutputFeedInstances(context, processInstance); http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index 65746d7..28e7e50 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -112,6 +112,8 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle *.falcon.graph.storage.backend=berkeleyje *.falcon.graph.serialize.path=${user.dir}/target/graphdb *.falcon.graph.preserve.history=false +*.falcon.graph.transaction.retry.count=3 +*.falcon.graph.transaction.retry.delay=5 ######### Authentication 
Properties ######### http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java index 11d27fe..30eeaa4 100644 --- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java +++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java @@ -23,6 +23,7 @@ import com.tinkerpop.blueprints.Edge; import com.tinkerpop.blueprints.Graph; import com.tinkerpop.blueprints.GraphQuery; import com.tinkerpop.blueprints.Vertex; +import org.apache.falcon.FalconException; import org.apache.falcon.cluster.util.EntityBuilderTestUtil; import org.apache.falcon.entity.Storage; import org.apache.falcon.entity.store.ConfigurationStore; @@ -205,7 +206,7 @@ public class MetadataMappingServiceTest { public void testOnAddProcessEntity() throws Exception { processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, - WORKFLOW_VERSION); + WORKFLOW_VERSION, inputFeeds, outputFeeds); verifyEntityWasAddedToGraph(processEntity.getName(), RelationshipType.PROCESS_ENTITY); verifyProcessEntityEdges(); @@ -303,8 +304,7 @@ public class MetadataMappingServiceTest { public void testLineageForReplicationForNonGeneratedInstances() throws Exception { cleanUp(); service.init(); - - addClusterAndFeedForReplication(); + addClusterAndFeedForReplication(inputFeeds); // Get the vertices before running replication WF long beforeVerticesCount = getVerticesCount(service.getGraph()); long beforeEdgesCount = getEdgesCount(service.getGraph()); @@ -427,6 +427,31 @@ public class MetadataMappingServiceTest { 
Assert.assertEquals(getEdgesCount(service.getGraph()), 35); // +2 = 1 new cluster, 1 new tag } + @Test + public void testLineageForTransactionFailure() throws Exception { + clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME, + "classification=production"); + verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY); + verifyClusterEntityEdges(); + Assert.assertEquals(getVerticesCount(service.getGraph()), 3); // +3 = cluster, colo, tag + Assert.assertEquals(getEdgesCount(service.getGraph()), 2); // +2 = cluster to colo and tag + + Feed feed = EntityBuilderTestUtil.buildFeed("feed-name", new Cluster[]{clusterEntity}, null, null); + inputFeeds.add(feed); + outputFeeds.add(feed); + + try { + processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, + "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, + WORKFLOW_VERSION, inputFeeds, outputFeeds); + Assert.fail(); + } catch (FalconException e) { + Assert.assertEquals(getVerticesCount(service.getGraph()), 3); + Assert.assertEquals(getEdgesCount(service.getGraph()), 2); + } + + } + private void verifyUpdatedEdges(Feed newFeed) { Vertex feedVertex = getEntityVertex(newFeed.getName(), RelationshipType.FEED_ENTITY); @@ -556,24 +581,26 @@ public class MetadataMappingServiceTest { return feed; } + //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck public Process addProcessEntity(String processName, Cluster cluster, String tags, String pipelineTags, String workflowName, - String version) throws Exception { + String version, List<Feed> inFeeds, List<Feed> outFeeds) throws Exception { Process process = EntityBuilderTestUtil.buildProcess(processName, cluster, tags, pipelineTags); EntityBuilderTestUtil.addProcessWorkflow(process, workflowName, version); - for (Feed inputFeed : inputFeeds) { + for (Feed inputFeed : inFeeds) { EntityBuilderTestUtil.addInput(process, inputFeed); } - for (Feed outputFeed : outputFeeds) { + for (Feed outputFeed : outFeeds) { 
EntityBuilderTestUtil.addOutput(process, outputFeed); } configStore.publish(EntityType.PROCESS, process); return process; } + //RESUME CHECKSTYLE CHECK ParameterNumberCheck private static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) { if (storageType == Storage.TYPE.FILESYSTEM) { @@ -926,39 +953,44 @@ public class MetadataMappingServiceTest { Feed impressionsFeed = addFeedEntity("impression-feed", cluster, "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}"); - inputFeeds.add(impressionsFeed); + List<Feed> inFeeds = new ArrayList<>(); + List<Feed> outFeeds = new ArrayList<>(); + inFeeds.add(impressionsFeed); Feed clicksFeed = addFeedEntity("clicks-feed", cluster, "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}"); - inputFeeds.add(clicksFeed); + inFeeds.add(clicksFeed); Feed join1Feed = addFeedEntity("imp-click-join1", cluster, "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}"); - outputFeeds.add(join1Feed); + outFeeds.add(join1Feed); Feed join2Feed = addFeedEntity("imp-click-join2", cluster, "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}"); - outputFeeds.add(join2Feed); + outFeeds.add(join2Feed); processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, - WORKFLOW_VERSION); + WORKFLOW_VERSION, inFeeds, outFeeds); } private void setupForLineageReplication() throws Exception { cleanUp(); service.init(); - addClusterAndFeedForReplication(); + List<Feed> inFeeds = new ArrayList<>(); + List<Feed> outFeeds = new ArrayList<>(); + + addClusterAndFeedForReplication(inFeeds); // Add output feed Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity, 
"classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}"); - outputFeeds.add(join1Feed); + outFeeds.add(join1Feed); processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, - WORKFLOW_VERSION); + WORKFLOW_VERSION, inFeeds, outFeeds); // GENERATE WF should have run before this to create all instance related vertices WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( @@ -969,7 +1001,7 @@ public class MetadataMappingServiceTest { service.onSuccess(context); } - private void addClusterAndFeedForReplication() throws Exception { + private void addClusterAndFeedForReplication(List<Feed> inFeeds) throws Exception { // Add cluster clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME, "classification=production"); @@ -1000,7 +1032,7 @@ public class MetadataMappingServiceTest { } finally { configStore.cleanupUpdateInit(); } - inputFeeds.add(rawFeed); + inFeeds.add(rawFeed); } private void setupForLineageEviction() throws Exception { @@ -1021,27 +1053,28 @@ public class MetadataMappingServiceTest { // Add cluster clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME, "classification=production"); - + List<Feed> inFeeds = new ArrayList<>(); + List<Feed> outFeeds = new ArrayList<>(); // Add input and output feeds Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity, "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM, "/falcon/impression-feed"); - inputFeeds.add(impressionsFeed); + inFeeds.add(impressionsFeed); Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity, "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed"); - inputFeeds.add(clicksFeed); + inFeeds.add(clicksFeed); Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity, "classified-as=Financial", "reporting,bi", 
Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1"); - outputFeeds.add(join1Feed); + outFeeds.add(join1Feed); Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity, "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join2"); - outputFeeds.add(join2Feed); + outFeeds.add(join2Feed); processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, - WORKFLOW_VERSION); + WORKFLOW_VERSION, inFeeds, outFeeds); } http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 4d1dbb4..0689899 100644 --- a/pom.xml +++ b/pom.xml @@ -920,7 +920,7 @@ <groupId>com.tinkerpop.blueprints</groupId> <artifactId>blueprints-core</artifactId> - <version>2.4.0</version> + <version>2.5.0</version> http://git-wip-us.apache.org/repos/asf/falcon/blob/2ebf1283/src/conf/startup.properties ---------------------------------------------------------------------- diff --git a/src/conf/startup.properties b/src/conf/startup.properties index 64a7d27..3681cb9 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -107,6 +107,8 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ *.falcon.graph.storage.backend=berkeleyje *.falcon.graph.serialize.path=/${falcon.home}/data *.falcon.graph.preserve.history=false +*.falcon.graph.transaction.retry.count=3 +*.falcon.graph.transaction.retry.delay=5 ######### Authentication Properties #########