Return-Path: X-Original-To: apmail-falcon-commits-archive@minotaur.apache.org Delivered-To: apmail-falcon-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7CEA8176AD for ; Mon, 8 Jun 2015 06:23:11 +0000 (UTC) Received: (qmail 97910 invoked by uid 500); 8 Jun 2015 06:23:11 -0000 Delivered-To: apmail-falcon-commits-archive@falcon.apache.org Received: (qmail 97843 invoked by uid 500); 8 Jun 2015 06:23:11 -0000 Mailing-List: contact commits-help@falcon.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@falcon.apache.org Delivered-To: mailing list commits@falcon.apache.org Received: (qmail 97794 invoked by uid 99); 8 Jun 2015 06:23:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Jun 2015 06:23:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0D844DFAF3; Mon, 8 Jun 2015 06:23:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: samarthg@apache.org To: commits@falcon.apache.org Date: Mon, 08 Jun 2015 06:23:11 -0000 Message-Id: <07af62a8ec3a4ffb88726321ce29c06b@git.apache.org> In-Reply-To: <58b3e2fc5aa14dd5ab7e156d3732d3b4@git.apache.org> References: <58b3e2fc5aa14dd5ab7e156d3732d3b4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] falcon git commit: FALCON-1227 Add logMover check in FeedReplication test. Contributed by Pragya M FALCON-1227 Add logMover check in FeedReplication test. 
Contributed by Pragya M Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/28fd15c4 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/28fd15c4 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/28fd15c4 Branch: refs/heads/master Commit: 28fd15c496f8175d242ab2f5b783aa41e1c44baf Parents: 041de1d Author: samarthg Authored: Mon Jun 8 11:52:44 2015 +0530 Committer: samarthg Committed: Mon Jun 8 11:52:44 2015 +0530 ---------------------------------------------------------------------- falcon-regression/CHANGES.txt | 3 + .../falcon/regression/core/util/AssertUtil.java | 40 +++++++ .../falcon/regression/FeedReplicationTest.java | 120 +++++++++++++++---- .../apache/falcon/regression/LogMoverTest.java | 28 +---- 4 files changed, 141 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/28fd15c4/falcon-regression/CHANGES.txt ---------------------------------------------------------------------- diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt index 189277b..3cd811f 100644 --- a/falcon-regression/CHANGES.txt +++ b/falcon-regression/CHANGES.txt @@ -92,6 +92,9 @@ Trunk (Unreleased) via Samarth Gupta) IMPROVEMENTS + + FALCON-1227 Add logMover check in FeedReplication test(Pragya M via Samarth G) + FALCON-1258 Fix feed validity and fortify ELExpFutureAndLatestTest (Ruslan Ostafiychuk) FALCON-1253 Fortify ExternalFSTest (Ruslan Ostafiychuk) http://git-wip-us.apache.org/repos/asf/falcon/blob/28fd15c4/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java 
index 3abca7a..095e6f4 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java @@ -22,6 +22,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.regression.core.bundle.Bundle; +import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants; import org.apache.falcon.regression.core.response.ServiceResponse; import org.apache.falcon.regression.core.supportClasses.ExecResult; import org.apache.falcon.resource.APIResult; @@ -409,4 +410,43 @@ public final class AssertUtil { Assert.fail(String.format("%s expected non-empty string found [%s]", message, str)); } } + + /** + * Checks that job logs are copied to user defined cluster staging path. + * + * @param logFlag denotes whether is is failed/succeeded log + * @param entityName name of entity + * @param clusterFS hadoop file system for the locations + * @param entityType feed or process + */ + public static boolean assertPath(boolean logFlag, String entityName, FileSystem clusterFS, + String entityType) throws Exception { + String stagingDir= MerlinConstants.STAGING_LOCATION; + String path=stagingDir+"/falcon/workflows/"+ entityType + "/" + entityName +"/logs"; + List logmoverPaths = HadoopUtil + .getAllFilesRecursivelyHDFS(clusterFS, new Path(HadoopUtil.cutProtocol(path))); + String part = logFlag ? "SUCCEEDED" : "FAILED"; + for (Path logmoverPath : logmoverPaths) { + if (logmoverPath.toString().contains(part)) { + return true; + } + } + return false; + } + + /** + * Checks that job logs are copied to user defined cluster staging path. 
+ * + * @param logFlag denotes whether is is failed/succeeded log + * @param entityName name of entity + * @param clusterFS hadoop file system for the locations + * @param entityType feed or process + * @param message message returned if assert fails + */ + public static void assertLogMoverPath(boolean logFlag, String entityName, FileSystem clusterFS, + String entityType, String message) throws Exception { + Assert.assertTrue(assertPath(logFlag, entityName, clusterFS, entityType), message); + } + + } http://git-wip-us.apache.org/repos/asf/falcon/blob/28fd15c4/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java index 6fd2348..a7a2ea8 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java @@ -36,11 +36,9 @@ import org.apache.falcon.regression.testHelper.BaseTestClass; import org.apache.falcon.resource.InstancesResult; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.log4j.Logger; import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.OozieClient; -import org.apache.oozie.client.OozieClientException; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; @@ -53,7 +51,6 @@ import org.testng.annotations.Test; import javax.xml.bind.JAXBException; import java.io.IOException; -import java.net.URISyntaxException; import java.util.List; /** @@ -105,8 +102,7 @@ public class FeedReplicationTest extends BaseTestClass { */ 
@Test(dataProvider = "dataFlagProvider") public void replicate1Source1Target(boolean dataFlag) - throws AuthenticationException, IOException, URISyntaxException, JAXBException, - OozieClientException, InterruptedException { + throws Exception { Bundle.submitCluster(bundles[0], bundles[1]); String startTime = TimeUtil.getTimeWrtSystemTime(0); String endTime = TimeUtil.addMinsToTime(startTime, 5); @@ -158,7 +154,7 @@ public class FeedReplicationTest extends BaseTestClass { //replication should start, wait while it ends InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed.toString()), 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); + CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); //check if data has been replicated correctly List cluster1ReplicatedData = HadoopUtil @@ -173,6 +169,9 @@ public class FeedReplicationTest extends BaseTestClass { //_SUCCESS should exist in target Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster2FS, toTarget, ""), true); + + AssertUtil.assertLogMoverPath(true, Util.readEntityName(feed.toString()), + cluster2FS, "feed", "Success logs are not present"); } /** @@ -195,27 +194,27 @@ public class FeedReplicationTest extends BaseTestClass { feed.clearFeedClusters(); //set cluster1 as source feed.addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.SOURCE) - .build()); + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) + .withRetention("days(1000000)", ActionType.DELETE) + .withValidity(startTime, endTime) + .withClusterType(ClusterType.SOURCE) + .build()); //set cluster2 as target feed.addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - 
.withClusterType(ClusterType.TARGET) - .withDataLocation(targetDataLocation) - .build()); + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) + .withRetention("days(1000000)", ActionType.DELETE) + .withValidity(startTime, endTime) + .withClusterType(ClusterType.TARGET) + .withDataLocation(targetDataLocation) + .build()); //set cluster3 as target feed.addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.TARGET) - .withDataLocation(targetDataLocation) - .build()); + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) + .withRetention("days(1000000)", ActionType.DELETE) + .withValidity(startTime, endTime) + .withClusterType(ClusterType.TARGET) + .withDataLocation(targetDataLocation) + .build()); //submit and schedule feed LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString())); @@ -268,6 +267,9 @@ public class FeedReplicationTest extends BaseTestClass { //_SUCCESS should exist in target Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster2FS, toTarget, ""), true); Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster3FS, toTarget, ""), true); + + AssertUtil.assertLogMoverPath(true, Util.readEntityName(feed.toString()), + cluster2FS, "feed", "Success logs are not present"); } /** @@ -354,7 +356,7 @@ public class FeedReplicationTest extends BaseTestClass { //wait till instance succeed InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); + CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); //check if data was replicated correctly List cluster1ReplicatedData = HadoopUtil @@ -370,8 +372,78 @@ public class FeedReplicationTest extends BaseTestClass { //availabilityFlag should exist in target 
Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster2FS, toTarget, availabilityFlagName), true); + + AssertUtil.assertLogMoverPath(true, Util.readEntityName(feed.toString()), + cluster2FS, "feed", "Success logs are not present"); } + /** + * Test demonstrates failure pf replication of stored data from one source cluster to one target cluster. + * When replication job fails test checks if failed logs are present in staging directory or not. + */ + @Test + public void replicate1Source1TargetFail() + throws Exception { + Bundle.submitCluster(bundles[0], bundles[1]); + String startTime = TimeUtil.getTimeWrtSystemTime(0); + String endTime = TimeUtil.addMinsToTime(startTime, 5); + LOGGER.info("Time range between : " + startTime + " and " + endTime); + + //configure feed + FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); + feed.setFilePath(feedDataLocation); + //erase all clusters from feed definition + feed.clearFeedClusters(); + //set cluster1 as source + feed.addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) + .withRetention("days(1000000)", ActionType.DELETE) + .withValidity(startTime, endTime) + .withClusterType(ClusterType.SOURCE) + .build()); + //set cluster2 as target + feed.addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) + .withRetention("days(1000000)", ActionType.DELETE) + .withValidity(startTime, endTime) + .withClusterType(ClusterType.TARGET) + .withDataLocation(targetDataLocation) + .build()); + + //submit and schedule feed + LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString())); + AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString())); + + //upload necessary data + DateTime date = new DateTime(startTime, DateTimeZone.UTC); + DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm'"); + String timePattern = fmt.print(date); + String sourceLocation = sourcePath + "/" + 
timePattern + "/"; + String targetLocation = targetPath + "/" + timePattern + "/"; + HadoopUtil.recreateDir(cluster1FS, sourceLocation); + + Path toSource = new Path(sourceLocation); + Path toTarget = new Path(targetLocation); + HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.NORMAL_INPUT + "dataFile.xml"); + HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.NORMAL_INPUT + "dataFile1.txt"); + + //check if coordinator exists + InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"), 1); + + //check if instance become running + InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1, + CoordinatorAction.Status.RUNNING, EntityType.FEED); + + HadoopUtil.deleteDirIfExists(sourceLocation, cluster1FS); + + //check if instance became killed + InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1, + CoordinatorAction.Status.KILLED, EntityType.FEED); + + AssertUtil.assertLogMoverPath(false, Util.readEntityName(feed.toString()), + cluster2FS, "feed", "Success logs are not present"); + } /* Flag value denotes whether to add data for replication or not. * flag=true : add data for replication. 
http://git-wip-us.apache.org/repos/asf/falcon/blob/28fd15c4/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java index 4af27a2..56fe8ab 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java @@ -24,17 +24,14 @@ import org.apache.falcon.entity.v0.process.Property; import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency.TimeUnit; -import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants; import org.apache.falcon.regression.core.helpers.ColoHelper; import org.apache.falcon.regression.core.util.*; import org.apache.falcon.regression.testHelper.BaseTestClass; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.Job; import org.apache.oozie.client.OozieClient; -import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -126,7 +123,7 @@ public class LogMoverTest extends BaseTestClass { InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); - Assert.assertTrue(validate(true), "Success logs are not present"); + AssertUtil.assertLogMoverPath(true, processName, clusterFS, "process", "Success logs are not present"); } /** @@ -143,28 +140,7 @@ public class LogMoverTest extends BaseTestClass { InstanceUtil.waitTillInstanceReachState(clusterOC, 
bundles[0].getProcessName(), 1, CoordinatorAction.Status.KILLED, EntityType.PROCESS); - Assert.assertTrue(validate(false), "Filed logs are not present"); - } - - private boolean validate(boolean logFlag) throws Exception { - String stagingDir= MerlinConstants.STAGING_LOCATION; - String path=stagingDir+"/falcon/workflows/process/"+processName+"/logs"; - List logmoverPaths = HadoopUtil - .getAllFilesRecursivelyHDFS(clusterFS, new Path(HadoopUtil.cutProtocol(path))); - if (logFlag) { - for (Path logmoverPath : logmoverPaths) { - if (logmoverPath.toString().contains("SUCCEEDED")) { - return true; - } - } - } else { - for (Path logmoverPath : logmoverPaths) { - if (logmoverPath.toString().contains("FAILED")) { - return true; - } - } - } - return false; + AssertUtil.assertLogMoverPath(false, processName, clusterFS, "process", "Failed logs are not present"); } }