Return-Path: X-Original-To: apmail-falcon-commits-archive@minotaur.apache.org Delivered-To: apmail-falcon-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 03F2117FD9 for ; Tue, 3 Mar 2015 05:14:21 +0000 (UTC) Received: (qmail 46561 invoked by uid 500); 3 Mar 2015 05:14:17 -0000 Delivered-To: apmail-falcon-commits-archive@falcon.apache.org Received: (qmail 46526 invoked by uid 500); 3 Mar 2015 05:14:17 -0000 Mailing-List: contact commits-help@falcon.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@falcon.apache.org Delivered-To: mailing list commits@falcon.apache.org Received: (qmail 46517 invoked by uid 99); 3 Mar 2015 05:14:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Mar 2015 05:14:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 86293E034B; Tue, 3 Mar 2015 05:14:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: samarthg@apache.org To: commits@falcon.apache.org Message-Id: <51f810113b6b4ca0a9b640e39d2b0579@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: falcon git commit: FALCON-974 add test in falcon-regression for log mover feature. Contributed by Pragya M Date: Tue, 3 Mar 2015 05:14:17 +0000 (UTC) Repository: falcon Updated Branches: refs/heads/master 108b36d42 -> 23b325221 FALCON-974 add test in falcon-regression for log mover feature. Contributed by Pragya M Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/23b32522 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/23b32522 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/23b32522 Branch: refs/heads/master Commit: 23b325221b4758e1eb5df75726382f5025b162cb Parents: 108b36d Author: samarthg Authored: Tue Mar 3 10:32:02 2015 +0530 Committer: samarthg Committed: Tue Mar 3 10:32:02 2015 +0530 ---------------------------------------------------------------------- falcon-regression/CHANGES.txt | 2 + .../falcon/regression/core/bundle/Bundle.java | 7 + .../apache/falcon/regression/LogMoverTest.java | 170 +++++++++++++++++++ .../merlin/src/test/resources/LogMover/id.pig | 21 +++ .../src/test/resources/LogMover/workflow.xml | 39 +++++ 5 files changed, 239 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/23b32522/falcon-regression/CHANGES.txt ---------------------------------------------------------------------- diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt index 8bd89ef..8df604a 100644 --- a/falcon-regression/CHANGES.txt +++ b/falcon-regression/CHANGES.txt @@ -6,6 +6,8 @@ Trunk (Unreleased) NEW FEATURES + FALCON-974 add test in falcon-regression for log mover feature(Pragya via Samarth Gupta) + FALCON-843 add test to support current & last week el expression(Pragya M via Samarth G) FALCON-1035 Add test in falcon regression for validate feature. Validate is exposed http://git-wip-us.apache.org/repos/asf/falcon/blob/23b32522/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java index 8e449fb..0dfd895 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java @@ -638,6 +638,13 @@ public class Bundle { writeFeedElement(feedElement, feedName); } + public void setOutputFeedAvailabilityFlag(String flag) { + String feedName = getOutputFeedNameFromBundle(); + FeedMerlin feedElement = getFeedElement(feedName); + feedElement.setAvailabilityFlag(flag); + writeFeedElement(feedElement, feedName); + } + public void setCLusterColo(String colo) { ClusterMerlin c = getClusterElement(); c.setColo(colo); http://git-wip-us.apache.org/repos/asf/falcon/blob/23b32522/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java new file mode 100644 index 0000000..4ce6026 --- /dev/null +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.regression; + +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.entity.v0.process.Properties; +import org.apache.falcon.entity.v0.process.Property; +import org.apache.falcon.regression.core.bundle.Bundle; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency.TimeUnit; +import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants; +import org.apache.falcon.regression.core.helpers.ColoHelper; +import org.apache.falcon.regression.core.util.*; +import org.apache.falcon.regression.testHelper.BaseTestClass; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.Logger; +import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.Job; +import org.apache.oozie.client.OozieClient; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.List; + +/** + * LogMover Test. + * Adds job launcher success/failure logs to falcon staging directory. + * It is not working for map-reduce actions(FALCON-1038). + * Using pig-action to test this feature. + */ +@Test(groups = "embedded") +public class LogMoverTest extends BaseTestClass { + + private ColoHelper cluster = servers.get(0); + private FileSystem clusterFS = serverFS.get(0); + private OozieClient clusterOC = serverOC.get(0); + private String pigTestDir = cleanAndGetTestDir(); + private String aggregateWorkflowDir = cleanAndGetTestDir() + "/aggregator"; + private String inputPath = pigTestDir + "/input" + MINUTE_DATE_PATTERN; + private String propPath = pigTestDir + "/LogMover"; + private static final Logger LOGGER = Logger.getLogger(LogMoverTest.class); + private String processName; + private String process; + private String startDate; + private String endDate; + + @BeforeMethod(alwaysRun = true) + public void setUp() throws Exception { + LOGGER.info("in @BeforeMethod"); + startDate = TimeUtil.getTimeWrtSystemTime(-3); + endDate = TimeUtil.getTimeWrtSystemTime(3); + + LOGGER.info("startDate : " + startDate + " , endDate : " + endDate); + //copy pig script and workflow + HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES + "LogMover"); + Bundle bundle = BundleUtil.readELBundle(); + bundles[0] = new Bundle(bundle, cluster); + bundles[0].generateUniqueBundle(this); + bundles[0].setInputFeedDataPath(inputPath); + bundles[0].setInputFeedPeriodicity(1, TimeUnit.minutes); + bundles[0].setOutputFeedLocationData(pigTestDir + "/output-data" + MINUTE_DATE_PATTERN); + bundles[0].setOutputFeedAvailabilityFlag("_SUCCESS"); + bundles[0].setProcessWorkflow(aggregateWorkflowDir); + bundles[0].setProcessInputNames("INPUT"); + bundles[0].setProcessOutputNames("OUTPUT"); + bundles[0].setProcessValidity(startDate, endDate); + bundles[0].setProcessPeriodicity(1, TimeUnit.minutes); + bundles[0].setOutputFeedPeriodicity(1, TimeUnit.minutes); + + List dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20); + HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, + bundles[0].getFeedDataPathPrefix(), dataDates); + + // Defining path to be used in pig script + final Process processElement = bundles[0].getProcessObject(); + final Properties properties = new Properties(); + final Property property = new Property(); + property.setName("inputPath"); + property.setValue(propPath); + properties.getProperties().add(property); + processElement.setProperties(properties); + bundles[0].setProcessData(processElement.toString()); + process = bundles[0].getProcessData(); + processName = Util.readEntityName(process); + + } + + @AfterMethod(alwaysRun = true) + public void tearDown() { + removeTestClassEntities(); + } + + /** + *Schedule a process. It should succeed and job launcher success information + * should be present in falcon staging directory. + */ + @Test(groups = {"singleCluster"}) + public void logMoverSucceedTest() throws Exception { + bundles[0].submitFeedsScheduleProcess(prism); + AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING); + + //Copy data to let pig job succeed + HadoopUtil.copyDataToFolder(clusterFS, propPath, OSUtil.RESOURCES + "pig"); + + InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); + InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + + Assert.assertTrue(validate(true), "Success logs are not present"); + } + + /** + *Schedule a process. It should fail and job launcher failure information + * should be present in falcon staging directory. + */ + @Test(groups = {"singleCluster"}) + public void logMoverFailTest() throws Exception { + bundles[0].submitFeedsScheduleProcess(prism); + AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING); + + InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); + InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1, + CoordinatorAction.Status.KILLED, EntityType.PROCESS); + + Assert.assertTrue(validate(false), "Filed logs are not present"); + } + + private boolean validate(boolean logFlag) throws Exception { + String stagingDir= MerlinConstants.STAGING_LOCATION; + String path=stagingDir+"/falcon/workflows/process/"+processName+"/logs"; + List logmoverPath = HadoopUtil + .getAllFilesRecursivelyHDFS(clusterFS, new Path(HadoopUtil.cutProtocol(path))); + if (logFlag) { + for(int index=0; index < logmoverPath.size(); index++) { + if (logmoverPath.get(index).toString().contains("SUCCEEDED")) { + return true; + } + } + } else { + for(int index=0; index < logmoverPath.size(); index++) { + if (logmoverPath.get(index).toString().contains("FAILED")) { + return true; + } + } + } + return false; + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/23b32522/falcon-regression/merlin/src/test/resources/LogMover/id.pig ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/resources/LogMover/id.pig b/falcon-regression/merlin/src/test/resources/LogMover/id.pig new file mode 100644 index 0000000..2d5e64b --- /dev/null +++ b/falcon-regression/merlin/src/test/resources/LogMover/id.pig @@ -0,0 +1,21 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +A = load '$INPUT' using PigStorage(':'); +B = foreach A generate $0 as id; +store B into '$OUTPUT' USING PigStorage(); +cp $inputNew $outputNew http://git-wip-us.apache.org/repos/asf/falcon/blob/23b32522/falcon-regression/merlin/src/test/resources/LogMover/workflow.xml ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/resources/LogMover/workflow.xml b/falcon-regression/merlin/src/test/resources/LogMover/workflow.xml new file mode 100644 index 0000000..906481c --- /dev/null +++ b/falcon-regression/merlin/src/test/resources/LogMover/workflow.xml @@ -0,0 +1,39 @@ + + + + + + ${jobTracker} + ${nameNode} + + INPUT=${INPUT} + OUTPUT=${OUTPUT} + inputNew=${inputPath} + outputNew="$OUTPUT/op/" + + + + + + + + The Pig job has failed.Reason: message[${wf:errorMessage(wf:lastErrorNode())}] + + +