falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject [1/2] falcon git commit: FALCON-1516 Feed Retention support in Falcon Unit (Pavan Kolamuri)
Date Mon, 12 Oct 2015 06:18:47 GMT
Repository: falcon
Updated Branches:
  refs/heads/master ce59dc2ea -> 2132a983a


FALCON-1516 Feed Retention support in Falcon Unit (Pavan Kolamuri)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2f867bc2
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2f867bc2
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2f867bc2

Branch: refs/heads/master
Commit: 2f867bc2fdfa9f29f7959cafe3e9050adf9c0975
Parents: e3893a8
Author: Pallavi Rao <pallavi.rao@inmobi.com>
Authored: Mon Oct 12 11:44:26 2015 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Mon Oct 12 11:44:26 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../falcon/client/AbstractFalconClient.java     | 30 ++++++++++++
 .../org/apache/falcon/client/FalconClient.java  | 10 ++--
 .../java/org/apache/falcon/util/DateUtil.java   | 11 ++++-
 .../resource/AbstractInstanceManager.java       |  6 +--
 .../apache/falcon/unit/FalconUnitClient.java    | 17 +++++++
 .../falcon/unit/LocalInstanceManager.java       | 43 +++++++++++++++++
 .../unit/LocalSchedulableEntityManager.java     | 27 +++++++++++
 .../apache/falcon/unit/FalconUnitTestBase.java  | 51 +++++++++++++++++++-
 .../org/apache/falcon/unit/TestFalconUnit.java  | 31 ++++++++++++
 unit/src/test/resources/infeed.xml              |  2 +-
 11 files changed, 217 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8ac9fb8..35cc484 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,8 @@ Trunk (Unreleased)
     FALCON-1027 Falcon proxy user support(Sowmya Ramesh)
 
   IMPROVEMENTS
+    FALCON-1516 Feed Retention support in Falcon Unit(Pavan Kolamuri via Pallavi Rao) 
+
     FALCON-1231 Improve JobCompletionNotification Service(Pallavi Rao)
 
     FALCON-1157 Build error when using maven 3.3.x(Venkat Ramachandran via Pallavi Rao)

http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index 265e08c..2358289 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -17,10 +17,13 @@
  */
 package org.apache.falcon.client;
 
