falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject falcon git commit: FALCON-1520 Delete, update, Validate entity operations support in Falcon Unit (by Pavan Kolamuri)
Date Mon, 26 Oct 2015 11:14:57 GMT
Repository: falcon
Updated Branches:
  refs/heads/master c30fce751 -> 7854e3d90


FALCON-1520 Delete, update, Validate entity operations support in Falcon Unit (by Pavan Kolamuri)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7854e3d9
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7854e3d9
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7854e3d9

Branch: refs/heads/master
Commit: 7854e3d90e7a5258b85f45afd00694e3f5157142
Parents: c30fce7
Author: Pallavi Rao <pallavi.rao@inmobi.com>
Authored: Mon Oct 26 16:44:30 2015 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Mon Oct 26 16:44:30 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../falcon/client/AbstractFalconClient.java     |  61 +++++-
 .../falcon/client/FalconCLIException.java       |   4 +
 .../falcon/entity/store/ConfigurationStore.java |   2 +
 .../oozie/client/LocalProxyOozieClient.java     |   4 +-
 .../falcon/resource/AbstractEntityManager.java  |  47 +++--
 .../AbstractSchedulableEntityManager.java       |   4 +-
 .../proxy/SchedulableEntityManagerProxy.java    |   4 +-
 .../apache/falcon/unit/FalconUnitClient.java    |  89 +++++----
 .../unit/LocalSchedulableEntityManager.java     |  31 +++-
 .../apache/falcon/unit/FalconUnitTestBase.java  |  11 +-
 .../org/apache/falcon/unit/TestFalconUnit.java  | 185 +++++++++++++++----
 unit/src/test/resources/process1.xml            |  50 -----
 13 files changed, 341 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 95dd69d..c00c265 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ Trunk (Unreleased)
     FALCON-1213 Base framework of the native scheduler(Pallavi Rao)
 
   IMPROVEMENTS
+    FALCON-1520 Delete, update, Validate entity operations support in Falcon Unit (Pavan Kolamuri via Pallavi Rao)
 
   OPTIMIZATIONS
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index b889931..91d5324 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -18,6 +18,7 @@
 package org.apache.falcon.client;
 
 import org.apache.falcon.LifeCycle;
