Return-Path: X-Original-To: apmail-falcon-commits-archive@minotaur.apache.org Delivered-To: apmail-falcon-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E9CFB17959 for ; Thu, 9 Apr 2015 13:24:28 +0000 (UTC) Received: (qmail 23817 invoked by uid 500); 9 Apr 2015 13:24:28 -0000 Delivered-To: apmail-falcon-commits-archive@falcon.apache.org Received: (qmail 23712 invoked by uid 500); 9 Apr 2015 13:24:28 -0000 Mailing-List: contact commits-help@falcon.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@falcon.apache.org Delivered-To: mailing list commits@falcon.apache.org Received: (qmail 23687 invoked by uid 99); 9 Apr 2015 13:24:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Apr 2015 13:24:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A7F56DFF99; Thu, 9 Apr 2015 13:24:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rostafiychuk@apache.org To: commits@falcon.apache.org Date: Thu, 09 Apr 2015 13:24:29 -0000 Message-Id: In-Reply-To: <7803c64c07984db6abdcde0149cf5dac@git.apache.org> References: <7803c64c07984db6abdcde0149cf5dac@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] falcon git commit: FALCON-1135 Migrate methods related to *Merlin.java classes from InstanceUtil.java and Bundle.java. Contributed by Ruslan Ostafiychuk http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java index 13a9776..76d033c 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NewRetryTest.java @@ -93,7 +93,8 @@ public class NewRetryTest extends BaseTestClass { bundles[0].setProcessWorkflow(aggregateWorkflowDir); startDate = new DateTime(DateTimeZone.UTC).plusMinutes(1); endDate = new DateTime(DateTimeZone.UTC).plusMinutes(2); - bundles[0].setProcessValidity(startDate, endDate); + bundles[0].setProcessValidity(TimeUtil.dateToOozieDate(startDate.toDate()), + TimeUtil.dateToOozieDate(endDate.toDate())); FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle()); feed.setFeedPathValue(latePath).insertLateFeedValue(new Frequency("minutes(8)")); @@ -135,7 +136,7 @@ public class NewRetryTest extends BaseTestClass { //now wait till the process is over String bundleId = OozieUtil.getBundles(clusterOC, - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0); + bundles[0].getProcessName(), EntityType.PROCESS).get(0); waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25); @@ -150,7 +151,7 @@ public class NewRetryTest extends BaseTestClass { prism.getProcessHelper() .update((bundles[0].getProcessData()), bundles[0].getProcessData()); String newBundleId = InstanceUtil.getLatestBundleID(cluster, - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS); + bundles[0].getProcessName(), EntityType.PROCESS); Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!"); @@ -183,7 +184,7 @@ public class NewRetryTest extends BaseTestClass { AssertUtil.assertSucceeded( prism.getProcessHelper().schedule(bundles[0].getProcessData())); String bundleId = OozieUtil.getBundles(clusterOC, - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0); + bundles[0].getProcessName(), EntityType.PROCESS).get(0); for (int attempt = 0; attempt < 20 && !validateFailureRetries(clusterOC, bundleId, 1); ++attempt) { @@ -205,7 +206,7 @@ public class NewRetryTest extends BaseTestClass { .getMessage().contains("updated successfully"), "process was not updated successfully"); String newBundleId = InstanceUtil.getLatestBundleID(cluster, - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS); + bundles[0].getProcessName(), EntityType.PROCESS); Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!"); @@ -242,7 +243,7 @@ public class NewRetryTest extends BaseTestClass { prism.getProcessHelper().schedule(bundles[0].getProcessData())); //now wait till the process is over String bundleId = OozieUtil.getBundles(clusterOC, - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0); + bundles[0].getProcessName(), EntityType.PROCESS).get(0); for (int i = 0; i < 10 && !validateFailureRetries(clusterOC, bundleId, 1); ++i) { TimeUtil.sleepSeconds(10); @@ -260,7 +261,7 @@ public class NewRetryTest extends BaseTestClass { .getMessage().contains("updated successfully"), "process was not updated successfully"); String newBundleId = InstanceUtil.getLatestBundleID(cluster, - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS); + bundles[0].getProcessName(), EntityType.PROCESS); Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!"); @@ -295,7 +296,7 @@ public class NewRetryTest extends BaseTestClass { //now wait till the process is over String bundleId = OozieUtil.getBundles(clusterOC, - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0); + bundles[0].getProcessName(), EntityType.PROCESS).get(0); for (int attempt = 0; attempt < 10 && !validateFailureRetries(clusterOC, bundleId, 2); ++attempt) { @@ -316,7 +317,7 @@ public class NewRetryTest extends BaseTestClass { .getMessage().contains("updated successfully"), "process was not updated successfully"); String newBundleId = InstanceUtil.getLatestBundleID(cluster, - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS); + bundles[0].getProcessName(), EntityType.PROCESS); Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!"); @@ -350,7 +351,7 @@ public class NewRetryTest extends BaseTestClass { prism.getProcessHelper().schedule(bundles[0].getProcessData())); //now wait till the process is over String bundleId = OozieUtil.getBundles(clusterOC, - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0); + bundles[0].getProcessName(), EntityType.PROCESS).get(0); waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25); @@ -360,11 +361,11 @@ public class NewRetryTest extends BaseTestClass { LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC)); Assert.assertTrue(prism.getProcessHelper() - .update(Util.readEntityName(bundles[0].getProcessData()), + .update(bundles[0].getProcessName(), null).getMessage() .contains("updated successfully"), "process was not updated successfully"); String newBundleId = InstanceUtil.getLatestBundleID(cluster, - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS); + bundles[0].getProcessName(), EntityType.PROCESS); Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!"); @@ -398,7 +399,7 @@ public class NewRetryTest extends BaseTestClass { prism.getProcessHelper().schedule(bundles[0].getProcessData())); //now wait till the process is over String bundleId = OozieUtil.getBundles(clusterOC, - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0); + bundles[0].getProcessName(), EntityType.PROCESS).get(0); waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25); @@ -408,11 +409,11 @@ public class NewRetryTest extends BaseTestClass { LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC)); Assert.assertTrue( - prism.getProcessHelper().update(Util.readEntityName(bundles[0].getProcessData()), + prism.getProcessHelper().update(bundles[0].getProcessName(), bundles[0].getProcessData()).getMessage() .contains("updated successfully"), "process was not updated successfully"); String newBundleId = InstanceUtil.getLatestBundleID(cluster, - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS); + bundles[0].getProcessName(), EntityType.PROCESS); Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!"); @@ -449,7 +450,7 @@ public class NewRetryTest extends BaseTestClass { prism.getProcessHelper().schedule(bundles[0].getProcessData())); //now wait till the process is over String bundleId = OozieUtil.getBundles(clusterOC, - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0); + bundles[0].getProcessName(), EntityType.PROCESS).get(0); waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25); @@ -460,12 +461,12 @@ public class NewRetryTest extends BaseTestClass { LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC)); Assert.assertTrue(prism.getProcessHelper() - .update(Util.readEntityName(bundles[0].getProcessData()), + .update(bundles[0].getProcessName(), bundles[0].getProcessData()).getMessage() .contains("updated successfully"), "process was not updated successfully"); String newBundleId = InstanceUtil - .getLatestBundleID(cluster, Util.readEntityName(bundles[0].getProcessData()), + .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!"); @@ -503,7 +504,7 @@ public class NewRetryTest extends BaseTestClass { prism.getProcessHelper().schedule(bundles[0].getProcessData())); //now wait till the process is over String bundleId = OozieUtil.getBundles(clusterOC, - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0); + bundles[0].getProcessName(), EntityType.PROCESS).get(0); waitTillCertainPercentageOfProcessHasStarted(clusterOC, bundleId, 25); @@ -513,11 +514,11 @@ public class NewRetryTest extends BaseTestClass { LOGGER.info("going to update process at:" + DateTime.now(DateTimeZone.UTC)); Assert.assertFalse( - prism.getProcessHelper().update(Util.readEntityName(bundles[0].getProcessData()) + prism.getProcessHelper().update(bundles[0].getProcessName() , bundles[0].getProcessData()).getMessage().contains("updated successfully"), "process was updated successfully!!!"); String newBundleId = InstanceUtil.getLatestBundleID(cluster, - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS); + bundles[0].getProcessName(), EntityType.PROCESS); Assert.assertEquals(bundleId, newBundleId, "its creating a new bundle!!!"); @@ -555,7 +556,7 @@ public class NewRetryTest extends BaseTestClass { prism.getProcessHelper().schedule(bundles[0].getProcessData())); //now wait till the process is over String bundleId = OozieUtil.getBundles(clusterOC, - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0); + bundles[0].getProcessName(), EntityType.PROCESS).get(0); //now to validate all failed instances to check if they were retried or not. validateRetry(clusterOC, bundleId, @@ -595,7 +596,7 @@ public class NewRetryTest extends BaseTestClass { //now wait till the process is over String bundleId = OozieUtil.getBundles(clusterOC, - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0); + bundles[0].getProcessName(), EntityType.PROCESS).get(0); for (int attempt = 0; attempt < 10 && !validateFailureRetries(clusterOC, bundleId, 1); ++attempt) { @@ -608,7 +609,7 @@ public class NewRetryTest extends BaseTestClass { LOGGER.info("now firing user reruns:"); for (int i = 0; i < 1; i++) { prism.getProcessHelper() - .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()), + .getProcessInstanceRerun(bundles[0].getProcessName(), "?start=" + timeFormatter.print(startDate).replace("/", "T") + "Z" + "&end=" + timeFormatter.print(endDate).replace("/", "T") + "Z"); } @@ -648,7 +649,7 @@ public class NewRetryTest extends BaseTestClass { prism.getProcessHelper().schedule(bundles[0].getProcessData())); //now wait till the process is over String bundleId = OozieUtil.getBundles(clusterOC, - Util.readEntityName(bundles[0].getProcessData()), + bundles[0].getProcessName(), EntityType.PROCESS).get(0); //now to validate all failed instances to check if they were retried or not. @@ -659,7 +660,7 @@ public class NewRetryTest extends BaseTestClass { DateTime[] dateBoundaries = getFailureTimeBoundaries(clusterOC, bundleId); InstancesResult piResult = prism.getProcessHelper() - .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()), + .getProcessInstanceRerun(bundles[0].getProcessName(), "?start=" + timeFormatter.print(dateBoundaries[0]).replace("/", "T") + "Z" + "&end=" + timeFormatter.print(dateBoundaries[dateBoundaries.length - 1]) .replace("/", "T") + "Z"); @@ -702,7 +703,7 @@ public class NewRetryTest extends BaseTestClass { AssertUtil.assertSucceeded( prism.getProcessHelper().schedule(bundles[0].getProcessData())); String bundleId = OozieUtil.getBundles(clusterOC, - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0); + bundles[0].getProcessName(), EntityType.PROCESS).get(0); List dates = null; for (int i = 0; i < 10 && dates == null; ++i) { @@ -802,7 +803,7 @@ public class NewRetryTest extends BaseTestClass { AssertUtil.assertSucceeded( prism.getProcessHelper().schedule(bundles[0].getProcessData())); String bundleId = OozieUtil.getBundles(clusterOC, - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0); + bundles[0].getProcessName(), EntityType.PROCESS).get(0); List dates = null; for (int i = 0; i < 10 && dates == null; ++i) { @@ -878,7 +879,7 @@ public class NewRetryTest extends BaseTestClass { prism.getProcessHelper().schedule(bundles[0].getProcessData())); //now wait till the process is over String bundleId = OozieUtil.getBundles(clusterOC, - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).get(0); + bundles[0].getProcessName(), EntityType.PROCESS).get(0); validateRetry(clusterOC, bundleId, (bundles[0].getProcessObject().getRetry().getAttempts()) / 2); http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java index 59a701d..61c076b 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java @@ -83,10 +83,9 @@ public class NoOutputProcessTest extends BaseTestClass { bundles[0].setInputFeedDataPath(inputPath); bundles[0].setProcessValidity("2010-01-03T02:30Z", "2010-01-03T02:45Z"); bundles[0].setProcessPeriodicity(5, TimeUnit.minutes); - ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData()); + ProcessMerlin process = bundles[0].getProcessObject(); process.setOutputs(null); process.setLateProcess(null); - bundles[0].setProcessData(process.toString()); bundles[0].submitFeedsScheduleProcess(prism); } http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java index 8cf2862..d7331cf 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java @@ -30,7 +30,6 @@ import org.apache.falcon.regression.core.util.HadoopUtil; import org.apache.falcon.regression.core.util.InstanceUtil; import org.apache.falcon.regression.core.util.OSUtil; import org.apache.falcon.regression.core.util.TimeUtil; -import org.apache.falcon.regression.core.util.Util; import org.apache.falcon.regression.testHelper.BaseTestClass; import org.apache.falcon.resource.InstancesResult; import org.apache.hadoop.fs.FileSystem; @@ -103,7 +102,7 @@ public class ProcessFrequencyTest extends BaseTestClass { TimeUtil.oozieDateToDate(startDate)); HadoopUtil.copyDataToFolder(clusterFS, startPath, OSUtil.NORMAL_INPUT); - final String processName = Util.readEntityName(bundles[0].getProcessData()); + final String processName = bundles[0].getProcessName(); //InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 5); http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java index 48cb59b..56e1474 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java @@ -119,11 +119,11 @@ public class ProcessInstanceColoMixedTest extends BaseTestClass { List dataDates = TimeUtil.getMinuteDatesOnEitherSide( TimeUtil.getTimeWrtSystemTime(-35), TimeUtil.getTimeWrtSystemTime(25), 1); - String prefix = InstanceUtil.getFeedPrefix(feed01.toString()); + String prefix = feed01.getFeedPrefix(); HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster1FS); HadoopUtil.flattenAndPutDataInFolder(cluster1FS, OSUtil.SINGLE_FILE, prefix, dataDates); - prefix = InstanceUtil.getFeedPrefix(feed02.toString()); + prefix = feed02.getFeedPrefix(); HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS); HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.SINGLE_FILE, prefix, dataDates); @@ -179,7 +179,7 @@ public class ProcessInstanceColoMixedTest extends BaseTestClass { String processStartTime = TimeUtil.getTimeWrtSystemTime(-16); // String processEndTime = InstanceUtil.getTimeWrtSystemTime(20); - ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData()); + ProcessMerlin process = bundles[0].getProcessObject(); process.clearProcessCluster(); process.addProcessCluster( new ProcessMerlin.ProcessClusterBuilder( @@ -192,8 +192,7 @@ public class ProcessInstanceColoMixedTest extends BaseTestClass { .withValidity(TimeUtil.addMinsToTime(processStartTime, 16), TimeUtil.addMinsToTime(processStartTime, 45)) .build()); - process = new ProcessMerlin(InstanceUtil.addProcessInputFeed(process.toString(), feed02.getName(), - feed02.getName())); + process.addInputFeed(feed02.getName(), feed02.getName()); //submit and schedule process prism.getProcessHelper().submitAndSchedule(process.toString()); @@ -204,42 +203,40 @@ public class ProcessInstanceColoMixedTest extends BaseTestClass { InstanceUtil.waitTillInstanceReachState(serverOC.get(1), process.getName(), 1, Status.RUNNING, EntityType.PROCESS); - final String processName = Util.readEntityName(bundles[0].getProcessData()); - InstancesResult responseInstance = prism.getProcessHelper().getProcessInstanceStatus( - processName, "?start=" + processStartTime - + "&end=" + TimeUtil.addMinsToTime(processStartTime, 45)); + InstancesResult responseInstance = prism.getProcessHelper().getProcessInstanceStatus(process.getName(), + "?start=" + processStartTime + "&end=" + TimeUtil.addMinsToTime(processStartTime, 45)); AssertUtil.assertSucceeded(responseInstance); Assert.assertTrue(responseInstance.getInstances() != null); - responseInstance = prism.getProcessHelper().getProcessInstanceSuspend(processName, + responseInstance = prism.getProcessHelper().getProcessInstanceSuspend(process.getName(), "?start=" + TimeUtil.addMinsToTime(processStartTime, 37) + "&end=" + TimeUtil.addMinsToTime(processStartTime, 44)); AssertUtil.assertSucceeded(responseInstance); Assert.assertTrue(responseInstance.getInstances() != null); - responseInstance = prism.getProcessHelper().getProcessInstanceStatus(processName, + responseInstance = prism.getProcessHelper().getProcessInstanceStatus(process.getName(), "?start=" + TimeUtil.addMinsToTime(processStartTime, 37) + "&end=" + TimeUtil.addMinsToTime(processStartTime, 44)); AssertUtil.assertSucceeded(responseInstance); Assert.assertTrue(responseInstance.getInstances() != null); - responseInstance = prism.getProcessHelper().getProcessInstanceResume(processName, + responseInstance = prism.getProcessHelper().getProcessInstanceResume(process.getName(), "?start=" + processStartTime + "&end=" + TimeUtil.addMinsToTime(processStartTime, 7)); AssertUtil.assertSucceeded(responseInstance); Assert.assertTrue(responseInstance.getInstances() != null); - responseInstance = prism.getProcessHelper().getProcessInstanceStatus(processName, + responseInstance = prism.getProcessHelper().getProcessInstanceStatus(process.getName(), "?start=" + TimeUtil.addMinsToTime(processStartTime, 16) + "&end=" + TimeUtil.addMinsToTime(processStartTime, 45)); AssertUtil.assertSucceeded(responseInstance); Assert.assertTrue(responseInstance.getInstances() != null); - responseInstance = cluster1.getProcessHelper().getProcessInstanceKill(processName, + responseInstance = cluster1.getProcessHelper().getProcessInstanceKill(process.getName(), "?start=" + processStartTime + "&end="+ TimeUtil.addMinsToTime(processStartTime, 7)); AssertUtil.assertSucceeded(responseInstance); Assert.assertTrue(responseInstance.getInstances() != null); - responseInstance = prism.getProcessHelper().getProcessInstanceRerun(processName, + responseInstance = prism.getProcessHelper().getProcessInstanceRerun(process.getName(), "?start=" + processStartTime + "&end=" + TimeUtil.addMinsToTime(processStartTime, 7)); AssertUtil.assertSucceeded(responseInstance); Assert.assertTrue(responseInstance.getInstances() != null); http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java index 34dfce3..f299128 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java @@ -29,7 +29,6 @@ import org.apache.falcon.regression.core.util.BundleUtil; import org.apache.falcon.regression.core.util.OSUtil; import org.apache.falcon.regression.core.util.InstanceUtil; import org.apache.falcon.regression.core.util.OozieUtil; -import org.apache.falcon.regression.core.util.Util; import org.apache.falcon.regression.testHelper.BaseTestClass; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesResult.WorkflowStatus; @@ -83,7 +82,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass { bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); bundles[0].setOutputFeedLocationData(feedOutputPath); bundles[0].setProcessPeriodicity(5, TimeUnit.minutes); - processName = Util.readEntityName(bundles[0].getProcessData()); + processName = bundles[0].getProcessName(); } @AfterMethod(alwaysRun = true) http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java index f558cc5..3893ffe 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java @@ -28,7 +28,6 @@ import org.apache.falcon.regression.core.util.OozieUtil; import org.apache.falcon.regression.core.util.HadoopUtil; import org.apache.falcon.regression.core.util.BundleUtil; import org.apache.falcon.regression.core.util.OSUtil; -import org.apache.falcon.regression.core.util.Util; import org.apache.falcon.regression.testHelper.BaseTestClass; import org.apache.falcon.resource.InstancesResult; import org.apache.hadoop.fs.FileSystem; @@ -79,7 +78,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass { bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); bundles[0].setProcessConcurrency(6); bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z"); - processName = Util.readEntityName(bundles[0].getProcessData()); + processName = bundles[0].getProcessName(); } @AfterMethod(alwaysRun = true) http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java index c9334eb..6003ee0 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java @@ -28,7 +28,6 @@ import org.apache.falcon.regression.core.util.OozieUtil; import org.apache.falcon.regression.core.util.HadoopUtil; import org.apache.falcon.regression.core.util.BundleUtil; import org.apache.falcon.regression.core.util.OSUtil; -import org.apache.falcon.regression.core.util.Util; import org.apache.falcon.regression.core.util.TimeUtil; import org.apache.falcon.regression.core.util.AssertUtil; import org.apache.falcon.regression.testHelper.BaseTestClass; @@ -83,7 +82,7 @@ public class ProcessInstanceRunningTest extends BaseTestClass { bundles[0].setProcessPeriodicity(5, TimeUnit.minutes); bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); bundles[0].setOutputFeedLocationData(feedOutputPath); - processName = Util.readEntityName(bundles[0].getProcessData()); + processName = bundles[0].getProcessName(); } @AfterMethod(alwaysRun = true) http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java index 58936a7..635e238 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java @@ -27,7 +27,6 @@ import org.apache.falcon.regression.core.util.InstanceUtil; import org.apache.falcon.regression.core.util.OozieUtil; import org.apache.falcon.regression.core.util.OSUtil; import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.Util; import org.apache.falcon.regression.core.util.TimeUtil; import org.apache.falcon.regression.core.util.HadoopUtil; import org.apache.falcon.regression.core.util.AssertUtil; @@ -97,7 +96,7 @@ public class ProcessInstanceStatusTest extends BaseTestClass { bundles[0].setProcessWorkflow(aggregateWorkflowDir); bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z"); bundles[0].setProcessPeriodicity(5, TimeUnit.minutes); - processName = Util.readEntityName(bundles[0].getProcessData()); + processName = bundles[0].getProcessName(); HadoopUtil.deleteDirIfExists(baseTestHDFSDir + "/input", clusterFS); } http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java index 26348bd..b7fed18 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java @@ -18,7 +18,6 @@ package org.apache.falcon.regression; -import org.apache.falcon.regression.Entities.ProcessMerlin; import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency.TimeUnit; @@ -29,7 +28,6 @@ import org.apache.falcon.regression.core.util.HadoopUtil; import org.apache.falcon.regression.core.util.BundleUtil; import org.apache.falcon.regression.core.util.OozieUtil; import org.apache.falcon.regression.core.util.OSUtil; -import org.apache.falcon.regression.core.util.Util; import org.apache.falcon.regression.core.util.AssertUtil; import org.apache.falcon.regression.testHelper.BaseTestClass; import org.apache.falcon.resource.InstancesResult; @@ -68,7 +66,7 @@ public class ProcessInstanceSuspendTest extends BaseTestClass { bundles[0].setProcessWorkflow(aggregateWorkflowDir); bundles[0].setProcessPeriodicity(5, TimeUnit.minutes); bundles[0].setOutputFeedLocationData(feedOutputPath); - processName = Util.readEntityName(bundles[0].getProcessData()); + processName = bundles[0].getProcessName(); } @AfterMethod(alwaysRun = true) @@ -115,8 +113,8 @@ public class ProcessInstanceSuspendTest extends BaseTestClass { bundles[0].submitFeedsScheduleProcess(prism); InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, new ProcessMerlin(bundles[0] - .getProcessData()).getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); InstancesResult r = prism.getProcessHelper().getProcessInstanceSuspend(processName, "?start=2010-01-02T01:00Z&end=2010-01-02T01:01Z"); AssertUtil.assertSucceeded(r); http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java index b41cf05..40a4ad2 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java @@ -85,8 +85,7 @@ public class ProcessLateRerunTest extends BaseTestClass { bundles[0].setOutputFeedPeriodicity(10, Frequency.TimeUnit.minutes); bundles[0].setProcessConcurrency(2); - ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData()); - String inputName = processMerlin.getInputs().getInputs().get(0).getName(); + String inputName = bundles[0].getProcessObject().getFirstInputName(); bundles[0].setProcessLatePolicy(getLateData(2, "minutes", "periodic", inputName, aggregateWorkflowDir)); bundles[0].submitAndScheduleProcess(); @@ -126,8 +125,7 @@ public class ProcessLateRerunTest extends BaseTestClass { bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes); bundles[0].setProcessConcurrency(2); - ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData()); - String inputName = processMerlin.getInputs().getInputs().get(0).getName(); + String inputName = bundles[0].getProcessObject().getFirstInputName(); bundles[0].setProcessLatePolicy(getLateData(4, "minutes", "periodic", inputName, aggregateWorkflowDir)); bundles[0].submitAndScheduleProcess(); @@ -166,8 +164,7 @@ public class ProcessLateRerunTest extends BaseTestClass { bundles[0].setProcessValidity(startTime, endTime); bundles[0].setProcessPeriodicity(10, Frequency.TimeUnit.minutes); bundles[0].setOutputFeedPeriodicity(10, Frequency.TimeUnit.minutes); - ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData()); - String inputName = processMerlin.getInputs().getInputs().get(0).getName(); + String inputName = bundles[0].getProcessObject().getFirstInputName(); bundles[0].setProcessLatePolicy(getLateData(4, "minutes", "periodic", inputName, aggregateWorkflowDir)); bundles[0].setProcessConcurrency(2); @@ -216,17 +213,17 @@ public class ProcessLateRerunTest extends BaseTestClass { // Increase the window of input for process bundles[0].setDatasetInstances(startInstance, endInstance); - ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData()); - String inputName = processMerlin.getInputs().getInputs().get(0).getName(); - Input tempFeed = processMerlin.getInputs().getInputs().get(0); + ProcessMerlin process = bundles[0].getProcessObject(); + String inputName = process.getFirstInputName(); + Input tempFeed = process.getInputs().getInputs().get(0); Input gateInput = new Input(); gateInput.setName("Gate"); gateInput.setFeed(tempFeed.getFeed()); gateInput.setEnd("now(0,1)"); gateInput.setStart("now(0,1)"); - processMerlin.getInputs().getInputs().add(gateInput); - bundles[0].setProcessData(processMerlin.toString()); + process.getInputs().getInputs().add(gateInput); + bundles[0].setProcessData(process.toString()); bundles[0].setProcessLatePolicy(getLateData(4, "minutes", "periodic", inputName, aggregateWorkflowDir)); http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java index cd7eba4..c73140d 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java @@ -80,7 +80,7 @@ public class ProcessSLATest extends BaseTestClass { @Test public void scheduleValidProcessSLA() throws Exception { - ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData()); + ProcessMerlin processMerlin = bundles[0].getProcessObject(); processMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours), new Frequency("6", Frequency.TimeUnit.hours)); bundles[0].setProcessData(processMerlin.toString()); @@ -95,7 +95,7 @@ public class ProcessSLATest extends BaseTestClass { @Test public void scheduleProcessWithSameSLAStartSLAEnd() throws Exception { - ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData()); + ProcessMerlin processMerlin = bundles[0].getProcessObject(); processMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours), new Frequency("3", Frequency.TimeUnit.hours)); bundles[0].setProcessData(processMerlin.toString()); @@ -110,7 +110,7 @@ public class ProcessSLATest extends BaseTestClass { @Test public void scheduleProcessWithSLAEndLowerthanSLAStart() throws Exception { - ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData()); + ProcessMerlin processMerlin = bundles[0].getProcessObject(); processMerlin.setSla(new Frequency("4", Frequency.TimeUnit.hours), new Frequency("2", Frequency.TimeUnit.hours)); bundles[0].setProcessData(processMerlin.toString()); @@ -131,7 +131,7 @@ public class ProcessSLATest extends BaseTestClass { @Test public void scheduleProcessWithTimeoutGreaterThanSLAStart() throws Exception { - ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData()); + ProcessMerlin processMerlin = bundles[0].getProcessObject(); processMerlin.setTimeout(new Frequency("3", Frequency.TimeUnit.hours)); processMerlin.setSla(new Frequency("2", Frequency.TimeUnit.hours), new Frequency("4", Frequency.TimeUnit.hours)); @@ -147,7 +147,7 @@ public class ProcessSLATest extends BaseTestClass { @Test public void scheduleProcessWithTimeoutLessThanSLAStart() throws Exception { - ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessData()); + ProcessMerlin processMerlin = bundles[0].getProcessObject(); processMerlin.setTimeout(new Frequency("1", Frequency.TimeUnit.hours)); processMerlin.setSla(new Frequency("2", Frequency.TimeUnit.hours), new Frequency("4", Frequency.TimeUnit.hours)); http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ValidateAPIPrismAndServerTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ValidateAPIPrismAndServerTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ValidateAPIPrismAndServerTest.java index 9886d76..ca612b8 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ValidateAPIPrismAndServerTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ValidateAPIPrismAndServerTest.java @@ -236,7 +236,7 @@ public class ValidateAPIPrismAndServerTest extends BaseTestClass { prism.getClusterHelper().submitEntity(bundles[0].getClusters().get(0)); prism.getFeedHelper().submitEntity(feed); prism.getFeedHelper().submitEntity(bundles[0].getOutputFeedFromBundle()); - ProcessMerlin processObj = new ProcessMerlin(bundles[0].getProcessData()); + ProcessMerlin processObj = bundles[0].getProcessObject(); processObj.setWorkflow(null); ServiceResponse response = prism.getProcessHelper().validateEntity(processObj.toString()); AssertUtil.assertFailed(response); @@ -253,7 +253,7 @@ public class ValidateAPIPrismAndServerTest extends BaseTestClass { prism.getClusterHelper().submitEntity(bundles[0].getClusters().get(0)); prism.getFeedHelper().submitEntity(feed); prism.getFeedHelper().submitEntity(bundles[0].getOutputFeedFromBundle()); - ProcessMerlin processObj = new ProcessMerlin(bundles[0].getProcessData()); + ProcessMerlin processObj = bundles[0].getProcessObject(); processObj.setWorkflow(null); ServiceResponse response = cluster.getProcessHelper().validateEntity(processObj.toString()); AssertUtil.assertFailed(response); http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/EntitiesPatternSearchTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/EntitiesPatternSearchTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/EntitiesPatternSearchTest.java index f9fcf8d..f58b0f6 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/EntitiesPatternSearchTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/EntitiesPatternSearchTest.java @@ -67,20 +67,20 @@ public class EntitiesPatternSearchTest extends BaseTestClass { //submit different clusters, feeds and processes FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle()); - ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData()); + ProcessMerlin process = bundles[0].getProcessObject(); ClusterMerlin cluster = bundles[0].getClusterElement(); String clusterNamePrefix = bundles[0].getClusterElement().getName() + '-'; String processNamePrefix = bundles[0].getProcessName() + '-'; String feedNamePrefix = bundles[0].getInputFeedNameFromBundle() + '-'; - List randomeNames = getPatternName(); - for (int i = 0; i < randomeNames.size(); i++) { - process.setName(processNamePrefix + randomeNames.get(i)); + List randomNames = getPatternName(); + for (Object randomName : randomNames) { + process.setName(processNamePrefix + randomName); AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(process.toString())); - feed.setName(feedNamePrefix + randomeNames.get(i)); + feed.setName(feedNamePrefix + randomName); AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString())); - cluster.setName(clusterNamePrefix + randomeNames.get(i)); + cluster.setName(clusterNamePrefix + randomName); AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(cluster.toString())); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/ListEntitiesTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/ListEntitiesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/ListEntitiesTest.java index 13b3b88..3ae44e6 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/ListEntitiesTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/entity/ListEntitiesTest.java @@ -82,7 +82,7 @@ public class ListEntitiesTest extends BaseTestClass { //submit 10 different clusters, feeds and processes FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle()); - ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData()); + ProcessMerlin process = bundles[0].getProcessObject(); ClusterMerlin cluster = bundles[0].getClusterElement(); String clusterNamePrefix = bundles[0].getClusterElement().getName() + '-'; String processNamePrefix = bundles[0].getProcessName() + '-'; http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java index 202298e..7d05d6b 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatProcessTest.java @@ -253,8 +253,7 @@ public class HCatProcessTest extends BaseTestClass { feedObj.setName(inputFeed2Name); feedObj.getTable().setUri(inputTableUri2); - String inputFeed2 = feedObj.toString(); - bundles[0].addInputFeedToBundle("inputData2", inputFeed2, 0); + bundles[0].addInputFeedToBundle("inputData2", feedObj); String outputTableUri = "catalog:" + dbName + ":" + outputTableName + tableUriPartitionFragment; @@ -347,7 +346,7 @@ public class HCatProcessTest extends BaseTestClass { FeedMerlin feedObj = new FeedMerlin(outputFeed1); feedObj.setName(outputFeed2Name); feedObj.getTable().setUri(outputTableUri2); - bundles[0].addOutputFeedToBundle("outputData2", feedObj.toString(), 0); + bundles[0].addOutputFeedToBundle("outputData2", feedObj); bundles[0].setProcessValidity(startDate, endDate); bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours); @@ -433,8 +432,7 @@ public class HCatProcessTest extends BaseTestClass { FeedMerlin feedObj = new FeedMerlin(inputFeed1); feedObj.setName(inputFeed2Name); feedObj.getTable().setUri(inputTableUri2); - String inputFeed2 = feedObj.toString(); - bundles[0].addInputFeedToBundle("inputData2", inputFeed2, 0); + bundles[0].addInputFeedToBundle("inputData2", feedObj); String outputTableUri = "catalog:" + dbName + ":" + outputTableName + tableUriPartitionFragment; @@ -448,8 +446,7 @@ public class HCatProcessTest extends BaseTestClass { FeedMerlin feedObj2 = new FeedMerlin(outputFeed1); feedObj2.setName(outputFeed2Name); feedObj2.getTable().setUri(outputTableUri2); - String outputFeed2 = feedObj2.toString(); - bundles[0].addOutputFeedToBundle("outputData2", outputFeed2, 0); + bundles[0].addOutputFeedToBundle("outputData2", feedObj2); bundles[0].setProcessValidity(startDate, endDate); bundles[0].setProcessPeriodicity(1, Frequency.TimeUnit.hours); bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)"); http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java index 4466c13..3f6dc66 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java @@ -130,8 +130,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + .getLatestBundleID(cluster3, bundles[1].getProcessName(), EntityType.PROCESS); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); @@ -139,16 +138,15 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { List oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS); - String updatedProcess = InstanceUtil - .setProcessFrequency(bundles[1].getProcessData(), - new Frequency("" + 5, TimeUnit.minutes)); + ProcessMerlin updatedProcess = new ProcessMerlin(bundles[1].getProcessObject()); + updatedProcess.setFrequency(new Frequency("5", TimeUnit.minutes)); - LOGGER.info("updated process: " + Util.prettyPrintXml(updatedProcess)); + LOGGER.info("updated process: " + Util.prettyPrintXml(updatedProcess.toString())); //now to update while (Util .parseResponse(prism.getProcessHelper() - .update((bundles[1].getProcessData()), updatedProcess)) + .update((bundles[1].getProcessData()), updatedProcess.toString())) .getStatus() != APIResult.Status.SUCCEEDED) { LOGGER.info("update didn't SUCCEED in last attempt"); TimeUtil.sleepSeconds(10); @@ -186,7 +184,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); List oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS); @@ -256,7 +254,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); TimeUtil.sleepSeconds(25); int initialConcurrency = bundles[1].getProcessObject().getParallel(); @@ -271,14 +269,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //now to update AssertUtil.assertPartial(prism.getProcessHelper() .update(bundles[1].getProcessData(), bundles[1].getProcessData())); - String prismString = getResponse(prism, bundles[1].getProcessData(), true); - Assert.assertEquals(new ProcessMerlin(prismString).getParallel(), initialConcurrency); - Assert.assertEquals(new ProcessMerlin(prismString).getWorkflow().getPath(), workflowPath); - Assert.assertEquals(new ProcessMerlin(prismString).getOrder(), bundles[1].getProcessObject().getOrder()); + ProcessMerlin process = new ProcessMerlin(getResponse(prism, bundles[1].getProcessData(), true)); + Assert.assertEquals(process.getParallel(), initialConcurrency); + Assert.assertEquals(process.getWorkflow().getPath(), workflowPath); + Assert.assertEquals(process.getOrder(), bundles[1].getProcessObject().getOrder()); String coloString = getResponse(cluster2, bundles[1].getProcessData(), true); - Assert.assertEquals(new ProcessMerlin(coloString).getWorkflow().getPath(), - workflowPath2); + Assert.assertEquals(new ProcessMerlin(coloString).getWorkflow().getPath(), workflowPath2); Util.startService(cluster3.getProcessHelper()); dualComparisonFailure(prism, cluster2, bundles[1].getProcessData()); @@ -296,13 +293,10 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { LOGGER.info("WARNING: update did not succeed, retrying "); TimeUtil.sleepSeconds(20); } - prismString = getResponse(prism, bundles[1].getProcessData(), true); - Assert.assertEquals(new ProcessMerlin(prismString).getParallel(), - initialConcurrency + 3); - Assert.assertEquals(new ProcessMerlin(prismString).getWorkflow().getPath(), - workflowPath2); - Assert.assertEquals(new ProcessMerlin(prismString).getOrder(), - bundles[1].getProcessObject().getOrder()); + process = new ProcessMerlin(getResponse(prism, bundles[1].getProcessData(), true)); + Assert.assertEquals(process.getParallel(), initialConcurrency + 3); + Assert.assertEquals(process.getWorkflow().getPath(), workflowPath2); + Assert.assertEquals(process.getOrder(), bundles[1].getProcessObject().getOrder()); dualComparison(prism, cluster3, bundles[1].getProcessData()); AssertUtil .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); @@ -336,7 +330,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0); waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); @@ -344,16 +338,14 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { EntityType.PROCESS); LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData())); - String updatedProcess = InstanceUtil - .setProcessFrequency(bundles[1].getProcessData(), - new Frequency("" + 7, TimeUnit.minutes)); + ProcessMerlin updatedProcess = new ProcessMerlin(bundles[1].getProcessObject()); + updatedProcess.setFrequency(new Frequency("7", TimeUnit.minutes)); LOGGER.info("updated process: " + updatedProcess); //now to update - ServiceResponse response = - prism.getProcessHelper().update(updatedProcess, updatedProcess); + prism.getProcessHelper().update(bundles[1].getProcessData(), updatedProcess.toString()); AssertUtil.assertSucceeded(response); OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, false); @@ -361,7 +353,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { String prismString = getResponse(prism, bundles[1].getProcessData(), true); Assert.assertEquals(new ProcessMerlin(prismString).getFrequency(), - new ProcessMerlin(updatedProcess).getFrequency()); + updatedProcess.getFrequency()); dualComparison(prism, cluster3, bundles[1].getProcessData()); AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); } @@ -376,7 +368,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { String originalProcessData = bundles[1].getProcessData(); String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0); waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); @@ -410,7 +402,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0); waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); @@ -443,7 +435,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { // future : should be verified using cord xml Job.Status status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(), - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); boolean doesExist = false; OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0); @@ -452,7 +444,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { status != Job.Status.DONEWITHERROR) { int statusCount = InstanceUtil .getInstanceCountWithStatus(cluster3, - Util.readEntityName(bundles[1].getProcessData()), + bundles[1].getProcessName(), org.apache.oozie.client.CoordinatorAction.Status.RUNNING, EntityType.PROCESS); if (statusCount == bundles[1].getProcessObject().getParallel() + 3) { @@ -460,7 +452,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { break; } status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(), - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); Assert.assertNotNull(status, "status must not be null!"); TimeUtil.sleepSeconds(30); @@ -494,7 +486,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); List oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS); @@ -577,7 +569,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); @@ -610,7 +602,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil.checkStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); Job.Status status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(), - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); boolean doesExist = false; OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0); @@ -619,7 +611,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { status != Job.Status.DONEWITHERROR) { if (InstanceUtil .getInstanceCountWithStatus(cluster3, - Util.readEntityName(bundles[1].getProcessData()), + bundles[1].getProcessName(), org.apache.oozie.client.CoordinatorAction.Status.RUNNING, EntityType.PROCESS) == @@ -628,7 +620,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { break; } status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(), - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); } Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!"); @@ -670,7 +662,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); List oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS); @@ -707,7 +699,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { Job.Status status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(), - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); boolean doesExist = false; OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0); @@ -716,7 +708,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { status != Job.Status.DONEWITHERROR) { if (InstanceUtil .getInstanceCountWithStatus(cluster3, - Util.readEntityName(bundles[1].getProcessData()), + bundles[1].getProcessName(), org.apache.oozie.client.CoordinatorAction.Status.RUNNING, EntityType.PROCESS) == @@ -725,13 +717,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { break; } status = OozieUtil.getOozieJobStatus(cluster3.getFeedHelper().getOozieClient(), - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); TimeUtil.sleepSeconds(30); } Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!"); OozieUtil.verifyNewBundleCreation(cluster3, InstanceUtil .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS), + bundles[1].getProcessName(), EntityType.PROCESS), oldNominalTimes, bundles[1].getProcessData(), false, true ); @@ -771,7 +763,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); List oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS); @@ -793,13 +785,10 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { .getProcessData())).getStatus() != APIResult.Status.SUCCEEDED) { TimeUtil.sleepSeconds(10); } - String prismString = getResponse(prism, bundles[1].getProcessData(), true); - Assert.assertEquals(new ProcessMerlin(prismString).getParallel(), - initialConcurrency + 3); - Assert.assertEquals(new ProcessMerlin(prismString).getWorkflow().getPath(), - aggregator1Path); - Assert.assertEquals(new ProcessMerlin(prismString).getOrder(), - bundles[1].getProcessObject().getOrder()); + ProcessMerlin process = new ProcessMerlin(getResponse(prism, bundles[1].getProcessData(), true)); + Assert.assertEquals(process.getParallel(), initialConcurrency + 3); + Assert.assertEquals(process.getWorkflow().getPath(), aggregator1Path); + Assert.assertEquals(process.getOrder(), bundles[1].getProcessObject().getOrder()); dualComparison(prism, cluster3, bundles[1].getProcessData()); //ensure that the running process has new coordinators created; while the submitted // one is updated correctly. @@ -839,7 +828,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); @@ -867,13 +856,10 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData())); - String prismString = getResponse(prism, bundles[1].getProcessData(), true); - Assert.assertEquals(new ProcessMerlin(prismString).getParallel(), - initialConcurrency + 3); - Assert.assertEquals(new ProcessMerlin(prismString).getWorkflow().getPath(), - aggregator1Path); - Assert.assertEquals(new ProcessMerlin(prismString).getOrder(), - bundles[1].getProcessObject().getOrder()); + ProcessMerlin process = new ProcessMerlin(getResponse(prism, bundles[1].getProcessData(), true)); + Assert.assertEquals(process.getParallel(), initialConcurrency + 3); + Assert.assertEquals(process.getWorkflow().getPath(), aggregator1Path); + Assert.assertEquals(process.getOrder(), bundles[1].getProcessObject().getOrder()); dualComparison(prism, cluster3, bundles[1].getProcessData()); //ensure that the running process has new coordinators created; while the submitted // one is updated correctly. @@ -913,7 +899,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); TimeUtil.sleepSeconds(20); waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); @@ -965,7 +951,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); List oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, @@ -1090,7 +1076,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); @@ -1153,7 +1139,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); TimeUtil.sleepSeconds(30); waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); @@ -1220,7 +1206,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); List oldNominalTimes = @@ -1228,15 +1214,14 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData())); - String updatedProcess = InstanceUtil - .setProcessFrequency(bundles[1].getProcessData(), - new Frequency("" + 5, TimeUnit.minutes)); + ProcessMerlin updatedProcess = new ProcessMerlin(bundles[1].getProcessObject()); + updatedProcess.setFrequency(new Frequency("5", TimeUnit.minutes)); LOGGER.info("updated process: " + updatedProcess); //now to update ServiceResponse response = - prism.getProcessHelper().update(updatedProcess, updatedProcess); + prism.getProcessHelper().update(bundles[1].getProcessData(), updatedProcess.toString()); AssertUtil.assertSucceeded(response); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10); @@ -1272,7 +1257,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); List oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, @@ -1280,18 +1265,15 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData())); - String updatedProcess = InstanceUtil - .setProcessFrequency(bundles[1].getProcessData(), - new Frequency("" + 1, TimeUnit.months)); - updatedProcess = InstanceUtil - .setProcessValidity(updatedProcess, TimeUtil.getTimeWrtSystemTime(10), - endTime); + ProcessMerlin updatedProcess = new ProcessMerlin(bundles[1].getProcessObject()); + updatedProcess.setFrequency(new Frequency("1", TimeUnit.months)); + updatedProcess.setValidity(TimeUtil.getTimeWrtSystemTime(10), endTime); LOGGER.info("updated process: " + updatedProcess); //now to update ServiceResponse response = - prism.getProcessHelper().update(updatedProcess, updatedProcess); + prism.getProcessHelper().update(bundles[1].getProcessData(), updatedProcess.toString()); AssertUtil.assertSucceeded(response); String prismString = dualComparison(prism, cluster3, bundles[1].getProcessData()); Assert.assertEquals(new ProcessMerlin(prismString).getFrequency(), @@ -1316,7 +1298,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); List oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS); @@ -1351,7 +1333,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); TimeUtil.sleepSeconds(30); OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId); @@ -1393,7 +1375,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { .getValidity().getEnd())); Assert.assertEquals(InstanceUtil .getProcessInstanceList(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS) + bundles[1].getProcessName(), EntityType.PROCESS) .size(), getExpectedNumberOfWorkflowInstances(newStartTime, bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity().getEnd())); @@ -1409,7 +1391,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, - Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); + bundles[1].getProcessName(), EntityType.PROCESS); TimeUtil.sleepSeconds(30); String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate( @@ -1476,8 +1458,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //update workflow.xml hadoopFileEditor = new HadoopFileEditor(cluster1FS); - hadoopFileEditor.edit(new ProcessMerlin(b - .getProcessData()).getWorkflow().getPath() + "/workflow.xml", "", + hadoopFileEditor.edit(b.getProcessObject().getWorkflow().getPath() + "/workflow.xml", "", ""); //update @@ -1553,7 +1534,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { throws Exception { while (OozieUtil.getOozieJobStatus(coloHelper.getFeedHelper().getOozieClient(), - Util.readEntityName(bundle.getProcessData()), EntityType.PROCESS) != state) { + bundle.getProcessName(), EntityType.PROCESS) != state) { //keep waiting TimeUtil.sleepSeconds(10); } @@ -1566,7 +1547,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { while (coord.getStatus() != state) { TimeUtil.sleepSeconds(10); coord = getDefaultOozieCoord(coloHelper, InstanceUtil - .getLatestBundleID(coloHelper, Util.readEntityName(bundle.getProcessData()), + .getLatestBundleID(coloHelper, bundle.getProcessName(), EntityType.PROCESS)); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java index c9e373e..39f0268 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java @@ -93,7 +93,7 @@ public class OptionalInputTest extends BaseTestClass { bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)"); bundles[0].setProcessConcurrency(2); - ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData()); + ProcessMerlin process = bundles[0].getProcessObject(); LOGGER.info(Util.prettyPrintXml(process.toString())); bundles[0].submitAndScheduleBundle(prism, false); @@ -125,7 +125,7 @@ public class OptionalInputTest extends BaseTestClass { bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)"); bundles[0].setProcessConcurrency(2); - String processName = Util.readEntityName(bundles[0].getProcessData()); + String processName = bundles[0].getProcessName(); LOGGER.info(Util.prettyPrintXml(bundles[0].getProcessData())); bundles[0].submitAndScheduleBundle(prism, false); @@ -163,7 +163,7 @@ public class OptionalInputTest extends BaseTestClass { bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)"); bundles[0].setProcessConcurrency(2); - String processName = Util.readEntityName(bundles[0].getProcessData()); + String processName = bundles[0].getProcessName(); LOGGER.info(Util.prettyPrintXml(bundles[0].getProcessData())); bundles[0].submitAndScheduleBundle(prism, false); @@ -232,11 +232,11 @@ public class OptionalInputTest extends BaseTestClass { LOGGER.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i))); } - ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData()); - LOGGER.info(Util.prettyPrintXml(process.toString())); + + LOGGER.info(Util.prettyPrintXml(bundles[0].getProcessData())); bundles[0].submitAndScheduleBundle(prism, false); - InstanceUtil.waitTillInstanceReachState(oozieClient, process.getName(), + InstanceUtil.waitTillInstanceReachState(oozieClient, bundles[0].getProcessName(), 2, CoordinatorAction.Status.KILLED, EntityType.PROCESS); } @@ -262,10 +262,8 @@ public class OptionalInputTest extends BaseTestClass { bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)"); bundles[0].setProcessConcurrency(2); - ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData()); - LOGGER.info(Util.prettyPrintXml(process.toString())); - String processName = process.getName(); - LOGGER.info(Util.prettyPrintXml(process.toString())); + LOGGER.info(Util.prettyPrintXml(bundles[0].getProcessData())); + String processName = bundles[0].getProcessName(); bundles[0].submitAndScheduleBundle(prism, true); InstanceUtil.waitTillInstanceReachState(oozieClient, processName, @@ -278,11 +276,9 @@ public class OptionalInputTest extends BaseTestClass { InstanceUtil.waitTillInstanceReachState(oozieClient, processName, 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); - final ProcessMerlin processMerlin = new ProcessMerlin(process); - processMerlin.setProcessFeeds(bundles[0].getDataSets(), 2, 0, 1); - bundles[0].setProcessData(processMerlin.toString()); - bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)"); - process = new ProcessMerlin(bundles[0].getProcessData()); + ProcessMerlin process = bundles[0].getProcessObject(); + process.setProcessFeeds(bundles[0].getDataSets(), 2, 0, 1); + process.setProcessInputStartEnd("now(0,-10)", "now(0,0)"); LOGGER.info("modified process:" + Util.prettyPrintXml(process.toString())); prism.getProcessHelper().update(process.toString(), process.toString()); @@ -319,25 +315,22 @@ public class OptionalInputTest extends BaseTestClass { bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)"); bundles[0].setProcessConcurrency(4); - ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData()); - String processName = process.getName(); + ProcessMerlin process = bundles[0].getProcessObject(); LOGGER.info(Util.prettyPrintXml(process.toString())); bundles[0].submitAndScheduleBundle(prism, true); - InstanceUtil.waitTillInstanceReachState(oozieClient, processName, + InstanceUtil.waitTillInstanceReachState(oozieClient, process.getName(), 2, CoordinatorAction.Status.WAITING, EntityType.PROCESS); List dataDates = TimeUtil.getMinuteDatesOnEitherSide( TimeUtil.addMinsToTime(startTime, -10), TimeUtil.addMinsToTime(endTime, 10), 5); HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE, - inputPath + "/input1/", dataDates); - InstanceUtil.waitTillInstanceReachState(oozieClient, processName, - 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + inputPath + "/input1/", dataDates); + InstanceUtil.waitTillInstanceReachState(oozieClient, process.getName(), + 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); - final ProcessMerlin processMerlin = new ProcessMerlin(process); - processMerlin.setProcessFeeds(bundles[0].getDataSets(), 2, 2, 1); - bundles[0].setProcessData(processMerlin.toString()); - process = new ProcessMerlin(bundles[0].getProcessData()); + process.setProcessFeeds(bundles[0].getDataSets(), 2, 2, 1); + bundles[0].setProcessData(process.toString()); //delete all input data HadoopUtil.deleteDirIfExists(inputPath + "/", clusterFS); @@ -347,7 +340,7 @@ public class OptionalInputTest extends BaseTestClass { prism.getProcessHelper().update(process.toString(), process.toString()); //from now on ... it should wait of input0 also - InstanceUtil.waitTillInstanceReachState(oozieClient, processName, + InstanceUtil.waitTillInstanceReachState(oozieClient, process.getName(), 2, CoordinatorAction.Status.KILLED, EntityType.PROCESS); } }