falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject falcon git commit: FALCON-1260 Instance dependency API produces incorrect results. Contributed by Ajay Yadava.
Date Mon, 29 Jun 2015 08:13:25 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 53bd6c38e -> bd4434333


FALCON-1260 Instance dependency API produces incorrect results. Contributed by Ajay Yadava.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/bd443433
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/bd443433
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/bd443433

Branch: refs/heads/master
Commit: bd44343333e166e672a62c15c5ef306fadc46dd7
Parents: 53bd6c3
Author: Ajay Yadava <ajaynsit@gmail.com>
Authored: Mon Jun 29 12:49:43 2015 +0530
Committer: Ajay Yadava <ajaynsit@gmail.com>
Committed: Mon Jun 29 13:42:33 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../resource/SchedulableEntityInstance.java     |  18 +-
 .../org/apache/falcon/entity/EntityUtil.java    |  16 +-
 .../org/apache/falcon/entity/FeedHelper.java    |  38 +-
 .../org/apache/falcon/entity/ProcessHelper.java |  10 +-
 .../apache/falcon/entity/FeedHelperTest.java    | 349 +++++++++++++++++--
 .../apache/falcon/entity/ProcessHelperTest.java | 100 ++++--
 docs/src/site/twiki/FalconCLI.twiki             |  10 +-
 .../resource/AbstractInstanceManager.java       |   1 +
 .../resource/proxy/InstanceManagerProxy.java    |   7 +-
 10 files changed, 458 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/bd443433/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e81af1e..2928497 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -45,6 +45,8 @@ Trunk (Unreleased)
     (Suhas Vasu)
 
   BUG FIXES