+import org.apache.falcon.LifeCycle;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.InstancesResult;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * Abstract Client API to submit and manage Falcon Entities (Cluster, Feed, Process) jobs
@@ -51,4 +54,31 @@ public abstract class AbstractFalconClient {
     public abstract APIResult schedule(EntityType entityType, String entityName, String colo,
Boolean skipDryRun,
                                         String doAsuser, String properties) throws FalconCLIException;
 
+
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+
+    /**
+     *
+     * @param type entity type
+     * @param entity entity name
+     * @param start start time
+     * @param end end time
+     * @param colo colo name
+     * @param lifeCycles lifecycle of an entity (for ex : feed has replication,eviction).
+     * @param filterBy filter operation can be applied to results
+     * @param orderBy
+     * @param sortOrder sort order can be asc or desc
+     * @param offset offset while displaying results
+     * @param numResults num of Results to output
+     * @param doAsUser
+     * @return
+     * @throws FalconCLIException
+     */
+    public abstract InstancesResult getStatusOfInstances(String type, String entity,
+                                                         String start, String end,
+                                                         String colo, List<LifeCycle>
lifeCycles, String filterBy,
+                                                         String orderBy, String sortOrder,
+                                                         Integer offset, Integer numResults,
+                                                         String doAsUser) throws FalconCLIException;
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 6c3a7a4..27510f6 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -576,12 +576,10 @@ public class FalconClient extends AbstractFalconClient {
     }
 
     public InstancesResult getLogsOfInstances(String type, String entity, String start,
-                                     String end, String colo, String runId,
-                                     List<LifeCycle> lifeCycles, String filterBy,
-                                     String orderBy, String sortOrder, Integer offset,
-                                     Integer numResults, String doAsUser)
-        throws FalconCLIException {
-
+                                              String end, String colo, String runId,
+                                              List<LifeCycle> lifeCycles, String filterBy,
+                                              String orderBy, String sortOrder, Integer offset,
+                                              Integer numResults, String doAsUser) throws
FalconCLIException {
         return sendInstanceRequest(Instances.LOG, type, entity, start, end,
                 null, runId, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults,
doAsUser)
                 .getEntity(InstancesResult.class);

http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/common/src/main/java/org/apache/falcon/util/DateUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/DateUtil.java b/common/src/main/java/org/apache/falcon/util/DateUtil.java
index f89ef64..82163cc 100644
--- a/common/src/main/java/org/apache/falcon/util/DateUtil.java
+++ b/common/src/main/java/org/apache/falcon/util/DateUtil.java
@@ -15,9 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.falcon.util;
 
+import org.apache.falcon.entity.v0.SchemaHelper;
+
 import java.util.Calendar;
 import java.util.Date;
 import java.util.TimeZone;
@@ -30,14 +31,20 @@ public final class DateUtil {
     //Friday, April 16, 9999 7:12:55 AM UTC corresponding date
     public static final Date NEVER = new Date(Long.parseLong("253379862775000"));
 
+    public static final long HOUR_IN_MILLIS = 60 * 60 * 1000;
+
     private DateUtil() {}
 
     public static Date getNextMinute(Date time) throws Exception {
         Calendar insCal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
         insCal.setTime(time);
-
         insCal.add(Calendar.MINUTE, 1);
         return insCal.getTime();
+
     }
 
+    public static String getDateFormatFromTime(long milliSeconds) {
+        return SchemaHelper.getDateFormat().format((new Date(milliSeconds)));
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index c1b4da6..606f741 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -842,8 +842,8 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
         return getStartAndEndDate(entityObject, startStr, endStr, getDefaultResultsPerPage());
     }
 
-    private Pair<Date, Date> getStartAndEndDate(Entity entityObject, String startStr,
String endStr, Integer numResults)
-        throws FalconException {
+    protected Pair<Date, Date> getStartAndEndDate(Entity entityObject, String startStr,
String endStr,
+                                                  Integer numResults) throws FalconException
{
         Pair<Date, Date> clusterStartEndDates = EntityUtil.getEntityStartEndDates(entityObject);
         Frequency frequency = EntityUtil.getFrequency(entityObject);
         Date endDate = getEndDate(endStr, clusterStartEndDates.second);
@@ -909,7 +909,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
         return start;
     }
 
-    private void validateParams(String type, String entity) throws FalconException {
+    protected void validateParams(String type, String entity) throws FalconException {
         validateNotEmpty("entityType", type);
         validateNotEmpty("entityName", entity);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 169614b..3ce261e 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -57,10 +57,15 @@ public class FalconUnitClient extends AbstractFalconClient {
 
     protected ConfigurationStore configStore;
     private AbstractWorkflowEngine workflowEngine;
+    private LocalSchedulableEntityManager localSchedulableEntityManager;
+    private LocalInstanceManager localInstanceManager;
+
 
     public FalconUnitClient() throws FalconException {
         configStore = ConfigurationStore.get();
         workflowEngine = WorkflowEngineFactory.getWorkflowEngine();
+        localSchedulableEntityManager = new LocalSchedulableEntityManager();
+        localInstanceManager = new LocalInstanceManager();
     }
 
     public ConfigurationStore getConfigStore() {
@@ -123,6 +128,18 @@ public class FalconUnitClient extends AbstractFalconClient {
         return schedule(entityType, entityName, null, 0, cluster, skipDryRun, properties);
     }
 
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+    @Override
+    public InstancesResult getStatusOfInstances(String type, String entity, String start,
String end,
+                                                String colo, List<LifeCycle> lifeCycles,
String filterBy,
+                                                String orderBy, String sortOrder, Integer
offset,
+                                                Integer numResults, String doAsUser) throws
FalconCLIException {
+        return localInstanceManager.getStatusOfInstances(type, entity, start, end, colo,
lifeCycles, filterBy, orderBy,
+                sortOrder, offset, numResults);
+
+    }
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
 
     /**
      * Schedules an submitted process entity immediately.

http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java
new file mode 100644
index 0000000..1503b28
--- /dev/null
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.unit;
+
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.resource.AbstractInstanceManager;
+import org.apache.falcon.resource.InstancesResult;
+
+import java.util.List;
+
+/**
+ * A proxy implementation of the entity instance operations.
+ */
+public class LocalInstanceManager extends AbstractInstanceManager {
+
+    public LocalInstanceManager() {}
+
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+    public InstancesResult getStatusOfInstances(String type, String entity, String start,
String end,
+                                                String colo, List<LifeCycle> lifeCycles,
String filterBy,
+                                                String orderBy, String sortOrder, Integer
offset,
+                                                Integer numResults) {
+        return super.getStatus(type, entity, start, end, colo, lifeCycles, filterBy, orderBy,
sortOrder,
+                offset, numResults);
+    }
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
new file mode 100644
index 0000000..d793cf2
--- /dev/null
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.unit;
+
+import org.apache.falcon.resource.AbstractSchedulableEntityManager;
+
+/**
+ * A proxy implementation of the schedulable entity operations in local mode.
+ */
+public class LocalSchedulableEntityManager extends AbstractSchedulableEntityManager {
+    // Created for future purposes to add all entity API's here for falcon unit.
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
index 995af2b..45b88f0 100644
--- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
+++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
@@ -20,12 +20,15 @@ package org.apache.falcon.unit;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
 import org.apache.falcon.client.FalconCLIException;
+import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.Process;
@@ -34,6 +37,7 @@ import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.hadoop.JailedFileSystem;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.util.DateUtil;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,7 +55,10 @@ import java.io.BufferedReader;
 import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.TimeZone;
@@ -74,6 +81,14 @@ public class FalconUnitTestBase {
         boolean evaluate() throws Exception;
     }
 
+    public static final ThreadLocal<SimpleDateFormat> FORMATTER = new ThreadLocal<SimpleDateFormat>()
{
+        @Override
+        protected SimpleDateFormat initialValue() {
+            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'");
+            return format;
+        }
+    };
+
     private static final Logger LOG = LoggerFactory.getLogger(FalconUnitTestBase.class);
 
     private static final String DEFAULT_CLUSTER = "local";
@@ -85,6 +100,7 @@ public class FalconUnitTestBase {
     private static final String WORKING_PATH = "/projects/falcon/working";
 
     public static final Pattern VAR_PATTERN = Pattern.compile("##[A-Za-z0-9_.]*##");
+    protected static final int WAIT_TIME = 90000;
     protected static FalconUnitClient falconUnitClient;
     protected static JailedFileSystem fs;
     protected static ConfigurationStore configStore;
@@ -170,6 +186,15 @@ public class FalconUnitTestBase {
                 skipDryRun, properties);
     }
 
+    public APIResult schedule(EntityType entityType, String entityName, String cluster) throws
FalconException,
+            FalconCLIException {
+        Entity entity = configStore.get(entityType, entityName);
+        if (entity == null) {
+            throw new FalconException("Process not found " + entityName);
+        }
+        return falconUnitClient.schedule(entityType, entityName, cluster, false, null, null);
+    }
+
     private Map<String, String> updateColoAndCluster(String colo, String cluster, Map<String,
String> props) {
         if (props == null) {
             props = new HashMap<>();
@@ -304,7 +329,7 @@ public class FalconUnitTestBase {
     }
 
     protected long waitForStatus(final EntityType entityType, final String entityName, final
String instanceTime) {
-        return waitFor(90000, new Predicate() {
+        return waitFor(WAIT_TIME, new Predicate() {
             public boolean evaluate() throws Exception {
                 InstancesResult.WorkflowStatus status = falconUnitClient.getInstanceStatus(entityType,
                         entityName, instanceTime);
@@ -317,4 +342,28 @@ public class FalconUnitTestBase {
         Assert.assertEquals(APIResult.Status.SUCCEEDED, apiResult.getStatus());
     }
 
+    public InstancesResult.WorkflowStatus getRetentionStatus(String feedName, String cluster)
throws FalconException,
+            FalconCLIException {
+        Feed feedEntity = EntityUtil.getEntity(EntityType.FEED, feedName);
+        Frequency.TimeUnit timeUnit = feedEntity.getFrequency().getTimeUnit();
+        long endTimeInMillis = System.currentTimeMillis() + 30000;
+        String endTime = DateUtil.getDateFormatFromTime(endTimeInMillis);
+        long startTimeInMillis;
+        if (timeUnit == Frequency.TimeUnit.hours || timeUnit == Frequency.TimeUnit.minutes)
{
+            startTimeInMillis = endTimeInMillis - (6 * DateUtil.HOUR_IN_MILLIS);
+        } else {
+            startTimeInMillis = endTimeInMillis - (24 * DateUtil.HOUR_IN_MILLIS);
+        }
+        String startTime = DateUtil.getDateFormatFromTime(startTimeInMillis);
+        List<LifeCycle> lifecycles = new ArrayList<>();
+        lifecycles.add(LifeCycle.EVICTION);
+        InstancesResult result = falconUnitClient.getStatusOfInstances("feed",
+                feedName, startTime, endTime, cluster,
+                lifecycles, null, "status", "asc", 0, 1, null);
+        if (result.getInstances() != null && result.getInstances().length > 0)
{
+            return result.getInstances()[0].getStatus();
+        }
+        return null;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index fa9c664..d2e574b 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -17,6 +17,8 @@
  */
 package org.apache.falcon.unit;
 
+import org.apache.falcon.FalconException;
+import org.apache.falcon.client.FalconCLIException;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.InstancesResult;
@@ -25,6 +27,9 @@ import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.io.IOException;
+import java.text.ParseException;
+
 /**
  * Test cases of falcon jobs using Local Oozie and LocalJobRunner.
  */
@@ -56,4 +61,30 @@ public class TestFalconUnit extends FalconUnitTestBase {
         FileStatus[] files = getFileSystem().listStatus(new Path(outPath));
         Assert.assertTrue(files.length > 0);
     }
+
+
+    @Test
+    public void testRetention() throws IOException, FalconCLIException, FalconException,
+            ParseException, InterruptedException {
+        // submit with default props
+        submitCluster();
+        // submitting feeds
+        APIResult result = submit(EntityType.FEED, getAbsolutePath("/infeed.xml"));
+        assertStatus(result);
+        String scheduleTime = "2015-06-20T00:00Z";
+        createData("in", "local", scheduleTime, "input.txt");
+        String inPath = getFeedPathForTS("local", "in", scheduleTime);
+        Assert.assertTrue(fs.exists(new Path(inPath)));
+        result = schedule(EntityType.FEED, "in", "local");
+        Assert.assertEquals(APIResult.Status.SUCCEEDED, result.getStatus());
+        waitFor(WAIT_TIME, new Predicate() {
+            public boolean evaluate() throws Exception {
+                InstancesResult.WorkflowStatus status = getRetentionStatus("in", "local");
+                return InstancesResult.WorkflowStatus.SUCCEEDED.equals(status);
+            }
+        });
+        InstancesResult.WorkflowStatus status = getRetentionStatus("in", "local");
+        Assert.assertEquals(InstancesResult.WorkflowStatus.SUCCEEDED, status);
+        Assert.assertFalse(fs.exists(new Path(inPath)));
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/unit/src/test/resources/infeed.xml
----------------------------------------------------------------------
diff --git a/unit/src/test/resources/infeed.xml b/unit/src/test/resources/infeed.xml
index 509d868..62f1ba7 100644
--- a/unit/src/test/resources/infeed.xml
+++ b/unit/src/test/resources/infeed.xml
@@ -26,7 +26,7 @@
     <clusters>
         <cluster name="local">
             <validity start="2013-01-01T00:00Z" end="2030-01-01T00:00Z"/>
-            <retention limit="hours(400000)" action="delete"/>
+            <retention limit="hours(200)" action="delete"/>
         </cluster>
     </clusters>
 


Mime
View raw message