Return-Path: X-Original-To: apmail-falcon-commits-archive@minotaur.apache.org Delivered-To: apmail-falcon-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DACFF17957 for ; Wed, 30 Sep 2015 10:54:12 +0000 (UTC) Received: (qmail 69353 invoked by uid 500); 30 Sep 2015 10:54:06 -0000 Delivered-To: apmail-falcon-commits-archive@falcon.apache.org Received: (qmail 69312 invoked by uid 500); 30 Sep 2015 10:54:06 -0000 Mailing-List: contact commits-help@falcon.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@falcon.apache.org Delivered-To: mailing list commits@falcon.apache.org Received: (qmail 69303 invoked by uid 99); 30 Sep 2015 10:54:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 Sep 2015 10:54:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 33B77DFF0D; Wed, 30 Sep 2015 10:54:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ajayyadava@apache.org To: commits@falcon.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: falcon git commit: FALCON-1387 Add Instance Dependency API Test. Contributed by Pragya Mittal. Date: Wed, 30 Sep 2015 10:54:06 +0000 (UTC) Repository: falcon Updated Branches: refs/heads/master d6965213a -> 8e0d5c584 FALCON-1387 Add Instance Dependency API Test. Contributed by Pragya Mittal. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8e0d5c58 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8e0d5c58 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8e0d5c58 Branch: refs/heads/master Commit: 8e0d5c584ee738ef19496c1f30a629861b69b634 Parents: d696521 Author: Ajay Yadava Authored: Wed Sep 30 16:23:46 2015 +0530 Committer: Ajay Yadava Committed: Wed Sep 30 16:23:46 2015 +0530 ---------------------------------------------------------------------- falcon-regression/CHANGES.txt | 2 + .../core/enumsAndConstants/ResponseErrors.java | 6 +- .../helpers/entity/AbstractEntityHelper.java | 13 +- .../falcon/regression/core/util/AssertUtil.java | 15 +- .../regression/core/util/InstanceUtil.java | 143 +++++++++++- .../falcon/regression/core/util/MatrixUtil.java | 10 + .../falcon/regression/core/util/OozieUtil.java | 29 +++ .../falcon/regression/core/util/TimeUtil.java | 25 ++- .../falcon/regression/core/util/Util.java | 1 + .../triage/FeedInstanceDependencyTest.java | 221 +++++++++++++++++++ .../triage/ProcessInstanceDependencyTest.java | 204 +++++++++++++++++ 11 files changed, 662 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/8e0d5c58/falcon-regression/CHANGES.txt ---------------------------------------------------------------------- diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt index 1aff5a4..92f722a 100644 --- a/falcon-regression/CHANGES.txt +++ b/falcon-regression/CHANGES.txt @@ -5,6 +5,8 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + FALCON-1387 Add Instance Dependency API Test(Pragya Mittal via Ajay Yadava) + FALCON-1382 Add a test for feed retention to make sure that data directory is not deleted (Paul Isaychuk) FALCON-1321 Add Entity Lineage Test (Pragya Mittal via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/8e0d5c58/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/ResponseErrors.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/ResponseErrors.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/ResponseErrors.java index 85f3692..921a303 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/ResponseErrors.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/ResponseErrors.java @@ -23,7 +23,11 @@ public enum ResponseErrors { PROCESS_NOT_FOUND("(PROCESS) not found"), UNPARSEABLE_DATE("Start and End dates cannot be empty for Instance POST apis"), - START_BEFORE_SCHEDULED("is before the entity was scheduled"); + START_BEFORE_SCHEDULED("is before the entity was scheduled"), + PROCESS_INVALID_RANGE("is not in validity range of process"), + PROCESS_INSTANCE_FAULT("is not a valid instance time on cluster"), + FEED_INVALID_RANGE("is not in validity range for Feed"), + FEED_INSTANCE_FAULT("is not a valid instance for the feed"); private String error; http://git-wip-us.apache.org/repos/asf/falcon/blob/8e0d5c58/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java index 0bd8a32..83d06a2 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java @@ -35,6 +35,7 @@ import org.apache.falcon.regression.core.util.OozieUtil; import org.apache.falcon.regression.core.util.Util; import org.apache.falcon.regression.core.util.Util.URLS; import org.apache.falcon.resource.FeedInstanceResult; +import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; import org.apache.hadoop.conf.Configuration; @@ -662,7 +663,6 @@ public abstract class AbstractEntityHelper { */ public ServiceResponse getEntityLineage(String params) throws URISyntaxException, AuthenticationException, InterruptedException, IOException { - String url = createUrl(this.hostname + URLS.ENTITY_LINEAGE.getValue(), colo); if (StringUtils.isNotEmpty(params)){ url += colo.isEmpty() ? "?" + params : "&" + params; @@ -670,4 +670,15 @@ public abstract class AbstractEntityHelper { return Util.sendRequestLineage(createUrl(url), "get", null, null); } + /** + * Retrieves instance dependencies. + */ + public InstanceDependencyResult getInstanceDependencies( + String entityName, String params, String user) + throws IOException, URISyntaxException, AuthenticationException, InterruptedException { + String url = createUrl(this.hostname + URLS.INSTANCE_DEPENDENCIES.getValue(), getEntityType(), entityName, ""); + return (InstanceDependencyResult) InstanceUtil + .createAndSendRequestProcessInstance(url, params, allColo, user); + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/8e0d5c58/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java index 1546415..d8df0fb 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java @@ -26,7 +26,6 @@ import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants; import org.apache.falcon.regression.core.response.ServiceResponse; import org.apache.falcon.regression.core.supportClasses.ExecResult; import org.apache.falcon.resource.APIResult; -import org.apache.falcon.resource.InstancesResult; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -192,7 +191,7 @@ public final class AssertUtil { * * @param response ProcessInstancesResult */ - public static void assertSucceeded(InstancesResult response) { + public static void assertSucceeded(APIResult response) { Assert.assertNotNull(response.getMessage(), "Status message is null"); Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED, "Status should be SUCCEEDED. Message: " + response.getMessage()); @@ -448,5 +447,15 @@ public final class AssertUtil { Assert.assertTrue(assertPath(logFlag, entityName, clusterFS, entityType), message); } - + /** + * Checks that API Response status is FAILED. + * + * @param response APIResult + * @throws JAXBException + */ + public static void assertFailedInstance(APIResult response) throws JAXBException { + Assert.assertEquals(response.getStatus(), APIResult.Status.FAILED, + "Status should be FAILED. Message: " + response.getMessage()); + Assert.assertNotNull(response.getMessage(), "response message should not be null"); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/8e0d5c58/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java index 4550666..10463c2 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java @@ -32,8 +32,11 @@ import org.apache.falcon.regression.core.helpers.entity.AbstractEntityHelper; import org.apache.falcon.request.BaseRequest; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.FeedInstanceResult; +import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; +import org.apache.falcon.resource.SchedulableEntityInstance; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.http.HttpResponse; import org.apache.log4j.Logger; @@ -45,17 +48,22 @@ import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.OozieClientException; import org.apache.oozie.client.WorkflowJob; import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.json.JSONException; import org.testng.Assert; import java.io.IOException; import java.lang.reflect.Type; import java.net.URISyntaxException; +import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; +import java.util.Set; /** * util functions related to instanceTest. @@ -88,6 +96,8 @@ public final class InstanceUtil { result = new InstancesSummaryResult(APIResult.Status.FAILED, responseString); }else if (url.contains("/listing/")) { result = new FeedInstanceResult(APIResult.Status.FAILED, responseString); + }else if (url.contains("/dependencies/")) { + result = new InstanceDependencyResult(APIResult.Status.FAILED, responseString); }else { result = new InstancesResult(APIResult.Status.FAILED, responseString); } @@ -118,7 +128,8 @@ public final class InstanceUtil { } }).create().fromJson(responseString, url.contains("/listing/") ? FeedInstanceResult.class : url.contains("/summary/") - ? InstancesSummaryResult.class : InstancesResult.class); + ? InstancesSummaryResult.class : url.contains("/dependencies/") + ? InstanceDependencyResult.class : InstancesResult.class); } catch (JsonSyntaxException e) { Assert.fail("Not a valid json:\n" + responseString); } @@ -694,5 +705,135 @@ public final class InstanceUtil { int sleep = INSTANCES_CREATED_TIMEOUT * 60 / 5; waitTillInstancesAreCreated(oozieClient, entity, bundleSeqNo, sleep); } + + /** + * Asserts instances of specific job will be present for given instanceTime. + * + * @param instancesResult InstanceDependencyResult + * @param oozieClient oozieClient of cluster job is running on + * @param bundleID bundleId of job + * @param time instanceTime. + * @throws JSONException + * @throws ParseException + */ + public static void assertProcessInstances(InstanceDependencyResult instancesResult, OozieClient oozieClient, + String bundleID, String time) throws OozieClientException, + JSONException, ParseException { + List inputPath = new ArrayList<>(); + List outputPath = new ArrayList<>(); + SchedulableEntityInstance[] instances = instancesResult.getDependencies(); + LOGGER.info("instances: " + Arrays.toString(instances)); + Assert.assertNotNull(instances, "instances should be not null"); + for (SchedulableEntityInstance instance : instances) { + Assert.assertNotNull(instance.getCluster()); + Assert.assertNotNull(instance.getEntityName()); + Assert.assertNotNull(instance.getEntityType()); + Assert.assertNotNull(instance.getInstanceTime()); + Assert.assertNotNull(instance.getTags()); + if (instance.getTags().equals("Input")) { + inputPath.add(new DateTime(instance.getInstanceTime(), DateTimeZone.UTC).toString()); + } + if (instance.getTags().equals("Output")) { + outputPath.add(new DateTime(instance.getInstanceTime(), DateTimeZone.UTC).toString()); + } + } + + List inputActual = getMinuteDatesToPath(inputPath.get(inputPath.indexOf( + Collections.min(inputPath))), inputPath.get(inputPath.indexOf(Collections.max(inputPath))), 5); + List outputActual = getMinuteDatesToPath(outputPath.get(outputPath.indexOf(Collections.min( + outputPath))), outputPath.get(outputPath.indexOf(Collections.max(outputPath))), 5); + + Configuration conf = OozieUtil.getProcessConf(oozieClient, bundleID, time); + Assert.assertNotNull(conf, "Configuration should not be null"); + List inputExp = Arrays.asList(conf.get("inputData").split(",")); + List outputExp = Arrays.asList(conf.get("outputData").split(",")); + + Assert.assertTrue(matchList(inputExp, inputActual), " Inputs dont match"); + Assert.assertTrue(matchList(outputExp, outputActual), " Outputs dont match"); + + } + + /** + * Returns list of path based on given start and end time. + * + * @param startOozieDate start date + * @param endOozieDate end date + * @param minuteSkip difference between paths + * @throws ParseException + */ + public static List getMinuteDatesToPath(String startOozieDate, String endOozieDate, + int minuteSkip) throws ParseException { + String myFormat = "yyyy'-'MM'-'dd'T'HH':'mm'Z'"; + String userFormat = "yyyy'-'MM'-'dd'T'HH':'mm':'ss'.'SSS'Z'"; + return TimeUtil.getMinuteDatesOnEitherSide(TimeUtil.parseDate(startOozieDate, myFormat, userFormat), + TimeUtil.parseDate(endOozieDate, myFormat, userFormat), minuteSkip); + } + + /** + * Parses date from one format to another. + * + * @param oozieDate input date + * @throws ParseException + */ + public static String getParsedDates(String oozieDate) throws ParseException { + String myFormat = "yyyy'-'MM'-'dd'T'HH':'mm'Z'"; + String userFormat = "yyyy'-'MM'-'dd'T'HH':'mm':'ss'.'SSS'Z'"; + return TimeUtil.parseDate(oozieDate, myFormat, userFormat); + } + + /** + * Asserts Whether two list are equal or not. + * + * @param firstList list to be comapred + * @param secondList list to be compared + */ + public static boolean matchList(List firstList, List secondList) { + Collections.sort(firstList); + Collections.sort(secondList); + if (firstList.size() != secondList.size()) { + return false; + } + for (int index = 0; index < firstList.size(); index++) { + if (!firstList.get(index).contains(secondList.get(index))) { + return false; + } + } + return true; + } + + /** + * Asserts instanceDependencyResult of specific job for a given feed. + * + * @param instancesResult InstanceDependencyResult + * @param processName process name for given bundle + * @param tag Input/Output + * @param expectedInstances instance for given instanceTime. + * @throws JSONException + * @throws ParseException + * @throws OozieClientException + */ + public static void assertFeedInstances(InstanceDependencyResult instancesResult, String processName, String tag, + List expectedInstances) + throws OozieClientException, JSONException, ParseException { + List actualInstances = new ArrayList<>(); + SchedulableEntityInstance[] instances = instancesResult.getDependencies(); + LOGGER.info("instances: " + Arrays.toString(instances)); + Assert.assertNotNull(instances, "instances should be not null"); + for (SchedulableEntityInstance instance : instances) { + Assert.assertNotNull(instance.getCluster()); + Assert.assertNotNull(instance.getEntityName()); + Assert.assertNotNull(instance.getEntityType()); + Assert.assertNotNull(instance.getInstanceTime()); + Assert.assertNotNull(instance.getTags()); + Assert.assertTrue(instance.getEntityType().toString().equals("PROCESS"), "Type should be PROCESS"); + Assert.assertTrue(instance.getEntityName().equals(processName), "Expected name is : " + processName); + Assert.assertTrue(instance.getTags().equals(tag)); + actualInstances.add(getParsedDates(new DateTime(instance.getInstanceTime(), DateTimeZone.UTC).toString())); + } + + Set expectedInstancesSet = new HashSet<>(expectedInstances); + Set actualInstancesSet = new HashSet<>(actualInstances); + Assert.assertEquals(expectedInstancesSet, actualInstancesSet, "Instances dont match"); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/8e0d5c58/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/MatrixUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/MatrixUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/MatrixUtil.java index c68dd3c..14315b3 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/MatrixUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/MatrixUtil.java @@ -76,4 +76,14 @@ public final class MatrixUtil { + "row of second array: " + Arrays.deepToString(arr2[0])); return (Object[][]) ArrayUtils.addAll(arr1, arr2); } + + /** + * Cross product many arrays. + * @param firstArray first array that you want to cross product + * @param otherArrays other arrays that you want to cross product + * @return cross product + */ + public static Object[][] crossProductNew(Object[] firstArray, Object[][]... otherArrays) { + return crossProduct(firstArray, otherArrays); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/8e0d5c58/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java index f327178..5e2c7b2 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java @@ -21,6 +21,7 @@ package org.apache.falcon.regression.core.util; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.regression.core.helpers.ColoHelper; import org.apache.falcon.regression.core.helpers.entity.AbstractEntityHelper; +import org.apache.hadoop.conf.Configuration; import org.apache.oozie.client.AuthOozieClient; import org.apache.oozie.client.BundleJob; import org.apache.oozie.client.OozieClient; @@ -34,8 +35,10 @@ import org.apache.log4j.Logger; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; +import org.json.JSONException; import org.testng.Assert; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -725,4 +728,30 @@ public final class OozieUtil { } return FAIL_MSG; } + + /** + * Returns configuration object of a given bundleID for a given instanceTime. + * + * @param oozieClient oozie client of cluster job is running on + * @param bundleID name of process which job is being analyzed + * @param time job status we are waiting for + * @throws org.apache.oozie.client.OozieClientException + * @throws org.json.JSONException + */ + public static Configuration getProcessConf(OozieClient oozieClient, String bundleID, String time) + throws OozieClientException, JSONException { + waitForCoordinatorJobCreation(oozieClient, bundleID); + List coordJobs = oozieClient.getBundleJobInfo(bundleID).getCoordinators(); + CoordinatorJob coordJobInfo = oozieClient.getCoordJobInfo(coordJobs.get(0).getId()); + + Configuration conf = new Configuration(); + for (CoordinatorAction action : coordJobInfo.getActions()) { + String dateStr = (new DateTime(action.getNominalTime(), DateTimeZone.UTC)).toString(); + if (!dateStr.isEmpty() && dateStr.contains(time.replace("Z", ""))) { + conf.addResource(new ByteArrayInputStream(oozieClient.getJobInfo(action.getExternalId()). + getConf().getBytes())); + } + } + return conf; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/8e0d5c58/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/TimeUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/TimeUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/TimeUtil.java index c5dd0fd..292a516 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/TimeUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/TimeUtil.java @@ -25,6 +25,8 @@ import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Calendar; import java.util.Date; @@ -33,7 +35,7 @@ import java.util.concurrent.TimeUnit; /** -all time / date related util methods for merlin . need to move methods from +All time / date related util methods for merlin . need to move methods from instanceUtil to here , pending item. */ @@ -275,4 +277,25 @@ public final class TimeUtil { String numbers = expression.substring(expression.indexOf('(') + 1, expression.indexOf(')')); return Integer.parseInt(numbers.split(",")[position]); } + + /** + * Converts given date from one format to another. + * + * @param date input date + * @param myFormat input date format + * @param userFormat required format + * @return date in userFormat + */ + public static String parseDate(String date, String myFormat, String userFormat) throws ParseException { + SimpleDateFormat formatter = new SimpleDateFormat(myFormat); + SimpleDateFormat fromUser = new SimpleDateFormat(userFormat); + String reformattedStr=""; + try { + reformattedStr = formatter.format(fromUser.parse(date)); + LOGGER.info(reformattedStr); + } catch (ParseException e) { + e.printStackTrace(); + } + return reformattedStr; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/8e0d5c58/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java index 6c8d4ee..83547e7 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java @@ -394,6 +394,7 @@ public final class Util { INSTANCE_LIST("/api/instance/list"), INSTANCE_LISTING("/api/instance/listing"), INSTANCE_LOGS("/api/instance/logs"), + INSTANCE_DEPENDENCIES("/api/instance/dependencies"), TOUCH_URL("/api/entities/touch"); private final String url; http://git-wip-us.apache.org/repos/asf/falcon/blob/8e0d5c58/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/triage/FeedInstanceDependencyTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/triage/FeedInstanceDependencyTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/triage/FeedInstanceDependencyTest.java new file mode 100644 index 0000000..fe9f565 --- /dev/null +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/triage/FeedInstanceDependencyTest.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.regression.triage; + +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency.TimeUnit; +import org.apache.falcon.regression.Entities.ProcessMerlin; +import org.apache.falcon.regression.core.bundle.Bundle; +import org.apache.falcon.regression.core.helpers.ColoHelper; +import org.apache.falcon.regression.core.util.AssertUtil; +import org.apache.falcon.regression.core.util.BundleUtil; +import org.apache.falcon.regression.core.util.InstanceUtil; +import org.apache.falcon.regression.core.util.OSUtil; +import org.apache.falcon.regression.core.util.OozieUtil; +import org.apache.falcon.regression.testHelper.BaseTestClass; +import org.apache.falcon.resource.InstanceDependencyResult; +import org.apache.log4j.Logger; +import org.apache.oozie.client.OozieClient; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.List; + +/** + * Test Suite for feed InstanceDependency corresponding to FALCON-1039. + */ +@Test(groups = "embedded") +public class FeedInstanceDependencyTest extends BaseTestClass { + + private String baseTestDir = cleanAndGetTestDir(); + private String aggregateWorkflowDir = baseTestDir + "/aggregator"; + private String feedInputPath = baseTestDir + "/input" + MINUTE_DATE_PATTERN; + private String feedOutputPath = baseTestDir + "/output-data" + MINUTE_DATE_PATTERN; + private ColoHelper cluster = servers.get(0); + private OozieClient clusterOC = serverOC.get(0); + private static final Logger LOGGER = Logger.getLogger(FeedInstanceDependencyTest.class); + private String processName; + + private String startTime = "2015-06-06T09:37Z"; + private String endTime = "2015-06-06T10:37Z"; + + @BeforeClass(alwaysRun = true) + public void uploadWorkflow() throws Exception { + uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); + } + + @BeforeMethod(alwaysRun = true) + public void setup() throws Exception { + bundles[0] = BundleUtil.readELBundle(); + bundles[0] = new Bundle(bundles[0], cluster); + bundles[0].generateUniqueBundle(this); + bundles[0].submitClusters(prism); + bundles[0].setInputFeedDataPath(feedInputPath); + bundles[0].setOutputFeedLocationData(feedOutputPath); + bundles[0].setProcessWorkflow(aggregateWorkflowDir); + processName = bundles[0].getProcessName(); + } + + @AfterMethod(alwaysRun = true) + public void tearDown() { + removeTestClassEntities(); + } + + @Test(groups = { "singleCluster" }, dataProvider = "testDataProvider") + public void testData(String... processTime) throws Exception { + bundles[0].setProcessValidity(startTime, endTime); + bundles[0].setProcessConcurrency(6); + bundles[0].setProcessPeriodicity(10, TimeUnit.minutes); + bundles[0].setInputFeedValidity("2014-01-01T01:00Z", "2016-12-12T22:00Z"); + bundles[0].setOutputFeedValidity("2014-01-01T01:00Z", "2016-12-12T22:00Z"); + bundles[0].setDatasetInstances("now(0,-20)", "now(0,20)"); + bundles[0].setInputFeedPeriodicity(5, TimeUnit.minutes); + bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); + bundles[0].submitFeedsScheduleProcess(prism); + String[] expTime = new String[processTime.length - 3]; + System.arraycopy(processTime, 3, expTime, 0, processTime.length - 3); + + List expectedTime = Arrays.asList(expTime); + + InstanceDependencyResult r = null; + if (processTime[1].equals("Input")) { + r = prism.getFeedHelper() + .getInstanceDependencies(bundles[0].getInputFeedNameFromBundle(), "?instanceTime=" + processTime[2], + null); + } + if (processTime[1].equals("Output")) { + r = prism.getFeedHelper().getInstanceDependencies(bundles[0].getOutputFeedNameFromBundle(), + "?instanceTime=" + processTime[2], null); + } + + if (processTime[0].equals("true") && r != null) { + AssertUtil.assertSucceeded(r); + InstanceUtil.assertFeedInstances(r, processName, processTime[1], expectedTime); + } else if (processTime[0].equals("emptyMessage") && r != null) { + AssertUtil.assertSucceeded(r); + } else { + AssertUtil.assertFailedInstance(r); + } + } + + @DataProvider + public static Object[][] testDataProvider() { + return new Object[][] { + new String[] { "true", "Input", "2015-06-06T09:35Z", "2015-06-06T09:37Z", "2015-06-06T09:47Z", + "2015-06-06T09:57Z", }, + new String[] { "true", "Input", "2015-06-06T09:40Z", "2015-06-06T09:37Z", "2015-06-06T09:47Z", + "2015-06-06T09:57Z", }, + new String[] { "true", "Input", "2015-06-06T09:45Z", "2015-06-06T09:37Z", "2015-06-06T09:47Z", + "2015-06-06T09:57Z", "2015-06-06T10:07Z", }, + new String[] { "true", "Input", "2015-06-06T09:50Z", "2015-06-06T09:37Z", "2015-06-06T09:47Z", + "2015-06-06T09:57Z", "2015-06-06T10:07Z", }, + new String[] { "true", "Input", "2015-06-06T10:00Z", "2015-06-06T10:17Z", "2015-06-06T09:57Z", + "2015-06-06T09:47Z", "2015-06-06T10:07Z", }, + new String[] { "true", "Input", "2015-06-06T10:05Z", "2015-06-06T10:17Z", "2015-06-06T09:57Z", + "2015-06-06T10:27Z", "2015-06-06T09:47Z", "2015-06-06T10:07Z", }, + new String[] { "true", "Input", "2015-06-06T10:10Z", "2015-06-06T10:17Z", "2015-06-06T09:57Z", + "2015-06-06T10:27Z", "2015-06-06T10:07Z", }, + new String[] { "true", "Input", "2015-06-06T10:15Z", "2015-06-06T10:17Z", "2015-06-06T09:57Z", + "2015-06-06T10:27Z", "2015-06-06T10:07Z", }, + new String[] { "true", "Input", "2015-06-06T10:20Z", "2015-06-06T10:17Z", "2015-06-06T10:27Z", + "2015-06-06T10:07Z", }, + new String[] { "true", "Input", "2015-06-06T10:25Z", "2015-06-06T10:17Z", "2015-06-06T10:27Z", + "2015-06-06T10:07Z", }, + new String[] { "true", "Input", "2015-06-06T10:30Z", "2015-06-06T10:17Z", "2015-06-06T10:27Z", }, + new String[] { "true", "Input", "2015-06-06T10:35Z", "2015-06-06T10:17Z", "2015-06-06T10:27Z", }, + new String[] { "true", "Input", "2015-06-06T10:40Z", "2015-06-06T10:27Z", }, + + new String[] { "true", "Output", "2015-06-06T09:35Z", "2015-06-06T09:37Z", }, + new String[] { "true", "Output", "2015-06-06T09:45Z", "2015-06-06T09:47Z", }, + new String[] { "true", "Output", "2015-06-06T09:55Z", "2015-06-06T09:57Z", }, + new String[] { "true", "Output", "2015-06-06T10:05Z", "2015-06-06T10:07Z", }, + new String[] { "true", "Output", "2015-06-06T10:15Z", "2015-06-06T10:17Z", }, + new String[] { "true", "Output", "2015-06-06T10:25Z", "2015-06-06T10:27Z", }, + + new String[] { "emptyMessage", "Output", "2015-06-06T09:40Z", }, + new String[] { "emptyMessage", "Output", "2015-06-06T09:50Z", }, + new String[] { "emptyMessage", "Output", "2015-06-06T10:00Z", }, + new String[] { "emptyMessage", "Output", "2015-06-06T10:10Z", }, + new String[] { "emptyMessage", "Output", "2015-06-06T10:20Z", }, + new String[] { "emptyMessage", "Output", "2015-06-06T10:30Z", }, + new String[] { "emptyMessage", "Output", "2015-06-06T10:35Z", }, + new String[] { "emptyMessage", "Output", "2015-06-06T10:40Z", }, + new String[] { "emptyMessage", "Output", "2015-06-06T10:45Z", }, + new String[] { "false", "Output", "2017-06-06T10:45Z", }, + new String[] { "false", "Output", "2013-06-06T10:45Z", }, + new String[] { "false", "Output", "2017-06-06T10:48Z", }, + new String[] { "false", "Output", "2013-06-06T10:51Z", }, + + new String[] { "false", "Input", "2017-06-06T10:45Z", }, + new String[] { "false", "Input", "2013-06-06T10:45Z", }, + new String[] { "false", "Input", "2017-06-06T10:48Z", }, + new String[] { "false", "Input", "2013-06-06T10:51Z", }, + + new String[] { "false", "Output", "2015-06-06T09:51Z", }, + new String[] { "false", "Input", "2015-06-06T09:51Z", }, + }; + } + + @Test(groups = { "singleCluster" }) + public void testMultipleData() throws Exception { + + bundles[0].setProcessValidity("2015-06-06T09:35Z", "2015-06-06T09:45Z"); + bundles[0].setProcessConcurrency(6); + + bundles[0].setProcessPeriodicity(10, TimeUnit.minutes); + bundles[0].setInputFeedValidity("2014-01-01T01:00Z", "2016-12-12T22:00Z"); + bundles[0].setOutputFeedValidity("2014-01-01T01:00Z", "2016-12-12T22:00Z"); + bundles[0].setInputFeedPeriodicity(5, TimeUnit.minutes); + bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); + bundles[0].submitFeedsScheduleProcess(prism); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); + OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); + + ProcessMerlin processFirst = new ProcessMerlin(bundles[0].getProcessObject().toString()); + processFirst.setName("Process-producer-1"); + LOGGER.info("process : " + processFirst.toString()); + + prism.getProcessHelper().submitEntity(processFirst.toString()); + + ProcessMerlin processSecond = new ProcessMerlin(bundles[0].getProcessObject().toString()); + processSecond.setName("Process-producer-2"); + LOGGER.info("process : " + processSecond.toString()); + + prism.getProcessHelper().submitEntity(processSecond.toString()); + + InstanceDependencyResult r; + + // For Input feed + r = prism.getFeedHelper() + .getInstanceDependencies(bundles[0].getInputFeedNameFromBundle(), "?instanceTime=2015-06-06T09:45Z", + null); + AssertUtil.assertSucceeded(r); + + // For Output Feed + r = prism.getFeedHelper() + .getInstanceDependencies(bundles[0].getOutputFeedNameFromBundle(), "?instanceTime=2015-06-06T09:45Z", + null); + AssertUtil.assertSucceeded(r); + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e0d5c58/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/triage/ProcessInstanceDependencyTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/triage/ProcessInstanceDependencyTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/triage/ProcessInstanceDependencyTest.java new file mode 100644 index 0000000..e93cd36 --- /dev/null +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/triage/ProcessInstanceDependencyTest.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.regression.triage; + +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency.TimeUnit; +import org.apache.falcon.regression.Entities.ProcessMerlin; +import org.apache.falcon.regression.core.bundle.Bundle; +import org.apache.falcon.regression.core.helpers.ColoHelper; +import org.apache.falcon.regression.core.util.AssertUtil; +import org.apache.falcon.regression.core.util.BundleUtil; +import org.apache.falcon.regression.core.util.InstanceUtil; +import org.apache.falcon.regression.core.util.MatrixUtil; +import org.apache.falcon.regression.core.util.OSUtil; +import org.apache.falcon.regression.core.util.OozieUtil; +import org.apache.falcon.regression.core.util.TimeUtil; +import org.apache.falcon.regression.testHelper.BaseTestClass; +import org.apache.falcon.resource.InstanceDependencyResult; +import org.apache.log4j.Logger; +import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.OozieClient; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +/** + * Test Suite for process InstanceDependency corresponding to FALCON-1039. + */ +@Test(groups = "embedded") +public class ProcessInstanceDependencyTest extends BaseTestClass { + + private String baseTestDir = cleanAndGetTestDir(); + private String aggregateWorkflowDir = baseTestDir + "/aggregator"; + private String feedInputPath = baseTestDir + "/input" + MINUTE_DATE_PATTERN; + private String feedOutputPath = baseTestDir + "/output-data" + MINUTE_DATE_PATTERN; + private ColoHelper cluster = servers.get(0); + private OozieClient clusterOC = serverOC.get(0); + private static final Logger LOGGER = Logger.getLogger(ProcessInstanceDependencyTest.class); + private String processName; + + private String startTime = "2015-06-06T09:37Z"; + private String endTime = "2015-06-06T10:37Z"; + + @BeforeClass(alwaysRun = true) + public void uploadWorkflow() throws Exception { + uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); + } + + @BeforeMethod(alwaysRun = true) + public void setup() throws Exception { + bundles[0] = BundleUtil.readELBundle(); + bundles[0] = new Bundle(bundles[0], cluster); + bundles[0].generateUniqueBundle(this); + bundles[0].submitClusters(prism); + + bundles[0].setInputFeedDataPath(feedInputPath); + bundles[0].setOutputFeedLocationData(feedOutputPath); + bundles[0].setProcessWorkflow(aggregateWorkflowDir); + processName = bundles[0].getProcessName(); + } + + @AfterMethod(alwaysRun = true) + public void tearDown() { + removeTestClassEntities(); + } + + @Test(groups = { "singleCluster" }, dataProvider = "getELData") + public void testData(String[] elTime, String[] param) throws Exception { + bundles[0].setProcessValidity(startTime, endTime); + bundles[0].setProcessConcurrency(6); + bundles[0].setProcessPeriodicity(10, TimeUnit.minutes); + bundles[0].setDatasetInstances(elTime[0], elTime[1]); + bundles[0].setInputFeedPeriodicity(5, TimeUnit.minutes); + bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); + bundles[0].submitFeedsScheduleProcess(prism); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); + OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); + InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, CoordinatorAction.Status.SUCCEEDED, + EntityType.PROCESS, 5); + + InstanceDependencyResult r = prism.getProcessHelper() + .getInstanceDependencies(processName, "?instanceTime=" + param[0], null); + + if (param[1].equals("true")) { + AssertUtil.assertSucceeded(r); + InstanceUtil.assertProcessInstances(r, clusterOC, + OozieUtil.getBundles(clusterOC, processName, EntityType.PROCESS).get(0), param[0]); + } else { + AssertUtil.assertFailedInstance(r); + } + } + + @Test(groups = { "singleCluster" }, dataProvider = "getInstanceTime") + public void testProcessWithOptionalInput(String instanceTime, String flag) throws Exception { + + bundles[0].setProcessValidity(startTime, endTime); + bundles[0].setProcessPeriodicity(10, TimeUnit.minutes); + bundles[0].setInputFeedPeriodicity(5, TimeUnit.minutes); + bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); + bundles[0].setProcessConcurrency(6); + + ProcessMerlin process = bundles[0].getProcessObject(); + process.getInputs().getInputs().get(0).setOptional(true); + + bundles[0].setProcessData(process.toString()); + bundles[0].submitFeedsScheduleProcess(); + + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); + OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); + InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6, CoordinatorAction.Status.RUNNING, + EntityType.PROCESS, 5); + + InstanceDependencyResult r = prism.getProcessHelper() + .getInstanceDependencies(processName, "?instanceTime=" + instanceTime, null); + + if (flag.equals("true")) { + AssertUtil.assertSucceeded(r); + InstanceUtil.assertProcessInstances(r, clusterOC, + OozieUtil.getBundles(clusterOC, processName, EntityType.PROCESS).get(0), instanceTime); + } else { + AssertUtil.assertFailedInstance(r); + } + } + + @Test(groups = { "singleCluster" }, dataProvider = "getInstanceTime") + public void testWithMultipleProcess(String instanceTime, String flag) throws Exception { + + LOGGER.info("Time range between : " + startTime + " and " + endTime); + bundles[0].setProcessValidity(startTime, endTime); + bundles[0].setProcessConcurrency(6); + bundles[0].setProcessPeriodicity(10, TimeUnit.minutes); + bundles[0].setInputFeedPeriodicity(5, TimeUnit.minutes); + bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); + bundles[0].submitFeedsScheduleProcess(prism); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); + OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); + InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, CoordinatorAction.Status.RUNNING, + EntityType.PROCESS, 5); + + ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessObject().toString()); + process.setName("Process-producer-1"); + LOGGER.info("process : " + process.toString()); + + prism.getProcessHelper().submitEntity(process.toString()); + InstanceDependencyResult r = prism.getProcessHelper() + .getInstanceDependencies(processName, "?instanceTime=" + instanceTime, null); + + if (flag.equals("true")) { + AssertUtil.assertSucceeded(r); + InstanceUtil.assertProcessInstances(r, clusterOC, + OozieUtil.getBundles(clusterOC, processName, EntityType.PROCESS).get(0), instanceTime); + } else { + AssertUtil.assertFailedInstance(r); + } + } + + @DataProvider + public Object[][] getInstanceTime() { + return new Object[][] { { startTime, "true" }, + { TimeUtil.addMinsToTime(startTime, 10), "true" }, + { TimeUtil.addMinsToTime(startTime, 20), "true" }, + { TimeUtil.addMinsToTime(startTime, 30), "true" }, + { TimeUtil.addMinsToTime(startTime, 40), "true" }, + { TimeUtil.addMinsToTime(startTime, 50), "true" }, + { TimeUtil.addMinsToTime(startTime, 60), "false" }, + { TimeUtil.addMinsToTime(startTime, 80), "false" }, + { TimeUtil.addMinsToTime(startTime, -10), "false" }, + { TimeUtil.addMinsToTime(startTime, 25), "false" }, }; + } + + @DataProvider + public Object[][] getELData() { + String[][] elData = new String[][] { { "now(0,-30)", "now(0,30)" }, { "today(0,0)", "now(0,30)" }, }; + String[][] timeHelper = new String[][] { { startTime, "true" }, + { TimeUtil.addMinsToTime(startTime, 10), "true" }, + { TimeUtil.addMinsToTime(startTime, 20), "true" }, + { TimeUtil.addMinsToTime(startTime, 30), "true" }, + { TimeUtil.addMinsToTime(startTime, 40), "true" }, + { TimeUtil.addMinsToTime(startTime, 50), "true" }, + { TimeUtil.addMinsToTime(startTime, 60), "false" }, + { TimeUtil.addMinsToTime(startTime, 80), "false" }, + { TimeUtil.addMinsToTime(startTime, -10), "false" }, + { TimeUtil.addMinsToTime(startTime, 25), "false" }, }; + return MatrixUtil.crossProductNew(elData, timeHelper); + } +}