+    FALCON-1260 Instance dependency API produces incorrect results (Ajay Yadava)
+    
     FALCON-99 Adding late data to process doesn't create new coord (Pallavi Rao via Suhas Vasu)
 
     FALCON-1101 Cluster submission in falcon does not create an owned-by edge(Sowmya Ramesh via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/bd443433/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java b/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java
index 2a7ecdb..f5be63d 100644
--- a/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java
+++ b/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java
@@ -40,7 +40,7 @@ public class SchedulableEntityInstance {
 
     private EntityType entityType;
 
-    private String tag;
+    private String tags;
 
     //for JAXB
     private SchedulableEntityInstance() {
@@ -56,12 +56,12 @@ public class SchedulableEntityInstance {
         }
     }
 
-    public String getTag() {
-        return tag;
+    public String getTags() {
+        return tags;
     }
 
-    public void setTag(String tag) {
-        this.tag = tag;
+    public void setTags(String tags) {
+        this.tags = tags;
     }
 
     public String getEntityName() {
@@ -103,7 +103,7 @@ public class SchedulableEntityInstance {
                 + ", type: " + entityType
                 + ", cluster: " + cluster
                 + ", instanceTime: " + SchemaHelper.formatDateUTC(instanceTime));
-        sb.append(", tag: " + ((tag != null) ? tag : ""));
+        sb.append(", tags: " + ((tags != null) ? tags : ""));
         return sb.toString();
     }
 
@@ -134,7 +134,7 @@ public class SchedulableEntityInstance {
             return false;
         }
 
-        if (!StringUtils.equals(tag, that.tag)) {
+        if (!StringUtils.equals(tags, that.tags)) {
             return false;
         }
 
@@ -147,8 +147,8 @@ public class SchedulableEntityInstance {
         result = 31 * result + entityName.hashCode();
         result = 31 * result + entityType.hashCode();
         result = 31 * result + cluster.hashCode();
-        if (tag != null) {
-            result = 31 * result + tag.hashCode();
+        if (tags != null) {
+            result = 31 * result + tags.hashCode();
         }
         return result;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/bd443433/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index b86d9d7..63dfb9d 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -803,12 +803,12 @@ public final class EntityUtil {
         Calendar insCal = Calendar.getInstance(tz);
         insCal.setTime(startTime);
 
-        int instanceCount = getInstanceSequence(startTime, frequency, tz, referenceTime);
+        int instanceCount = getInstanceSequence(startTime, frequency, tz, referenceTime) - 1;
         final int freq = frequency.getFrequencyAsInt() * instanceCount;
         insCal.add(frequency.getTimeUnit().getCalendarUnit(), freq);
 
         while (insCal.getTime().after(referenceTime)) {
-            insCal.add(frequency.getTimeUnit().getCalendarUnit(), -1);
+            insCal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequencyAsInt() * -1);
         }
         return insCal.getTime();
     }
@@ -849,7 +849,7 @@ public final class EntityUtil {
     /**
      * Find instance times given first instance start time and frequency till a given end time.
      *
-     * It finds the first valid instance time in the given time range, it then uses frequency to find next instances
+     * It finds the first valid instance time for the given time range, it then uses frequency to find next instances
      * in the given time range.
      *
      * @param startTime startTime of the entity (time of first instance ever of the given entity)
@@ -866,15 +866,15 @@ public final class EntityUtil {
             timeZone = TimeZone.getTimeZone("UTC");
         }
 
-        while(true){
-            Date nextStartTime = getNextStartTime(startTime, frequency, timeZone, startRange);
-            if (nextStartTime.before(startRange) || nextStartTime.after(endRange)){
+        Date current = getPreviousInstanceTime(startTime, frequency, timeZone, startRange);
+        while (true) {
+            Date nextStartTime = getNextStartTime(startTime, frequency, timeZone, current);
+            if (nextStartTime.after(endRange)){
                 break;
             }
-
             result.add(nextStartTime);
             // this is required because getNextStartTime returns greater than or equal to referenceTime
-            startRange = new Date(nextStartTime.getTime() + ONE_MS); // 1 milli seconds later
+            current = new Date(nextStartTime.getTime() + ONE_MS); // 1 milli seconds later
         }
         return result;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/bd443433/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 9f4eb61..eadc8d6 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -448,7 +448,7 @@ public final class FeedHelper {
 
         // validate that instanceTime is in validity range
         if (feedCluster.getValidity().getStart().after(instanceTime)
-                || feedCluster.getValidity().getEnd().before(instanceTime)) {
+                || !feedCluster.getValidity().getEnd().after(instanceTime)) {
             throw new IllegalArgumentException("instanceTime: " + instanceTime + " is not in validity range for"
                     + " Feed: " + feed.getName() + " on cluster:" + cluster.getName());
         }
@@ -479,14 +479,20 @@ public final class FeedHelper {
 
         //validate the inputs
         validateFeedInstance(feed, feedInstanceTime, cluster);
-
         Process process = getProducerProcess(feed);
         if (process != null) {
+            org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process,
+                    cluster.getName());
+            Date pStart = processCluster.getValidity().getStart();
+            Date pEnd = processCluster.getValidity().getEnd();
             try {
                 Date processInstanceTime = getProducerInstanceTime(feed, feedInstanceTime, process, cluster);
+                if (processInstanceTime.before(pStart) || !processInstanceTime.before(pEnd)) {
+                    return null;
+                }
                 SchedulableEntityInstance producer = new SchedulableEntityInstance(process.getName(), cluster.getName(),
                         processInstanceTime, EntityType.PROCESS);
-                producer.setTag(SchedulableEntityInstance.OUTPUT);
+                producer.setTags(SchedulableEntityInstance.OUTPUT);
                 return producer;
             } catch (FalconException e) {
                 LOG.error("Error in trying to get producer process: {}'s instance time for feed: {}'s instance: } "
@@ -576,7 +582,7 @@ public final class FeedHelper {
             for (Date date : consumerInstanceTimes) {
                 SchedulableEntityInstance in = new SchedulableEntityInstance(p.getName(), cluster.getName(), date,
                         EntityType.PROCESS);
-                in.setTag(SchedulableEntityInstance.INPUT);
+                in.setTags(SchedulableEntityInstance.INPUT);
                 result.add(in);
             }
         }
@@ -645,7 +651,7 @@ public final class FeedHelper {
             ExpressionHelper.setReferenceDate(processStartDate);
             ExpressionHelper evaluator = ExpressionHelper.get();
             Date startRelative = evaluator.evaluate(in.getStart(), Date.class);
-            Date startTimeActual = EntityUtil.getNextStartTime(feedStartDate,
+            Date startTimeActual = EntityUtil.getPreviousInstanceTime(feedStartDate,
                     feed.getFrequency(), feed.getTimezone(), startRelative);
             Long offset = processStartDate.getTime() - startTimeActual.getTime();
 
@@ -663,10 +669,15 @@ public final class FeedHelper {
 
                 ExpressionHelper.setReferenceDate(nextConsumerInstance);
                 evaluator = ExpressionHelper.get();
-                Long rangeStart = evaluator.evaluate(in.getStart(), Date.class).getTime();
+                Date inputStart = evaluator.evaluate(in.getStart(), Date.class);
+                Long rangeStart = EntityUtil.getPreviousInstanceTime(feedStartDate, feed.getFrequency(),
+                        feed.getTimezone(), inputStart).getTime();
                 Long rangeEnd = evaluator.evaluate(in.getEnd(), Date.class).getTime();
-                if (rangeStart <= feedInstancetime.getTime() && feedInstancetime.getTime() < rangeEnd) {
-                    result.add(nextConsumerInstance);
+                if (rangeStart <= feedInstancetime.getTime() && feedInstancetime.getTime() <= rangeEnd) {
+                    if (!nextConsumerInstance.before(processCluster.getValidity().getStart())
+                            && nextConsumerInstance.before(processCluster.getValidity().getEnd())) {
+                        result.add(nextConsumerInstance);
+                    }
                 } else {
                     break;
                 }
@@ -681,10 +692,15 @@ public final class FeedHelper {
 
                 ExpressionHelper.setReferenceDate(nextConsumerInstance);
                 evaluator = ExpressionHelper.get();
-                Long rangeStart = evaluator.evaluate(in.getStart(), Date.class).getTime();
+                Date inputStart = evaluator.evaluate(in.getStart(), Date.class);
+                Long rangeStart = EntityUtil.getPreviousInstanceTime(feedStartDate, feed.getFrequency(),
+                        feed.getTimezone(), inputStart).getTime();
                 Long rangeEnd = evaluator.evaluate(in.getEnd(), Date.class).getTime();
-                if (rangeStart <= feedInstancetime.getTime() && feedInstancetime.getTime() < rangeEnd) {
-                    result.add(nextConsumerInstance);
+                if (rangeStart <= feedInstancetime.getTime() && feedInstancetime.getTime() <= rangeEnd) {
+                    if (!nextConsumerInstance.before(processCluster.getValidity().getStart())
+                            && nextConsumerInstance.before(processCluster.getValidity().getEnd())) {
+                        result.add(nextConsumerInstance);
+                    }
                 } else {
                     break;
                 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/bd443433/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index fe78bc8..bbfca68 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -98,7 +98,7 @@ public final class ProcessHelper {
 
         // check if instanceTime is in validity range
         if (instanceTime.before(processCluster.getValidity().getStart())
-                || instanceTime.after(processCluster.getValidity().getEnd())) {
+                || !instanceTime.before(processCluster.getValidity().getEnd())) {
             throw new IllegalArgumentException("Instance time provided: " + instanceTime
                     + " is not in validity range of process: " + process.getName()
                     + "on cluster: " + cluster.getName());
@@ -148,7 +148,7 @@ public final class ProcessHelper {
                 SchedulableEntityInstance instance;
                 for (Date time : instanceTimes) {
                     instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(), time, EntityType.FEED);
-                    instance.setTag(SchedulableEntityInstance.INPUT);
+                    instance.setTags(SchedulableEntityInstance.INPUT);
                     result.add(instance);
                 }
             }
@@ -175,11 +175,11 @@ public final class ProcessHelper {
                 // find the feed
                 Feed feed = store.get(EntityType.FEED, output.getFeed());
                 org.apache.falcon.entity.v0.feed.Cluster fCluster = FeedHelper.getCluster(feed, cluster.getName());
-                outputInstance = EntityUtil.getNextStartTime(fCluster.getValidity().getStart(), feed.getFrequency(),
-                        feed.getTimezone(), outputInstance);
+                outputInstance = EntityUtil.getPreviousInstanceTime(fCluster.getValidity().getStart(),
+                        feed.getFrequency(), feed.getTimezone(), outputInstance);
                 candidate = new SchedulableEntityInstance(output.getFeed(), cluster.getName(), outputInstance,
                         EntityType.FEED);
-                candidate.setTag(SchedulableEntityInstance.OUTPUT);
+                candidate.setTags(SchedulableEntityInstance.OUTPUT);
                 result.add(candidate);
             }
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/bd443433/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
index f70edfb..b15f023 100644
--- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
@@ -78,6 +78,155 @@ public class FeedHelperTest extends AbstractTestBase {
         Assert.assertEquals(FeedHelper.normalizePartitionExpression(null, null), "");
     }
 
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testInstanceBeforeStart() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC");
+        Outputs outputs = new Outputs();
+        Output outFeed = new Output();
+        outFeed.setName("outputFeed");
+        outFeed.setFeed(feed.getName());
+        outFeed.setInstance("now(0,0)");
+        outputs.getOutputs().add(outFeed);
+        process.setOutputs(outputs);
+        store.publish(EntityType.PROCESS, process);
+        FeedHelper.getProducerInstance(feed, getDate("2011-02-27 10:00 UTC"), cluster);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testInstanceEqualsEnd() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC");
+        Outputs outputs = new Outputs();
+        Output outFeed = new Output();
+        outFeed.setName("outputFeed");
+        outFeed.setFeed(feed.getName());
+        outFeed.setInstance("now(0,0)");
+        outputs.getOutputs().add(outFeed);
+        process.setOutputs(outputs);
+        store.publish(EntityType.PROCESS, process);
+        FeedHelper.getProducerInstance(feed, getDate("2016-02-28 10:00 UTC"), cluster);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testInstanceOutOfSync() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC");
+        Outputs outputs = new Outputs();
+        Output outFeed = new Output();
+        outFeed.setName("outputFeed");
+        outFeed.setFeed(feed.getName());
+        outFeed.setInstance("now(0,0)");
+        outputs.getOutputs().add(outFeed);
+        process.setOutputs(outputs);
+        store.publish(EntityType.PROCESS, process);
+        FeedHelper.getProducerInstance(feed, getDate("2016-02-28 09:04 UTC"), cluster);
+    }
+
+    @Test
+    public void testGetProducerOutOfValidity() throws FalconException, ParseException {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC");
+        Outputs outputs = new Outputs();
+        Output outFeed = new Output();
+        outFeed.setName("outputFeed");
+        outFeed.setFeed(feed.getName());
+        outFeed.setInstance("now(0,0)");
+        outputs.getOutputs().add(outFeed);
+        process.setOutputs(outputs);
+        store.publish(EntityType.PROCESS, process);
+        Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName());
+        SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2012-02-28 10:45 UTC"),
+                cluster);
+        Assert.assertNull(result);
+    }
+
+    @Test
+    public void testGetConsumersOutOfValidity() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC");
+        Inputs inputs = new Inputs();
+        Input inFeed = new Input();
+        inFeed.setName("inputFeed");
+        inFeed.setFeed(feed.getName());
+        inFeed.setStart("now(0, -20)");
+        inFeed.setEnd("now(0, 0)");
+        inputs.getInputs().add(inFeed);
+        process.setInputs(inputs);
+        store.publish(EntityType.PROCESS, process);
+        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, getDate("2016-02-28 09:00 UTC"),
+                cluster);
+        Assert.assertTrue(result.isEmpty());
+    }
+
+    @Test
+    public void testGetConsumersFirstInstance() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC");
+        Inputs inputs = new Inputs();
+        Input inFeed = new Input();
+        inFeed.setName("inputFeed");
+        inFeed.setFeed(feed.getName());
+        inFeed.setStart("now(0, -20)");
+        inFeed.setEnd("now(0, 0)");
+        inputs.getInputs().add(inFeed);
+        process.setInputs(inputs);
+        store.publish(EntityType.PROCESS, process);
+        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, getDate("2012-02-28 10:15 UTC"),
+                cluster);
+        Set<SchedulableEntityInstance> expected = new HashSet<>();
+        SchedulableEntityInstance consumer = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+                getDate("2012-02-28 10:37 UTC"), EntityType.PROCESS);
+        consumer.setTags(SchedulableEntityInstance.INPUT);
+        expected.add(consumer);
+        Assert.assertEquals(result, expected);
+    }
+
+    @Test
+    public void testGetConsumersLastInstance() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:20 UTC", "2016-02-28 10:00 UTC");
+        Inputs inputs = new Inputs();
+        Input inFeed = new Input();
+        inFeed.setName("inputFeed");
+        inFeed.setFeed(feed.getName());
+        inFeed.setStart("now(0, -20)");
+        inFeed.setEnd("now(0, 0)");
+        inputs.getInputs().add(inFeed);
+        process.setInputs(inputs);
+        store.publish(EntityType.PROCESS, process);
+        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, getDate("2012-02-28 10:15 UTC"),
+                cluster);
+        Set<SchedulableEntityInstance> expected = new HashSet<>();
+        String[] consumers = { "2012-02-28 10:20 UTC", "2012-02-28 10:30 UTC", };
+        for (String d : consumers) {
+            SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+                    getDate(d), EntityType.PROCESS);
+            i.setTags(SchedulableEntityInstance.INPUT);
+            expected.add(i);
+        }
+        Assert.assertEquals(result, expected);
+    }
+
+    @Test
+    public void testFeedWithNoDependencies() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, getDate("2016-02-28 09:00 UTC"),
+                cluster);
+        Assert.assertTrue(result.isEmpty());
+        SchedulableEntityInstance res = FeedHelper.getProducerInstance(feed, getDate("2012-02-28 10:45 UTC"),
+                cluster);
+        Assert.assertNull(res);
+    }
+
     @Test
     public void testEvaluateExpression() throws Exception {
         Cluster cluster = new Cluster();
@@ -144,29 +293,24 @@ public class FeedHelperTest extends AbstractTestBase {
 
     @Test
     public void testGetProducerProcessWithOffset() throws FalconException, ParseException {
-        //create a feed, submit it, test that ProducerProcess is null
-
         Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
         Assert.assertNull(FeedHelper.getProducerProcess(feed));
-
-        // create it's producer process submit it, test it's ProducerProcess
-        Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2016-02-28 10:37 UTC");
         Outputs outputs = new Outputs();
         Output outFeed = new Output();
         outFeed.setName("outputFeed");
         outFeed.setFeed(feed.getName());
-        outFeed.setInstance("today(0,0)");
+        outFeed.setInstance("now(0,0)");
         outputs.getOutputs().add(outFeed);
         process.setOutputs(outputs);
         store.publish(EntityType.PROCESS, process);
-
         Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName());
-        SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-27 10:00 UTC"),
+        SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-28 10:35 UTC"),
                 cluster);
         SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(),
-                getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS);
-        expected.setTag(SchedulableEntityInstance.OUTPUT);
+                getDate("2013-02-28 10:37 UTC"), EntityType.PROCESS);
+        expected.setTags(SchedulableEntityInstance.OUTPUT);
         Assert.assertEquals(result, expected);
     }
 
@@ -192,7 +336,7 @@ public class FeedHelperTest extends AbstractTestBase {
                 cluster);
         SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(),
                 getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS);
-        expected.setTag(SchedulableEntityInstance.OUTPUT);
+        expected.setTags(SchedulableEntityInstance.OUTPUT);
         Assert.assertEquals(result, expected);
     }
 
@@ -218,7 +362,7 @@ public class FeedHelperTest extends AbstractTestBase {
                 cluster);
         SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(),
                 getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS);
-        expected.setTag(SchedulableEntityInstance.OUTPUT);
+        expected.setTags(SchedulableEntityInstance.OUTPUT);
         Assert.assertEquals(result, expected);
     }
 
@@ -245,7 +389,7 @@ public class FeedHelperTest extends AbstractTestBase {
                 cluster);
         SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(),
                 getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS);
-        expected.setTag(SchedulableEntityInstance.OUTPUT);
+        expected.setTags(SchedulableEntityInstance.OUTPUT);
         Assert.assertEquals(result, expected);
     }
 
@@ -270,7 +414,7 @@ public class FeedHelperTest extends AbstractTestBase {
                 cluster);
         SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(),
                 getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS);
-        expected.setTag(SchedulableEntityInstance.OUTPUT);
+        expected.setTags(SchedulableEntityInstance.OUTPUT);
         Assert.assertEquals(result, expected);
     }
 
@@ -322,19 +466,99 @@ public class FeedHelperTest extends AbstractTestBase {
         Set<SchedulableEntityInstance> expected = new HashSet<>();
         SchedulableEntityInstance ins = new SchedulableEntityInstance(process.getName(), cluster.getName(),
                 getDate("2012-02-28 10:00 UTC"), EntityType.PROCESS);
-        ins.setTag(SchedulableEntityInstance.INPUT);
+        ins.setTags(SchedulableEntityInstance.INPUT);
         expected.add(ins);
         Assert.assertEquals(result, expected);
+    }
 
+    @Test
+    public void testGetConsumerProcessInstancesWithNonUnitFrequency() throws Exception {
+        //create a feed, submit it, test that ConsumerProcesses is blank list
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "minutes(5)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC");
+
+        //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses
+        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 09:37 UTC", "2016-02-28 10:00 UTC");
+        Inputs inputs = new Inputs();
+        Input inFeed = new Input();
+        inFeed.setName("inputFeed");
+        inFeed.setFeed(feed.getName());
+        inFeed.setStart("now(0, -20)");
+        inFeed.setEnd("now(0,0)");
+        inputs.getInputs().add(inFeed);
+        process.setInputs(inputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
+                getDate("2012-02-28 09:40 UTC"), cluster);
+
+        Set<SchedulableEntityInstance> expected = new HashSet<>();
+        String[] consumers = {"2012-02-28 09:47 UTC", "2012-02-28 09:57 UTC"};
+        for (String d : consumers) {
+            SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+                    getDate(d), EntityType.PROCESS);
+            i.setTags(SchedulableEntityInstance.INPUT);
+            expected.add(i);
+        }
+        Assert.assertEquals(result, expected);
     }
 
     @Test
-    public void testGetMultipleConsumerInstances() throws Exception {
+    public void testGetConsumersOutOfValidityRange() throws Exception {
+        //create a feed, submit it, test that ConsumerProcesses is blank list
         Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC");
+        Feed feed = publishFeed(cluster, "minutes(5)", "2010-02-28 00:00 UTC", "2016-02-28 00:00 UTC");
+
+        //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses
+        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 09:37 UTC", "2016-02-28 10:00 UTC");
+        Inputs inputs = new Inputs();
+        Input inFeed = new Input();
+        inFeed.setName("inputFeed");
+        inFeed.setFeed(feed.getName());
+        inFeed.setStart("now(0, -20)");
+        inFeed.setEnd("now(0,0)");
+        inputs.getInputs().add(inFeed);
+        process.setInputs(inputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
+                getDate("2010-02-28 09:40 UTC"), cluster);
+        Assert.assertEquals(result.size(), 0);
+    }
+
+    @Test
+    public void testGetConsumersLargeOffsetShortValidity() throws Exception {
+        //create a feed, submit it, test that ConsumerProcesses is blank list
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "minutes(5)", "2010-02-28 00:00 UTC", "2016-02-28 00:00 UTC");
 
         //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses
-        Process process = prepareProcess(cluster, "hours(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 09:37 UTC", "2012-02-28 09:47 UTC");
+        Inputs inputs = new Inputs();
+        Input inFeed = new Input();
+        inFeed.setName("inputFeed");
+        inFeed.setFeed(feed.getName());
+        inFeed.setStart("today(-2, 0)");
+        inFeed.setEnd("now(0,0)");
+        inputs.getInputs().add(inFeed);
+        process.setInputs(inputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
+                getDate("2012-02-28 09:35 UTC"), cluster);
+        Set<SchedulableEntityInstance> expected = new HashSet<>();
+        SchedulableEntityInstance consumer = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+                getDate("2012-02-28 09:37 UTC"), EntityType.PROCESS);
+        consumer.setTags(SchedulableEntityInstance.INPUT);
+        expected.add(consumer);
+        Assert.assertEquals(result, expected);
+    }
+
+    @Test
+    public void testGetMultipleConsumerInstances() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 00:00 UTC", "2016-02-28 00:00 UTC");
+        Process process = prepareProcess(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC");
         Inputs inputs = new Inputs();
         Input inFeed = new Input();
         inFeed.setName("inputFeed");
@@ -347,28 +571,27 @@ public class FeedHelperTest extends AbstractTestBase {
 
         Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
                 getDate("2012-02-28 09:00 UTC"), cluster);
-        Assert.assertEquals(result.size(), 8);
-
+        Assert.assertEquals(result.size(), 9);
         Set<SchedulableEntityInstance> expected = new HashSet<>();
         String[] consumers = { "2012-02-28 05:00 UTC", "2012-02-28 06:00 UTC", "2012-02-28 07:00 UTC",
             "2012-02-28 08:00 UTC", "2012-02-28 09:00 UTC", "2012-02-28 10:00 UTC", "2012-02-28 11:00 UTC",
-            "2012-02-28 12:00 UTC", };
+            "2012-02-28 12:00 UTC", "2012-02-28 13:00 UTC", };
         for (String d : consumers) {
             SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(),
                     getDate(d), EntityType.PROCESS);
-            i.setTag(SchedulableEntityInstance.INPUT);
+            i.setTags(SchedulableEntityInstance.INPUT);
             expected.add(i);
         }
         Assert.assertEquals(result, expected);
     }
 
     @Test
-    public void testGetConsumerWithNow() throws Exception {
+    public void testGetConsumerWithVariableEnd() throws Exception {
         Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC");
+        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 00:00 UTC", "2016-02-28 00:00 UTC");
 
         //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses
-        Process process = prepareProcess(cluster, "hours(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Process process = prepareProcess(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC");
         Inputs inputs = new Inputs();
         Input inFeed = new Input();
         inFeed.setName("inputFeed");
@@ -378,19 +601,66 @@ public class FeedHelperTest extends AbstractTestBase {
         inputs.getInputs().add(inFeed);
         process.setInputs(inputs);
         store.publish(EntityType.PROCESS, process);
-
         Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
                 getDate("2012-02-28 00:00 UTC"), cluster);
-        Assert.assertEquals(result.size(), 23);
+        Set<SchedulableEntityInstance> expected = new HashSet<>();
+        String[] consumers =  {"2012-02-28 11:00 UTC", "2012-02-28 16:00 UTC", "2012-02-28 18:00 UTC",
+            "2012-02-28 20:00 UTC", "2012-02-28 13:00 UTC", "2012-02-28 03:00 UTC", "2012-02-28 04:00 UTC",
+            "2012-02-28 06:00 UTC", "2012-02-28 05:00 UTC", "2012-02-28 17:00 UTC", "2012-02-28 00:00 UTC",
+            "2012-02-28 23:00 UTC", "2012-02-28 21:00 UTC", "2012-02-28 15:00 UTC", "2012-02-28 22:00 UTC",
+            "2012-02-28 14:00 UTC", "2012-02-28 08:00 UTC", "2012-02-28 12:00 UTC", "2012-02-28 02:00 UTC",
+            "2012-02-28 01:00 UTC", "2012-02-28 19:00 UTC", "2012-02-28 10:00 UTC", "2012-02-28 09:00 UTC",
+            "2012-02-28 07:00 UTC", };
+        for (String d : consumers) {
+            SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+                    getDate(d), EntityType.PROCESS);
+            i.setTags(SchedulableEntityInstance.INPUT);
+            expected.add(i);
+        }
+        Assert.assertEquals(result, expected);
     }
 
     @Test
-    public void testGetConsumerWithLatest() throws Exception {
+    public void testGetConsumerWithVariableStart() throws Exception {
         Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC");
+        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 00:00 UTC", "2016-02-28 00:00 UTC");
 
         //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses
-        Process process = prepareProcess(cluster, "hours(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Process process = prepareProcess(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC");
+        Inputs inputs = new Inputs();
+        Input inFeed = new Input();
+        inFeed.setName("inputFeed");
+        inFeed.setFeed(feed.getName());
+        inFeed.setStart("now(0, 0)");
+        inFeed.setEnd("today(24, 0)");
+        inputs.getInputs().add(inFeed);
+        process.setInputs(inputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
+                getDate("2012-03-28 00:00 UTC"), cluster);
+        Set<SchedulableEntityInstance> expected = new HashSet<>();
+        String[] consumers =  {"2012-03-27 16:00 UTC", "2012-03-27 01:00 UTC", "2012-03-27 10:00 UTC",
+            "2012-03-27 03:00 UTC", "2012-03-27 08:00 UTC", "2012-03-27 07:00 UTC", "2012-03-27 19:00 UTC",
+            "2012-03-27 22:00 UTC", "2012-03-27 12:00 UTC", "2012-03-27 20:00 UTC", "2012-03-27 09:00 UTC",
+            "2012-03-27 04:00 UTC", "2012-03-27 14:00 UTC", "2012-03-27 05:00 UTC", "2012-03-27 23:00 UTC",
+            "2012-03-27 17:00 UTC", "2012-03-27 13:00 UTC", "2012-03-27 18:00 UTC", "2012-03-27 15:00 UTC",
+            "2012-03-28 00:00 UTC", "2012-03-27 02:00 UTC", "2012-03-27 11:00 UTC", "2012-03-27 21:00 UTC",
+            "2012-03-27 00:00 UTC", "2012-03-27 06:00 UTC", };
+        for (String d : consumers) {
+            SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+                    getDate(d), EntityType.PROCESS);
+            i.setTags(SchedulableEntityInstance.INPUT);
+            expected.add(i);
+        }
+        Assert.assertEquals(result, expected);
+    }
+
+    @Test
+    public void testGetConsumerWithLatest() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 00:00 UTC", "2016-02-28 00:00 UTC");
+        Process process = prepareProcess(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC");
         Inputs inputs = new Inputs();
         Input inFeed = new Input();
         inFeed.setName("inputFeed");
@@ -403,8 +673,21 @@ public class FeedHelperTest extends AbstractTestBase {
 
         Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
                 getDate("2012-02-28 00:00 UTC"), cluster);
-        System.out.println("result.size() = " + result.size());
-        Assert.assertEquals(result.size(), 23);
+        Set<SchedulableEntityInstance> expected = new HashSet<>();
+        String[] consumers =  {"2012-02-28 23:00 UTC", "2012-02-28 04:00 UTC", "2012-02-28 10:00 UTC",
+            "2012-02-28 07:00 UTC", "2012-02-28 17:00 UTC", "2012-02-28 13:00 UTC", "2012-02-28 05:00 UTC",
+            "2012-02-28 22:00 UTC", "2012-02-28 03:00 UTC", "2012-02-28 21:00 UTC", "2012-02-28 11:00 UTC",
+            "2012-02-28 20:00 UTC", "2012-02-28 06:00 UTC", "2012-02-28 01:00 UTC", "2012-02-28 14:00 UTC",
+            "2012-02-28 00:00 UTC", "2012-02-28 18:00 UTC", "2012-02-28 12:00 UTC", "2012-02-28 16:00 UTC",
+            "2012-02-28 09:00 UTC", "2012-02-28 15:00 UTC", "2012-02-28 19:00 UTC", "2012-02-28 08:00 UTC",
+            "2012-02-28 02:00 UTC", };
+        for (String d : consumers) {
+            SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+                    getDate(d), EntityType.PROCESS);
+            i.setTags(SchedulableEntityInstance.INPUT);
+            expected.add(i);
+        }
+        Assert.assertEquals(result, expected);
     }
 
     private Validity getFeedValidity(String start, String end) throws ParseException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/bd443433/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java b/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java
index 0d396ae..0729f15 100644
--- a/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java
@@ -62,40 +62,98 @@ public class ProcessHelperTest extends AbstractTestBase {
         store = ConfigurationStore.get();
     }
 
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testBeforeStartInstance() throws FalconException, ParseException {
+        // create a process with input feeds
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "minutes(5)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC");
+
+        // find the input Feed instances time
+        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC");
+        Inputs inputs = new Inputs();
+        Input input = getInput("inputFeed", feed.getName(), "now(0,-20)", "now(0,0)", false);
+        inputs.getInputs().add(input);
+        process.setInputs(inputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Date processInstanceDate = getDate("2012-02-28 10:27 UTC");
+        ProcessHelper.getInputFeedInstances(process, processInstanceDate, cluster, false);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testEqualsToEndInstance() throws FalconException, ParseException {
+        // create a process with input feeds
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "minutes(5)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC");
+
+        // find the input Feed instances time
+        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC");
+        Inputs inputs = new Inputs();
+        Input input = getInput("inputFeed", feed.getName(), "now(0,-20)", "now(0,0)", false);
+        inputs.getInputs().add(input);
+        process.setInputs(inputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Date processInstanceDate = getDate("2012-02-28 10:47 UTC");
+        ProcessHelper.getInputFeedInstances(process, processInstanceDate, cluster, false);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testOutOfSyncInstance() throws FalconException, ParseException {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "minutes(5)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC");
+        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC");
+        Inputs inputs = new Inputs();
+        Input input = getInput("inputFeed", feed.getName(), "now(0,-20)", "now(0,0)", false);
+        inputs.getInputs().add(input);
+        process.setInputs(inputs);
+        store.publish(EntityType.PROCESS, process);
+        Date processInstanceDate = getDate("2012-02-28 10:40 UTC");
+        ProcessHelper.getInputFeedInstances(process, processInstanceDate, cluster, false);
+    }
+
+    @Test
+    public void testProcessWithNoDependencies() throws Exception {
+        Cluster cluster = publishCluster();
+        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC");
+        store.publish(EntityType.PROCESS, process);
+        Date processInstanceDate = getDate("2012-02-28 10:37 UTC");
+        Set<SchedulableEntityInstance> inputFeedInstances = ProcessHelper.getInputFeedInstances(process,
+            processInstanceDate, cluster, false);
+        Assert.assertTrue(inputFeedInstances.isEmpty());
+        Set<SchedulableEntityInstance> res = ProcessHelper.getOutputFeedInstances(process, processInstanceDate,
+            cluster);
+        Assert.assertTrue(res.isEmpty());
+    }
+
     @Test
     public void testGetInputFeedInstances() throws FalconException, ParseException {
         // create a process with input feeds
         Cluster cluster = publishCluster();
-        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC");
+        Feed feed = publishFeed(cluster, "minutes(5)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC");
 
         // find the input Feed instances time
-        Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2016-02-28 10:37 UTC");
         Inputs inputs = new Inputs();
-        Input input = getInput("inputFeed", feed.getName(), "today(0,-30)", "today(2,30)", false);
+        Input input = getInput("inputFeed", feed.getName(), "now(0,-20)", "now(0,0)", false);
         inputs.getInputs().add(input);
         process.setInputs(inputs);
         store.publish(EntityType.PROCESS, process);
 
-        Date processInstanceDate = getDate("2012-02-28 10:00 UTC");
+        Date processInstanceDate = getDate("2012-02-28 10:37 UTC");
         Set<SchedulableEntityInstance> inputFeedInstances = ProcessHelper.getInputFeedInstances(process,
                 processInstanceDate, cluster, false);
-        Assert.assertEquals(inputFeedInstances.size(), 3);
+        Assert.assertEquals(inputFeedInstances.size(), 5);
 
         Set<SchedulableEntityInstance> expectedInputFeedInstances = new HashSet<>();
-        SchedulableEntityInstance instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(),
-                getDate("2012-02-28 00:00 UTC"), EntityType.FEED);
-        instance.setTag(SchedulableEntityInstance.INPUT);
-        expectedInputFeedInstances.add(instance);
-        instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(), getDate("2012-02-28 01:00 UTC"),
-                EntityType.FEED);
-        instance.setTag(SchedulableEntityInstance.INPUT);
-        expectedInputFeedInstances.add(instance);
-        instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(), getDate("2012-02-28 02:00 UTC"),
-                EntityType.FEED);
-        instance.setTag(SchedulableEntityInstance.INPUT);
-        expectedInputFeedInstances.add(instance);
-
-        //Validate with expected result
+        String[] inputInstances = { "2012-02-28 10:15 UTC", "2012-02-28 10:20 UTC", "2012-02-28 10:25 UTC",
+            "2012-02-28 10:30 UTC", "2012-02-28 10:35 UTC", };
+        for (String d : inputInstances) {
+            SchedulableEntityInstance i = new SchedulableEntityInstance(feed.getName(), cluster.getName(),
+                    getDate(d), EntityType.FEED);
+            i.setTags(SchedulableEntityInstance.INPUT);
+            expectedInputFeedInstances.add(i);
+        }
         Assert.assertTrue(inputFeedInstances.equals(expectedInputFeedInstances));
     }
 
@@ -115,8 +173,8 @@ public class ProcessHelperTest extends AbstractTestBase {
 
         Set<SchedulableEntityInstance> expected = new HashSet<>();
         SchedulableEntityInstance ins = new SchedulableEntityInstance(feed.getName(), cluster.getName(),
-                getDate("2012-02-28 11:00 UTC"), EntityType.FEED);
-        ins.setTag(SchedulableEntityInstance.OUTPUT);
+                getDate("2012-02-27 11:00 UTC"), EntityType.FEED);
+        ins.setTags(SchedulableEntityInstance.OUTPUT);
         expected.add(ins);
 
         Assert.assertEquals(result, expected);

http://git-wip-us.apache.org/repos/asf/falcon/blob/bd443433/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index 50dce84..233e4a6 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -301,11 +301,11 @@ $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -params -
 
 For example:
 $FALCON_HOME/bin/falcon instance -dependency -type feed -name out -instanceTime 2014-12-15T00:00Z
-name: producer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:00Z, tag: Output
-name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:03Z, tag: Input
-name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:04Z, tag: Input
-name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:02Z, tag: Input
-name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:05Z, tag: Input
+name: producer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:00Z, tags: Output
+name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:03Z, tags: Input
+name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:04Z, tags: Input
+name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:02Z, tags: Input
+name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:05Z, tags: Input
 
 
 Response: default/Success!

http://git-wip-us.apache.org/repos/asf/falcon/blob/bd443433/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index 72f9fe4..d114d8d 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -227,6 +227,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
             }
 
         } catch (Throwable throwable) {
+            LOG.error("Failed to get instance dependencies:", throwable);
             throw FalconWebException.newInstanceException(throwable, Response.Status.BAD_REQUEST);
         }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/bd443433/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
index 1a8396c..757fda8 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
@@ -32,6 +32,8 @@ import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesSummaryResult;
 import org.apache.falcon.resource.channel.Channel;
 import org.apache.falcon.resource.channel.ChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.DefaultValue;
@@ -55,6 +57,8 @@ import java.util.Set;
  */
 @Path("instance")
 public class InstanceManagerProxy extends AbstractInstanceManager {
+    private static final Logger LOG = LoggerFactory.getLogger(InstanceManagerProxy.class);
+
     private final Map<String, Channel> processInstanceManagerChannels = new HashMap<String, Channel>();
 
     public InstanceManagerProxy() {
@@ -389,7 +393,8 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
                 try {
                     T resultHolder = doExecute(colo);
                     results.put(colo, resultHolder);
-                } catch (FalconException e) {
+                } catch (Throwable e) {
+                    LOG.error("Failed to fetch results for colo:{}", colo, e);
                     results.put(colo, getResultInstance(APIResult.Status.FAILED,
                             e.getClass().getName() + "::" + e.getMessage()));
                 }


Mime
View raw message