+import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.InstancesResult;
@@ -31,12 +32,14 @@ import java.util.List;
  */
 public abstract class AbstractFalconClient {
 
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+
     /**
      * Submit a new entity. Entities can be of type feed, process or data end
      * points. Entity definitions are validated structurally against schema and
      * subsequently for other rules before they are admitted into the system.
-     * @param entityType
-     * @param filePath
+     * @param entityType Entity type. Valid options are cluster, feed or process.
+     * @param filePath Path for the entity definition
      * @return
      * @throws FalconCLIException
      */
@@ -45,17 +48,63 @@ public abstract class AbstractFalconClient {
 
     /**
      * Schedules an submitted process entity immediately.
-     * @param entityType
-     * @param entityName
-     * @param colo
+     * @param entityType Entity type. Valid options are cluster, feed or process.
+     * @param entityName Name of the entity.
+     * @param colo Cluster name.
      * @return
      * @throws FalconCLIException
      */
     public abstract APIResult schedule(EntityType entityType, String entityName, String colo, Boolean skipDryRun,
                                         String doAsuser, String properties) throws FalconCLIException;
 
+    /**
+     * Delete the specified entity.
+     * @param entityType Entity type. Valid options are cluster, feed or process.
+     * @param entityName Name of the entity.
+     * @param doAsUser Proxy User.
+     * @return
+     * @throws FalconCLIException
+     */
+    public abstract APIResult delete(EntityType entityType, String entityName,
+                                     String doAsUser) throws FalconCLIException;
+
+    /**
+     * Validates the submitted entity.
+     * @param entityType Entity type. Valid options are cluster, feed or process.
+     * @param filePath Path for the entity definition to validate.
+     * @param skipDryRun Dry run.
+     * @param doAsUser Proxy User.
+     * @return
+     * @throws FalconCLIException
+     */
+    public abstract APIResult validate(String entityType, String filePath, Boolean skipDryRun,
+                                       String doAsUser) throws FalconCLIException;
+
+    /**
+     * Updates the submitted entity.
+     * @param entityType Entity type. Valid options are cluster, feed or process.
+     * @param entityName Name of the entity.
+     * @param filePath Path for the entity definition to update.
+     * @param skipDryRun Dry run.
+     * @param doAsUser Proxy User.
+     * @return
+     * @throws FalconCLIException
+     */
+    public abstract APIResult update(String entityType, String entityName, String filePath,
+                                                       Boolean skipDryRun, String doAsUser) throws FalconCLIException;
+
+    /**
+     * Get definition of the entity.
+     * @param entityType Entity type. Valid options are cluster, feed or process.
+     * @param entityName Name of the entity.
+     * @param doAsUser Proxy user.
+     * @return
+     * @throws FalconCLIException
+     */
+    public abstract Entity getDefinition(String entityType, String entityName,
+                                         String doAsUser) throws FalconCLIException;
+
 
-    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
 
     /**
      *

http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/client/src/main/java/org/apache/falcon/client/FalconCLIException.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconCLIException.java b/client/src/main/java/org/apache/falcon/client/FalconCLIException.java
index ec74c27..51ef952 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconCLIException.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconCLIException.java
@@ -36,6 +36,10 @@ public class FalconCLIException extends Exception {
         super(msg);
     }
 
+    public FalconCLIException(Throwable e) {
+        super(e);
+    }
+
     public FalconCLIException(String msg, Throwable throwable) {
         super(msg, throwable);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index e27187b..4dd1c68 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -60,6 +60,8 @@ public final class ConfigurationStore implements FalconService {
 
     private static final EntityType[] ENTITY_LOAD_ORDER = new EntityType[] {
         EntityType.CLUSTER, EntityType.FEED, EntityType.PROCESS, };
+    public static final EntityType[] ENTITY_DELETE_ORDER = new EntityType[] { EntityType.PROCESS, EntityType.FEED,
+        EntityType.CLUSTER, };
 
     private static final Logger LOG = LoggerFactory.getLogger(ConfigurationStore.class);
     private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT");

http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
index 756828f..c2100d1 100644
--- a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
+++ b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
@@ -166,7 +166,7 @@ public class LocalProxyOozieClient extends OozieClient {
 
     @Override
     public void reRun(String jobId, Properties conf) throws OozieClientException {
-        throw new IllegalStateException("Rerun not supported ");
+        getClient(jobId).reRun(jobId, conf);
     }
 
     @Override
@@ -181,7 +181,7 @@ public class LocalProxyOozieClient extends OozieClient {
 
     @Override
     public void kill(String jobId) throws OozieClientException {
-        throw new IllegalStateException("Kill not supported");
+        getClient(jobId).kill(jobId);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 3323dd1..16ef83a 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -75,7 +75,7 @@ import java.util.Set;
 public abstract class AbstractEntityManager {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractEntityManager.class);
     private static MemoryLocks memoryLocks = MemoryLocks.getInstance();
-    private static final String DO_AS_PARAM = "doAs";
+    protected static final String DO_AS_PARAM = "doAs";
 
     protected static final int XML_DEBUG_LEN = 10 * 1024;
     private AbstractWorkflowEngine workflowEngine;
@@ -195,7 +195,8 @@ public abstract class AbstractEntityManager {
 
         checkColo(colo);
         try {
-            Entity entity = submitInternal(request, type);
+            String doAsUser = request.getParameter(DO_AS_PARAM);
+            Entity entity = submitInternal(request.getInputStream(), type, doAsUser);
             return new APIResult(APIResult.Status.SUCCEEDED, "Submit successful (" + type + ") " + entity.getName());
         } catch (Throwable e) {
             LOG.error("Unable to persist entity object", e);
@@ -205,15 +206,24 @@ public abstract class AbstractEntityManager {
 
     /**
      * Post an entity XML with entity type. Validates the XML which can be
-     * Process, Feed or Dataendpoint
+     * Process, Feed or Data endpoint
      *
      * @param type entity type
-     * @return APIResule -Succeeded or Failed
+     * @return APIResult -Succeeded or Failed
      */
     public APIResult validate(HttpServletRequest request, String type, Boolean skipDryRun) {
         try {
+            return validate(request.getInputStream(), type, skipDryRun);
+        } catch (IOException e) {
+            LOG.error("Unable to get InputStream from Request", request, e);
+            throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
+        }
+    }
+
+    protected APIResult validate(InputStream inputStream, String type, Boolean skipDryRun) {
+        try {
             EntityType entityType = EntityType.getEnum(type);
-            Entity entity = deserializeEntity(request, entityType);
+            Entity entity = deserializeEntity(inputStream, entityType);
             validate(entity);
 
             //Validate that the entity can be scheduled in the cluster
@@ -244,6 +254,11 @@ public abstract class AbstractEntityManager {
      * @return APIResult
      */
     public APIResult delete(HttpServletRequest request, String type, String entity, String colo) {
+        return delete(type, entity, colo);
+
+    }
+
+    protected APIResult delete(String type, String entity, String colo) {
         checkColo(colo);
         List<Entity> tokenList = new ArrayList<>();
         try {
@@ -277,12 +292,23 @@ public abstract class AbstractEntityManager {
 
     public APIResult update(HttpServletRequest request, String type, String entityName,
                             String colo, Boolean skipDryRun) {
+        try {
+            return update(request.getInputStream(), type, entityName, colo, skipDryRun);
+        } catch (IOException e) {
+            LOG.error("Unable to get InputStream from Request", request, e);
+            throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
+        }
+
+    }
+
+    protected APIResult update(InputStream inputStream, String type, String entityName,
+                            String colo, Boolean skipDryRun) {
         checkColo(colo);
         List<Entity> tokenList = new ArrayList<>();
         try {
             EntityType entityType = EntityType.getEnum(type);
             Entity oldEntity = EntityUtil.getEntity(type, entityName);
-            Entity newEntity = deserializeEntity(request, entityType);
+            Entity newEntity = deserializeEntity(inputStream, entityType);
             // KLUDGE - Until ACL is mandated entity passed should be decorated for equals check to pass
             decorateEntityWithACL(newEntity);
             validate(newEntity);
@@ -309,7 +335,6 @@ public abstract class AbstractEntityManager {
             }
 
             configStore.update(entityType, newEntity);
-
             return new APIResult(APIResult.Status.SUCCEEDED, result.toString());
         } catch (Throwable e) {
             LOG.error("Update failed", e);
@@ -399,11 +424,11 @@ public abstract class AbstractEntityManager {
         }
     }
 
-    protected Entity submitInternal(HttpServletRequest request, String type)
+    protected Entity submitInternal(InputStream inputStream, String type, String doAsUser)
         throws IOException, FalconException {
 
         EntityType entityType = EntityType.getEnum(type);
-        Entity entity = deserializeEntity(request, entityType);
+        Entity entity = deserializeEntity(inputStream, entityType);
         List<Entity> tokenList = new ArrayList<>();
         // KLUDGE - Until ACL is mandated entity passed should be decorated for equals check to pass
         decorateEntityWithACL(entity);
@@ -425,7 +450,6 @@ public abstract class AbstractEntityManager {
                             + "Can't be submitted again. Try removing before submitting.");
         }
 
-        String doAsUser = request.getParameter(DO_AS_PARAM);
         SecurityUtil.tryProxy(entity, doAsUser); // proxy before validating since FS/Oozie needs to be proxied
         validate(entity);
         configStore.publish(entityType, entity);
@@ -477,11 +501,10 @@ public abstract class AbstractEntityManager {
         }
     }
 
-    protected Entity deserializeEntity(HttpServletRequest request, EntityType entityType)
+    protected Entity deserializeEntity(InputStream xmlStream, EntityType entityType)
         throws IOException, FalconException {
 
         EntityParser<?> entityParser = EntityParserFactory.getParser(entityType);
-        InputStream xmlStream = request.getInputStream();
         if (xmlStream.markSupported()) {
             xmlStream.mark(XML_DEBUG_LEN); // mark up to debug len
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index 0db55df..d317aa1 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -82,7 +82,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
         }
     }
 
-    private synchronized void scheduleInternal(String type, String entity, Boolean skipDryRun,
+    protected synchronized void scheduleInternal(String type, String entity, Boolean skipDryRun,
             Map<String, String> properties) throws FalconException, AuthorizationException {
 
         checkSchedulableEntity(type);
@@ -187,7 +187,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
         checkColo(colo);
         try {
             checkSchedulableEntity(type);
-            Entity entity = submitInternal(request, type);
+            Entity entity = submitInternal(request.getInputStream(), type, request.getParameter(DO_AS_PARAM));
             scheduleInternal(type, entity.getName(), skipDryRun, EntityUtil.getPropertyMap(properties));
             return new APIResult(APIResult.Status.SUCCEEDED,
                     entity.getName() + "(" + type + ") scheduled successfully");

http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 9d13d74..d3ba189 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -199,7 +199,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
     private Entity getEntity(HttpServletRequest request, String type) {
         try {
             request.getInputStream().reset();
-            Entity entity = deserializeEntity(request, EntityType.getEnum(type));
+            Entity entity = deserializeEntity(request.getInputStream(), EntityType.getEnum(type));
             request.getInputStream().reset();
             return entity;
         } catch (Exception e) {
@@ -225,7 +225,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
         EntityType entityType = EntityType.getEnum(type);
         final Entity entity;
         try {
-            entity = deserializeEntity(bufferedRequest, entityType);
+            entity = deserializeEntity(bufferedRequest.getInputStream(), entityType);
             bufferedRequest.getInputStream().reset();
         } catch (Exception e) {
             throw FalconWebException.newException("Unable to parse the request", Response.Status.BAD_REQUEST);

http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 783af19..b5afae3 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -23,8 +23,6 @@ import org.apache.falcon.LifeCycle;
 import org.apache.falcon.client.AbstractFalconClient;
 import org.apache.falcon.client.FalconCLIException;
 import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.parser.EntityParser;
-import org.apache.falcon.entity.parser.EntityParserFactory;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
@@ -42,7 +40,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
@@ -55,6 +52,9 @@ public class FalconUnitClient extends AbstractFalconClient {
 
     private static final Logger LOG = LoggerFactory.getLogger(FalconUnitClient.class);
 
+    private static final String DEFAULT_ORDERBY = "status";
+    private static final String DEFAULT_SORTED_ORDER = "asc";
+
     protected ConfigurationStore configStore;
     private AbstractWorkflowEngine workflowEngine;
     private LocalSchedulableEntityManager localSchedulableEntityManager;
@@ -84,29 +84,9 @@ public class FalconUnitClient extends AbstractFalconClient {
      */
     @Override
     public APIResult submit(String type, String filePath, String doAsUser) throws IOException,
FalconCLIException {
-        try {
-            EntityType entityType = EntityType.getEnum(type);
-            InputStream entityStream = FalconUnitHelper.getFileInputStream(filePath);
-            EntityParser entityParser = EntityParserFactory.getParser(entityType);
-            Entity entity = entityParser.parse(entityStream);
-
-            Entity existingEntity = configStore.get(entityType, entity.getName());
-            if (existingEntity != null) {
-                if (EntityUtil.equals(existingEntity, entity)) {
-                    LOG.warn(entity.toShortString() + " already registered with same definition
" + entity.getName());
-                    return new APIResult(APIResult.Status.SUCCEEDED, "{} already registered
with same definition"
-                            + entity.getName());
-                }
-                LOG.warn(entity.toShortString() + " already registered with different definition
"
-                        + "Can't be submitted again. Try removing before submitting.");
-                return new APIResult(APIResult.Status.FAILED, "{} already registered with
different definition "
-                        + "Can't be submitted again. Try removing before submitting." + entity.getName());
-            }
 
-            entityParser.validate(entity);
-            configStore.publish(entityType, entity);
-            LOG.info("Submit successful: ({}): {}", entityType.name(), entity.getName());
-            return new APIResult(APIResult.Status.SUCCEEDED, "Submit successful (" + type
+ ") " + entity.getName());
+        try {
+            return localSchedulableEntityManager.submit(type, filePath, doAsUser);
         } catch (FalconException e) {
             throw new FalconCLIException("FAILED", e);
         }
@@ -128,12 +108,56 @@ public class FalconUnitClient extends AbstractFalconClient {
         return schedule(entityType, entityName, null, 0, cluster, skipDryRun, properties);
     }
 
+    @Override
+    public APIResult delete(EntityType entityType, String entityName, String doAsUser) {
+        return localSchedulableEntityManager.delete(entityType, entityName, doAsUser);
+    }
+
+    @Override
+    public APIResult validate(String entityType, String filePath, Boolean skipDryRun,
+                              String doAsUser) throws FalconCLIException {
+        try {
+            return localSchedulableEntityManager.validate(entityType, filePath, skipDryRun, doAsUser);
+        } catch (FalconException e) {
+            throw new FalconCLIException(e);
+        }
+    }
+
+    @Override
+    public APIResult update(String entityType, String entityName, String filePath,
+                            Boolean skipDryRun, String doAsUser) throws FalconCLIException {
+        try {
+            return localSchedulableEntityManager.update(entityType, entityName, filePath,
+                    skipDryRun, "local", doAsUser);
+        } catch (FalconException e) {
+            throw new FalconCLIException(e);
+        }
+    }
+
+    @Override
+    public Entity getDefinition(String entityType, String entityName, String doAsUser) throws FalconCLIException {
+        String entity = localSchedulableEntityManager.getEntityDefinition(entityType, entityName);
+        return Entity.fromString(EntityType.getEnum(entityType), entity);
+    }
+
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     @Override
     public InstancesResult getStatusOfInstances(String type, String entity, String start,
String end,
                                                 String colo, List<LifeCycle> lifeCycles,
String filterBy,
                                                 String orderBy, String sortOrder, Integer
offset,
                                                 Integer numResults, String doAsUser) throws
FalconCLIException {
+        if (orderBy == null) {
+            orderBy = DEFAULT_ORDERBY;
+        }
+        if (sortOrder == null) {
+            sortOrder = DEFAULT_SORTED_ORDER;
+        }
+        if (offset == null) {
+            offset = 0;
+        }
+        if (numResults == null) {
+            numResults = 1;
+        }
         return localInstanceManager.getStatusOfInstances(type, entity, start, end, colo,
lifeCycles, filterBy, orderBy,
                 sortOrder, offset, numResults);
 
@@ -164,7 +188,7 @@ public class FalconUnitClient extends AbstractFalconClient {
             if (StringUtils.isNotEmpty(startTime) && entityType == EntityType.PROCESS)
{
                 updateStartAndEndTime((Process) entity, startTime, numInstances, cluster);
             }
-            workflowEngine.schedule(entity, skipDryRun,  EntityUtil.getPropertyMap(properties));
+            workflowEngine.schedule(entity, skipDryRun, EntityUtil.getPropertyMap(properties));
             LOG.info(entityName + " is scheduled successfully");
             return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + "PROCESS" + ")
scheduled successfully");
         } catch (FalconException e) {
@@ -180,16 +204,13 @@ public class FalconUnitClient extends AbstractFalconClient {
      * @param nominalTime nominal time of process
      * @return InstancesResult.WorkflowStatus
      */
-    public InstancesResult.WorkflowStatus getInstanceStatus(EntityType entityType, String
entityName,
+    public InstancesResult.WorkflowStatus getInstanceStatus(String entityType, String entityName,
                                                             String nominalTime) throws Exception
{
-        if (entityType == EntityType.CLUSTER) {
-            throw new IllegalArgumentException("Instance management functions don't apply
to Cluster entities");
-        }
-        Entity entityObject = EntityUtil.getEntity(entityType, entityName);
         Date startTime = SchemaHelper.parseDateUTC(nominalTime);
-        Date endTime = DateUtil.getNextMinute(startTime);
-        List<LifeCycle> lifeCycles = FalconUnitHelper.checkAndUpdateLifeCycle(null,
entityType.name());
-        InstancesResult instancesResult = workflowEngine.getStatus(entityObject, startTime,
endTime, lifeCycles);
+        Date endTimeDate = DateUtil.getNextMinute(startTime);
+        String endTime = DateUtil.getDateFormatFromTime(endTimeDate.getTime());
+        InstancesResult instancesResult = getStatusOfInstances(entityType, entityName, nominalTime,
endTime, null,
+                null, null, null, null, null, null, null);
         if (instancesResult.getInstances() != null && instancesResult.getInstances().length
> 0
                 && instancesResult.getInstances()[0] != null) {
             LOG.info("Instance status is " + instancesResult.getInstances()[0].getStatus());

http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
index 8b1c435..42adc9a 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
@@ -17,14 +17,19 @@
  */
 package org.apache.falcon.unit;
 
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.AbstractSchedulableEntityManager;
 
+import java.io.IOException;
+import java.io.InputStream;
+
 /**
  * A proxy implementation of the schedulable entity operations in local mode.
  */
 public class LocalSchedulableEntityManager extends AbstractSchedulableEntityManager {
-    // Created for future purposes to add all entity API's here for falcon unit.
 
     public LocalSchedulableEntityManager() {}
 
@@ -40,4 +45,28 @@ public class LocalSchedulableEntityManager extends AbstractSchedulableEntityMana
         return super.getStatus(type, entity, colo);
     }
 
+    public APIResult delete(EntityType entityType, String entityName, String doAsUser) {
+        if (entityType == null) {
+            throw new IllegalStateException("Entity-Type cannot be null");
+        }
+        return super.delete(entityType.name(), entityName, doAsUser);
+    }
+
+    public APIResult validate(String entityType, String filePath, Boolean skipDryRun,
+                              String doAsUser) throws FalconException {
+        InputStream inputStream = FalconUnitHelper.getFileInputStream(filePath);
+        return super.validate(inputStream, entityType, skipDryRun);
+    }
+
+    public APIResult update(String entityType, String entityName, String filePath,
+                            Boolean skipDryRun, String doAsUser, String colo) throws FalconException {
+        InputStream inputStream = FalconUnitHelper.getFileInputStream(filePath);
+        return super.update(inputStream, entityType, entityName, colo, skipDryRun);
+    }
+
+    public APIResult submit(String entityType, String filePath, String doAsUser) throws FalconException, IOException {
+        InputStream inputStream = FalconUnitHelper.getFileInputStream(filePath);
+        Entity entity = super.submitInternal(inputStream, entityType, doAsUser);
+        return new APIResult(APIResult.Status.SUCCEEDED, "Submit successful (" + entityType + ") " + entity.getName());
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
index d12efbc..ac478f4 100644
--- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
+++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
@@ -77,7 +77,6 @@ public class FalconUnitTestBase {
      * @throws Exception thrown if the predicate evaluation could not evaluate.
      */
     public interface Predicate {
-
         boolean evaluate() throws Exception;
     }
 
@@ -122,9 +121,9 @@ public class FalconUnitTestBase {
 
     @AfterMethod
     public void cleanUpActionXml() throws IOException, FalconException {
-        for (EntityType type : EntityType.values()) {
+        for (EntityType type : ConfigurationStore.ENTITY_DELETE_ORDER) {
             for (String name : ConfigurationStore.get().getEntities(type)) {
-                ConfigurationStore.get().remove(type, name);
+                getClient().delete(type, name, null);
             }
         }
         //Needed since oozie writes action xml to current directory.
@@ -275,7 +274,7 @@ public class FalconUnitTestBase {
                            String inputFile) throws FalconException, ParseException, IOException {
         String feedPath = getFeedPathForTS(cluster, feedName, time);
         fs.mkdirs(new Path(feedPath));
-        fs.copyFromLocalFile(new Path(getAbsolutePath("/" + inputFile)), new Path(feedPath));
+        fs.copyFromLocalFile(new Path(getAbsolutePath(inputFile)), new Path(feedPath));
     }
 
     protected String getFeedPathForTS(String cluster, String feedName,
@@ -295,7 +294,7 @@ public class FalconUnitTestBase {
 
 
     public String getAbsolutePath(String fileName) {
-        return this.getClass().getResource(fileName).getPath();
+        return this.getClass().getResource("/" + fileName).getPath();
     }
 
     public void createDir(String path) throws IOException {
@@ -333,7 +332,7 @@ public class FalconUnitTestBase {
         }
     }
 
-    protected long waitForStatus(final EntityType entityType, final String entityName, final String instanceTime) {
+    protected long waitForStatus(final String entityType, final String entityName, final String instanceTime) {
         return waitFor(WAIT_TIME, new Predicate() {
             public boolean evaluate() throws Exception {
                 InstancesResult.WorkflowStatus status = falconUnitClient.getInstanceStatus(entityType,

http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index d504bd2..8cdbd88 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -18,8 +18,11 @@
 package org.apache.falcon.unit;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.FalconWebException;
 import org.apache.falcon.client.FalconCLIException;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Property;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.hadoop.fs.FileStatus;
@@ -27,36 +30,47 @@ import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.text.ParseException;
 
+import static org.apache.falcon.entity.EntityUtil.getEntity;
+
+
 /**
  * Test cases of falcon jobs using Local Oozie and LocalJobRunner.
  */
 public class TestFalconUnit extends FalconUnitTestBase {
 
+    private static final String INPUT_FEED = "infeed.xml";
+    private static final String OUTPUT_FEED = "outfeed.xml";
+    private static final String PROCESS = "process.xml";
+    private static final String PROCESS_APP_PATH = "/app/oozie-mr";
+    private static final String CLUSTER_NAME = "local";
+    private static final String INPUT_FEED_NAME = "in";
+    private static final String PROCESS_NAME = "process";
+    private static final String OUTPUT_FEED_NAME = "out";
+    private static final String INPUT_FILE_NAME = "input.txt";
+    private static final String SCHEDULE_TIME = "2015-06-20T00:00Z";
+    private static final String WORKFLOW = "workflow.xml";
+
     @Test
     public void testProcessInstanceExecution() throws Exception {
-        // submit with default props
-        submitCluster();
-        // submitting feeds
-        APIResult result = submit(EntityType.FEED, getAbsolutePath("/infeed.xml"));
-        assertStatus(result);
-        result = submit(EntityType.FEED, getAbsolutePath("/outfeed.xml"));
-        assertStatus(result);
+        submitClusterAndFeeds();
         // submitting and scheduling process
-        String scheduleTime = "2015-06-20T00:00Z";
-        createData("in", "local", scheduleTime, "input.txt");
-        result = submitProcess(getAbsolutePath("/process.xml"), "/app/oozie-mr");
+        createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME);
+        APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH);
         assertStatus(result);
-        result = scheduleProcess("process", scheduleTime, 1, "local", getAbsolutePath("/workflow.xml"),
+        result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 1, CLUSTER_NAME, getAbsolutePath(WORKFLOW),
                 true, "");
         assertStatus(result);
-        waitForStatus(EntityType.PROCESS, "process", scheduleTime);
-        InstancesResult.WorkflowStatus status = falconUnitClient.getInstanceStatus(EntityType.PROCESS,
-                "process", scheduleTime);
+        waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME);
+        InstancesResult.WorkflowStatus status = getClient().getInstanceStatus(EntityType.PROCESS.name(),
+                PROCESS_NAME, SCHEDULE_TIME);
         Assert.assertEquals(status, InstancesResult.WorkflowStatus.SUCCEEDED);
-        String outPath = getFeedPathForTS("local", "out", scheduleTime);
+        String outPath = getFeedPathForTS(CLUSTER_NAME, OUTPUT_FEED_NAME, SCHEDULE_TIME);
         Assert.assertTrue(getFileSystem().exists(new Path(outPath)));
         FileStatus[] files = getFileSystem().listStatus(new Path(outPath));
         Assert.assertTrue(files.length > 0);
@@ -69,52 +83,149 @@ public class TestFalconUnit extends FalconUnitTestBase {
         // submit with default props
         submitCluster();
         // submitting feeds
-        APIResult result = submit(EntityType.FEED, getAbsolutePath("/infeed.xml"));
+        APIResult result = submit(EntityType.FEED, getAbsolutePath(INPUT_FEED));
         assertStatus(result);
-        String scheduleTime = "2015-06-20T00:00Z";
-        createData("in", "local", scheduleTime, "input.txt");
-        String inPath = getFeedPathForTS("local", "in", scheduleTime);
+        createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME);
+        String inPath = getFeedPathForTS(CLUSTER_NAME, INPUT_FEED_NAME, SCHEDULE_TIME);
         Assert.assertTrue(fs.exists(new Path(inPath)));
-        result = schedule(EntityType.FEED, "in", "local");
+        result = schedule(EntityType.FEED, INPUT_FEED_NAME, CLUSTER_NAME);
         Assert.assertEquals(APIResult.Status.SUCCEEDED, result.getStatus());
         waitFor(WAIT_TIME, new Predicate() {
             public boolean evaluate() throws Exception {
-                InstancesResult.WorkflowStatus status = getRetentionStatus("in", "local");
+                InstancesResult.WorkflowStatus status = getRetentionStatus(INPUT_FEED_NAME, CLUSTER_NAME);
                 return InstancesResult.WorkflowStatus.SUCCEEDED.equals(status);
             }
         });
-        InstancesResult.WorkflowStatus status = getRetentionStatus("in", "local");
+        InstancesResult.WorkflowStatus status = getRetentionStatus(INPUT_FEED_NAME, CLUSTER_NAME);
         Assert.assertEquals(InstancesResult.WorkflowStatus.SUCCEEDED, status);
         Assert.assertFalse(fs.exists(new Path(inPath)));
     }
 
     @Test
     public void testSuspendAndResume() throws Exception {
-        // submit with default props
-        submitCluster();
-        // submitting feeds
-        APIResult result = submit(EntityType.FEED, getAbsolutePath("/infeed.xml"));
-        assertStatus(result);
-        result = submit(EntityType.FEED, getAbsolutePath("/outfeed.xml"));
-        assertStatus(result);
+        submitClusterAndFeeds();
         // submitting and scheduling process
         String scheduleTime = "2015-06-20T00:00Z";
-        createData("in", "local", scheduleTime, "input.txt");
-        result = submitProcess(getAbsolutePath("/process1.xml"), "/app/oozie-mr");
+        //String processName = "process1";
+        createData(INPUT_FEED_NAME, CLUSTER_NAME, scheduleTime, INPUT_FILE_NAME);
+        APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH);
         assertStatus(result);
-        result = scheduleProcess("process1", scheduleTime, 2, "local", getAbsolutePath("/workflow.xml"),
+        result = scheduleProcess(PROCESS_NAME, scheduleTime, 2, CLUSTER_NAME, getAbsolutePath(WORKFLOW),
                 true, "");
         assertStatus(result);
-        waitForStatus(EntityType.PROCESS, "process1", scheduleTime);
-        result = getClient().suspend(EntityType.PROCESS, "process1", "local", null);
+        waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, scheduleTime);
+        result = getClient().suspend(EntityType.PROCESS, PROCESS_NAME, CLUSTER_NAME, null);
         assertStatus(result);
-        result = getClient().getStatus(EntityType.PROCESS, "process1", "local", null);
+        result = getClient().getStatus(EntityType.PROCESS, PROCESS_NAME, CLUSTER_NAME, null);
         assertStatus(result);
         Assert.assertEquals(result.getMessage(), "SUSPENDED");
-        result = getClient().resume(EntityType.PROCESS, "process1", "local", null);
+        result = getClient().resume(EntityType.PROCESS, PROCESS_NAME, CLUSTER_NAME, null);
         assertStatus(result);
-        result = getClient().getStatus(EntityType.PROCESS, "process1", "local", null);
+        result = getClient().getStatus(EntityType.PROCESS, PROCESS_NAME, CLUSTER_NAME, null);
         assertStatus(result);
         Assert.assertEquals(result.getMessage(), "RUNNING");
     }
+
+    @Test
+    public void testDelete() throws IOException, FalconCLIException, FalconException,
+            ParseException, InterruptedException {
+        // submit cluster and feeds
+        submitClusterAndFeeds();
+        APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH);
+        assertStatus(result);
+        createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME);
+        result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH);
+        assertStatus(result);
+        result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 10, CLUSTER_NAME, getAbsolutePath(WORKFLOW),
+                true, "");
+        assertStatus(result);
+        waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME);
+        result = getClient().delete(EntityType.PROCESS, PROCESS_NAME, null);
+        assertStatus(result);
+        try {
+            getEntity(EntityType.PROCESS, PROCESS_NAME);
+            Assert.fail("Exception should be thrown");
+        } catch (FalconException e) {
+            // nothing to do
+        }
+
+        result = getClient().delete(EntityType.FEED, INPUT_FEED_NAME, null);
+        assertStatus(result);
+        try {
+            getEntity(EntityType.FEED, INPUT_FEED_NAME);
+            Assert.fail("Exception should be thrown");
+        } catch (FalconException e) {
+            // nothing to do
+        }
+    }
+
+    @Test
+    public void testValidate() throws IOException, FalconCLIException, FalconException {
+        submitClusterAndFeeds();
+        APIResult result = getClient().validate(EntityType.PROCESS.name(),
+                getAbsolutePath(PROCESS), true, null);
+        assertStatus(result);
+        try {
+            getClient().validate(EntityType.PROCESS.name(),
+                    getAbsolutePath(INPUT_FEED), true, null);
+            Assert.fail("Exception should be thrown");
+        } catch (FalconWebException e) {
+            // nothing to do
+        }
+    }
+
+    @Test
+    public void testUpdate() throws IOException, FalconCLIException, FalconException,
+            ParseException, InterruptedException {
+        submitClusterAndFeeds();
+        APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH);
+        assertStatus(result);
+        result = getClient().update(EntityType.PROCESS.name(), PROCESS_NAME,
+                getAbsolutePath(PROCESS), true, null);
+        assertStatus(result);
+        createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME);
+        result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH);
+        assertStatus(result);
+        result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 10, CLUSTER_NAME, getAbsolutePath(WORKFLOW),
+                true, "");
+        assertStatus(result);
+        waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME);
+
+        Process process = getEntity(EntityType.PROCESS, PROCESS_NAME);
+        setDummyProperty(process);
+        String processXml = process.toString();
+
+        File file = new File("target/newprocess.xml");
+        file.createNewFile();
+        FileWriter fw = new FileWriter(file.getAbsoluteFile());
+        BufferedWriter bw = new BufferedWriter(fw);
+        bw.write(processXml);
+        bw.close();
+
+        result = falconUnitClient.update(EntityType.PROCESS.name(), PROCESS_NAME, file.getAbsolutePath(), true, null);
+        assertStatus(result);
+
+        process = getEntity(EntityType.PROCESS,
+                PROCESS_NAME);
+        Assert.assertEquals(process.toString(), processXml);
+        file.delete();
+    }
+
+    private void submitClusterAndFeeds() throws IOException, FalconCLIException {
+        // submit with default props
+        submitCluster();
+        // submitting feeds
+        APIResult result = submit(EntityType.FEED, getAbsolutePath(INPUT_FEED));
+        assertStatus(result);
+        result = submit(EntityType.FEED, getAbsolutePath(OUTPUT_FEED));
+        assertStatus(result);
+    }
+
+    public void setDummyProperty(Process process) {
+        Property property = new Property();
+        property.setName("dummy");
+        property.setValue("dummy");
+        process.getProperties().getProperties().add(property);
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/unit/src/test/resources/process1.xml
----------------------------------------------------------------------
diff --git a/unit/src/test/resources/process1.xml b/unit/src/test/resources/process1.xml
deleted file mode 100644
index 37dbb9c..0000000
--- a/unit/src/test/resources/process1.xml
+++ /dev/null
@@ -1,50 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-
-<process name="process1" xmlns="uri:falcon:process:0.1">
-    <clusters>
-        <cluster name="local">
-            <validity start="2013-11-18T00:05Z" end="2013-11-18T01:05Z"/>
-        </cluster>
-    </clusters>
-
-    <parallel>5</parallel>
-    <order>FIFO</order>
-    <frequency>minutes(1)</frequency>
-    <timezone>UTC</timezone>
-
-    <inputs>
-        <!-- In the workflow, the input paths will be available in a variable 'inpaths' -->
-        <input name="inpaths" feed="in" start="now(0,0)" end="now(0,0)" />
-    </inputs>
-
-    <outputs>
-        <!-- In the workflow, the output path will be available in a variable 'outpath' -->
-        <output name="outpath" feed="out" instance="now(0,0)"/>
-    </outputs>
-
-    <properties>
-        <!-- In the workflow, these properties will be available with variable - key -->
-        <property name="queueName" value="default"/>
-        <!-- The schedule time available as a property in workflow -->
-        <property name="time" value="${instanceTime()}"/>
-    </properties>
-
-    <workflow engine="oozie" path="/app/oozie-mr"/>
-</process>


Mime
View raw message