Return-Path: X-Original-To: apmail-falcon-commits-archive@minotaur.apache.org Delivered-To: apmail-falcon-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 56CBF1887F for ; Mon, 13 Jul 2015 06:50:17 +0000 (UTC) Received: (qmail 35546 invoked by uid 500); 13 Jul 2015 06:50:17 -0000 Delivered-To: apmail-falcon-commits-archive@falcon.apache.org Received: (qmail 35504 invoked by uid 500); 13 Jul 2015 06:50:17 -0000 Mailing-List: contact commits-help@falcon.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@falcon.apache.org Delivered-To: mailing list commits@falcon.apache.org Received: (qmail 35495 invoked by uid 99); 13 Jul 2015 06:50:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Jul 2015 06:50:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E703BDFDAC; Mon, 13 Jul 2015 06:50:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ajayyadava@apache.org To: commits@falcon.apache.org Message-Id: <8a390186b821469ebc7a54582679fe09@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: falcon git commit: FALCON-1311 Instance dependency API produces inconsistent results in some scenarios. Contributed by Pragya Mittal Date: Mon, 13 Jul 2015 06:50:16 +0000 (UTC) Repository: falcon Updated Branches: refs/heads/master e5698fad3 -> b36a82394 FALCON-1311 Instance dependency API produces inconsistent results in some scenarios. Contributed by Pragya Mittal Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/b36a8239 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/b36a8239 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/b36a8239 Branch: refs/heads/master Commit: b36a82394998697934f282be22714222e4b2a39d Parents: e5698fa Author: Ajay Yadava Authored: Mon Jul 13 12:19:53 2015 +0530 Committer: Ajay Yadava Committed: Mon Jul 13 12:19:53 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../java/org/apache/falcon/entity/EntityUtil.java | 17 +++++++++++++++++ .../java/org/apache/falcon/entity/FeedHelper.java | 12 +++++++++--- .../org/apache/falcon/entity/FeedHelperTest.java | 16 ++++++++++++++++ 4 files changed, 44 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/b36a8239/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 132e064..e844a60 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -53,6 +53,8 @@ Trunk (Unreleased) (Suhas Vasu) BUG FIXES + FALCON-1311 Instance dependency API produces inconsistent results in some scenarios(Pragya Mittal via Ajay Yadava) + FALCON-1268 Instance Dependency API failure message is not intuitive in distributed mode (Ajay Yadava) FALCON-1260 Instance dependency API produces incorrect results (Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/b36a8239/common/src/main/java/org/apache/falcon/entity/EntityUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java index 63dfb9d..25d9008 100644 --- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java +++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java @@ -259,6 +259,23 @@ public final class EntityUtil { return feed.getTimezone(); } + /** + * Returns true if the given instanceTime is a valid instanceTime on the basis of startTime and frequency of an + * entity. + * + * It doesn't check the instanceTime being after the validity of entity. + * @param startTime startTime of the entity + * @param frequency frequency of the entity. + * @param timezone timezone of the entity. + * @param instanceTime instanceTime to be checked for validity + * @return + */ + public static boolean isValidInstanceTime(Date startTime, Frequency frequency, TimeZone timezone, + Date instanceTime) { + Date next = getNextStartTime(startTime, frequency, timezone, instanceTime); + return next.equals(instanceTime); + } + public static Date getNextStartTime(Date startTime, Frequency frequency, TimeZone timezone, Date referenceTime) { if (startTime.after(referenceTime)) { return startTime; http://git-wip-us.apache.org/repos/asf/falcon/blob/b36a8239/common/src/main/java/org/apache/falcon/entity/FeedHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java index eadc8d6..bb31de8 100644 --- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java @@ -26,6 +26,7 @@ import org.apache.falcon.Tag; import org.apache.falcon.entity.common.FeedDataPath; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.cluster.Property; import org.apache.falcon.entity.v0.feed.CatalogTable; import org.apache.falcon.entity.v0.feed.Cluster; @@ -475,7 +476,7 @@ public final class FeedHelper { * @return returns the instance of the process which produces the given feed */ public static SchedulableEntityInstance getProducerInstance(Feed feed, Date feedInstanceTime, - org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException { + org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException { //validate the inputs validateFeedInstance(feed, feedInstanceTime, cluster); @@ -485,16 +486,21 @@ public final class FeedHelper { cluster.getName()); Date pStart = processCluster.getValidity().getStart(); Date pEnd = processCluster.getValidity().getEnd(); + Frequency pFrequency = process.getFrequency(); + TimeZone pTz = process.getTimezone(); + try { Date processInstanceTime = getProducerInstanceTime(feed, feedInstanceTime, process, cluster); - if (processInstanceTime.before(pStart) || !processInstanceTime.before(pEnd)) { + boolean isValid = EntityUtil.isValidInstanceTime(pStart, pFrequency, pTz, processInstanceTime); + if (processInstanceTime.before(pStart) || !processInstanceTime.before(pEnd) || !isValid){ return null; } + SchedulableEntityInstance producer = new SchedulableEntityInstance(process.getName(), cluster.getName(), processInstanceTime, EntityType.PROCESS); producer.setTags(SchedulableEntityInstance.OUTPUT); return producer; - } catch (FalconException e) { + } catch (FalconException | IllegalArgumentException e) { LOG.error("Error in trying to get producer process: {}'s instance time for feed: {}'s instance: } " + " on cluster:{}", process.getName(), feed.getName(), feedInstanceTime, cluster.getName()); } http://git-wip-us.apache.org/repos/asf/falcon/blob/b36a8239/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java index b15f023..c70cfcc 100644 --- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java +++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java @@ -127,6 +127,22 @@ public class FeedHelperTest extends AbstractTestBase { } @Test + public void testInvalidProducerInstance() throws Exception { + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); + Outputs outputs = new Outputs(); + Output outFeed = new Output(); + outFeed.setName("outputFeed"); + outFeed.setFeed(feed.getName()); + outFeed.setInstance("now(0,0)"); + outputs.getOutputs().add(outFeed); + process.setOutputs(outputs); + store.publish(EntityType.PROCESS, process); + Assert.assertNull(FeedHelper.getProducerInstance(feed, getDate("2012-02-28 10:40 UTC"), cluster)); + } + + @Test public void testGetProducerOutOfValidity() throws FalconException, ParseException { Cluster cluster = publishCluster(); Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");