falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject falcon git commit: FALCON-1517 Instance Management Api in Falcon Unit (Narayan Periwal)
Date Fri, 30 Oct 2015 08:58:19 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 5ecae18ec -> 49fbc8c96


FALCON-1517 Instance Management Api in Falcon Unit (Narayan Periwal)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/49fbc8c9
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/49fbc8c9
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/49fbc8c9

Branch: refs/heads/master
Commit: 49fbc8c961f4f467f7f7ae12efd1c80963c7f8a9
Parents: 5ecae18
Author: Pallavi Rao <pallavi.rao@inmobi.com>
Authored: Fri Oct 30 14:27:53 2015 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Fri Oct 30 14:27:53 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../falcon/client/AbstractFalconClient.java     | 200 ++++++++++++++++++-
 .../org/apache/falcon/client/FalconClient.java  |  23 ---
 .../client/LocalOozieClientCoordProxy.java      |  62 ++++++
 .../oozie/client/LocalProxyOozieClient.java     |  32 +--
 .../resource/AbstractInstanceManager.java       |  65 +++---
 .../apache/falcon/unit/FalconUnitClient.java    |  81 +++++++-
 .../falcon/unit/LocalInstanceManager.java       |  51 +++++
 .../apache/falcon/unit/FalconUnitTestBase.java  |   7 +-
 .../org/apache/falcon/unit/TestFalconUnit.java  | 113 ++++++++++-
 .../falcon/unit/examples/JavaSleepExample.java  |  33 +++
 unit/src/test/resources/sleepWorkflow.xml       |  41 ++++
 12 files changed, 635 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b1f9269..869b182 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,6 +14,8 @@ Trunk (Unreleased)
     FALCON-1213 Base framework of the native scheduler(Pallavi Rao)
 
   IMPROVEMENTS
+    FALCON-1517 Instance Management Api in Falcon Unit (Narayan Periwal via Pallavi Rao)
+
     FALCON-1520 Delete, update, Validate entity operations support in Falcon Unit (Pavan Kolamuri via Pallavi Rao)
 
   OPTIMIZATIONS

http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index 91d5324..27b93c0 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -21,9 +21,15 @@ import org.apache.falcon.LifeCycle;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.FeedInstanceResult;
+import org.apache.falcon.resource.InstanceDependencyResult;
 import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.InstancesSummaryResult;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
 import java.util.List;
 
 /**
@@ -34,6 +40,9 @@ public abstract class AbstractFalconClient {
 
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
 
+    protected static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters";
+    protected static final String FALCON_INSTANCE_SOURCE_CLUSTERS = "falcon.instance.source.clusters";
+
     /**
      * Submit a new entity. Entities can be of type feed, process or data end
      * points. Entity definitions are validated structurally against schema and
@@ -119,7 +128,7 @@ public abstract class AbstractFalconClient {
      * @param sortOrder sort order can be asc or desc
      * @param offset offset while displaying results
      * @param numResults num of Results to output
-     * @param doAsUser
+     * @param doAsUser proxy user
      * @return
      * @throws FalconCLIException
      */
@@ -129,7 +138,6 @@ public abstract class AbstractFalconClient {
                                                          String orderBy, String sortOrder,
                                                          Integer offset, Integer numResults,
                                                          String doAsUser) throws FalconCLIException;
-    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     /**
      * Suspend an entity.
@@ -166,4 +174,192 @@ public abstract class AbstractFalconClient {
      */
     public abstract APIResult getStatus(EntityType entityType, String entityName, String colo, String doAsUser) throws
             FalconCLIException;
+
+    /**
+     * Kill currently running instance(s) of an entity.
+     * @param type Valid options are feed or process.
+     * @param entity name of the entity.
+     * @param start start time of the instance(s) that you want to refer to
+     * @param end end time of the instance(s) that you want to refer to
+     * @param colo Colo on which the query should be run.
+     * @param lifeCycles <optional param> can be Eviction/Replication(default) for feed and Execution(default) for
+     *                   process.
+     * @param doAsUser proxy user
+     * @return Result of the kill operation.
+     */
+    public abstract InstancesResult killInstances(String type, String entity, String start, String end, String colo,
+                                                  String clusters, String sourceClusters, List<LifeCycle> lifeCycles,
+                                                  String doAsUser) throws FalconCLIException,
+            UnsupportedEncodingException;
+
+    /**
+     * Suspend instances of an entity.
+     * @param type Valid options are feed or process.
+     * @param entity name of the entity.
+     * @param start the start time of the instance(s) that you want to refer to
+     * @param end the end time of the instance(s) that you want to refer to
+     * @param colo Colo on which the query should be run.
+     * @param lifeCycles <optional param> can be Eviction/Replication(default) for feed and Execution(default) for
+     *                   process.
+     * @param doAsUser proxy user
+     * @return Results of the suspend command.
+     */
+    public abstract InstancesResult suspendInstances(String type, String entity, String start, String end, String colo,
+                                            String clusters, String sourceClusters, List<LifeCycle> lifeCycles,
+                                            String doAsUser) throws FalconCLIException, UnsupportedEncodingException;
+
+    /**
+     * Resume suspended instances of an entity.
+     * @param type Valid options are feed or process.
+     * @param entity name of the entity.
+     * @param start start time of the instance(s) that you want to refer to
+     * @param end the end time of the instance(s) that you want to refer to
+     * @param colo Colo on which the query should be run.
+     * @param lifeCycles <optional param> can be Eviction/Replication(default) for feed and Execution(default) for
+     *                   process.
+     * @param doAsUser proxy user
+     * @return Results of the resume command.
+     */
+    public abstract InstancesResult resumeInstances(String type, String entity, String start, String end, String colo,
+                                           String clusters, String sourceClusters, List<LifeCycle> lifeCycles,
+                                           String doAsUser) throws FalconCLIException, UnsupportedEncodingException;
+
+    /**
+     * Rerun instances of an entity. On issuing a rerun, by default the execution resumes from the last failed node in
+     * the workflow.
+     * @param type Valid options are feed or process.
+     * @param entity name of the entity.
+     * @param start start time of the instance(s) that you want to refer to
+     * @param end end time of the instance(s) that you want to refer to
+     * @param colo Colo on which the query should be run.
+     * @param lifeCycles <optional param> can be Eviction/Replication(default) for feed and Execution(default) for
+     *                   process.
+     * @param isForced <optional param> can be used to forcefully rerun the entire instance.
+     * @param doAsUser proxy user
+     * @return Results of the rerun command.
+     */
+    public abstract InstancesResult rerunInstances(String type, String entity, String start, String end,
+                                                   String filePath, String colo, String clusters,
+                                                   String sourceClusters, List<LifeCycle> lifeCycles, Boolean isForced,
+                                                   String doAsUser) throws FalconCLIException, IOException;
+
+    /**
+     * Get summary of instance/instances of an entity.
+     * @param type Valid options are cluster, feed or process.
+     * @param entity Name of the entity.
+     * @param start <optional param> Show instances from this date. Date format is yyyy-MM-dd'T'HH:mm'Z'.
+     *                 By default, it is set to (end - (10 * entityFrequency)).
+     * @param end <optional param> Show instances up to this date. Date format is yyyy-MM-dd'T'HH:mm'Z'.
+     *               Default is set to now.
+     * @param colo <optional param> Colo on which the query should be run.
+     * @param lifeCycles <optional param> Valid lifecycles for feed are Eviction/Replication(default) and for process
+     *                   is Execution(default).
+     * @param filterBy <optional param> Filter results by list of field:value pairs.
+     *                 Example1: filterBy=STATUS:RUNNING,CLUSTER:primary-cluster
+     *                 Example2: filterBy=Status:RUNNING,Status:KILLED
+     *                 Supported filter fields are STATUS, CLUSTER.
+     *                 Query will do an AND among filterBy fields.
+     * @param orderBy <optional param> Field by which results should be ordered.
+     *                Supports ordering by "cluster". Example: orderBy=cluster
+     * @param sortOrder <optional param> Valid options are "asc" and "desc". Example: sortOrder=asc
+     * @param doAsUser proxy user
+     * @return Summary of the instances over the specified time range
+     */
+    public abstract InstancesSummaryResult getSummaryOfInstances(String type, String entity, String start, String end,
+                                                                 String colo, List<LifeCycle> lifeCycles,
+                                                                 String filterBy, String orderBy, String sortOrder,
+                                                                 String doAsUser) throws FalconCLIException;
+
+    /**
+     * Get falcon feed instance availability.
+     * @param type The only valid option is feed.
+     * @param entity Name of the entity.
+     * @param start <optional param> Show instances from this date. Date format is yyyy-MM-dd'T'HH:mm'Z'.
+     *              By default, it is set to (end - (10 * entityFrequency)).
+     * @param end <optional param> Show instances up to this date. Date format is yyyy-MM-dd'T'HH:mm'Z'.
+     *            Default is set to now.
+     * @param colo Colo on which the query should be run.
+     * @param doAsUser proxy user
+     * @return Feed instance availability status
+     */
+    public abstract FeedInstanceResult getFeedListing(String type, String entity, String start, String end, String colo,
+                                                      String doAsUser) throws FalconCLIException;
+
+    /**
+     * Get log of a specific instance of an entity.
+     * @param type Valid options are cluster, feed or process.
+     * @param entity Name of the entity.
+     * @param start <optional param> Show instances from this date. Date format is yyyy-MM-dd'T'HH:mm'Z'.
+     *                 By default, it is set to (end - (10 * entityFrequency)).
+     * @param end <optional param> Show instances up to this date. Date format is yyyy-MM-dd'T'HH:mm'Z'.
+     *               Default is set to now.
+     * @param colo <optional param> Colo on which the query should be run.
+     * @param runId <optional param> Run Id.
+     * @param lifeCycles <optional param> Valid lifecycles for feed are Eviction/Replication(default) and for process is
+     *                   Execution(default).
+     * @param filterBy <optional param> Filter results by list of field:value pairs.
+     *                 Example: filterBy=STATUS:RUNNING,CLUSTER:primary-cluster
+     *                 Supported filter fields are STATUS, CLUSTER, SOURCECLUSTER, STARTEDAFTER.
+     *                 Query will do an AND among filterBy fields.
+     * @param orderBy <optional param> Field by which results should be ordered.
+     *                Supports ordering by "status","startTime","endTime","cluster".
+     * @param sortOrder <optional param> Valid options are "asc" and "desc"
+     * @param offset <optional param> Show results from the offset, used for pagination. Defaults to 0.
+     * @param numResults <optional param> Number of results to show per request, used for pagination. Only integers > 0
+     *                   are valid, Default is 10.
+     * @param doAsUser proxy user
+     * @return Log of specified instance.
+     */
+    public abstract InstancesResult getLogsOfInstances(String type, String entity, String start, String end,
+                                                       String colo, String runId, List<LifeCycle> lifeCycles,
+                                                       String filterBy, String orderBy, String sortOrder,
+                                                       Integer offset, Integer numResults, String doAsUser) throws
+            FalconCLIException;
+
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+    /**
+     * Get the params passed to the workflow for an instance of feed/process.
+     * @param type Valid options are cluster, feed or process.
+     * @param entity Name of the entity.
+     * @param start should be the nominal time of the instance for which you want the params to be returned
+     * @param colo <optional param> Colo on which the query should be run.
+     * @param lifeCycles <optional param> Valid lifecycles for feed are Eviction/Replication(default) and for process is
+     *                   Execution(default).
+     * @param doAsUser proxy user
+     * @return Params passed to the workflow for the specified instance.
+     */
+    public abstract InstancesResult getParamsOfInstance(String type, String entity, String start, String colo,
+                                                        List<LifeCycle> lifeCycles, String doAsUser) throws
+            FalconCLIException, UnsupportedEncodingException;
+
+    /**
+     * Get dependent instances for a particular instance.
+     * @param entityType Valid options are feed or process.
+     * @param entityName Name of the entity
+     * @param instanceTime <mandatory param> time of the given instance
+     * @param colo Colo on which the query should be run.
+     * @return Dependent instances for the specified instance
+     */
+    public abstract InstanceDependencyResult getInstanceDependencies(String entityType, String entityName,
+                                                                     String instanceTime, String colo) throws
+            FalconCLIException;
+
+    protected InputStream getServletInputStream(String clusters, String sourceClusters, String properties) throws
+            FalconCLIException, UnsupportedEncodingException {
+
+        InputStream stream;
+        StringBuilder buffer = new StringBuilder();
+        if (clusters != null) {
+            buffer.append(FALCON_INSTANCE_ACTION_CLUSTERS).append('=').append(clusters).append('\n');
+        }
+        if (sourceClusters != null) {
+            buffer.append(FALCON_INSTANCE_SOURCE_CLUSTERS).append('=').append(sourceClusters).append('\n');
+        }
+        if (properties != null) {
+            buffer.append(properties);
+        }
+        stream = new ByteArrayInputStream(buffer.toString().getBytes());
+        return (buffer.length() == 0) ? null : stream;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 27510f6..c49dd08 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -58,7 +58,6 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriBuilder;
 import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -86,9 +85,6 @@ public class FalconClient extends AbstractFalconClient {
     public static final String USER = System.getProperty("user.name");
     public static final String AUTH_URL = "api/options?" + PseudoAuthenticator.USER_NAME + "=" + USER;
 
-    private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters";
-    private static final String FALCON_INSTANCE_SOURCE_CLUSTERS = "falcon.instance.source.clusters";
-
     /**
      * Name of the HTTP cookie used for the authentication token between the client and the server.
      */
@@ -670,25 +666,6 @@ public class FalconClient extends AbstractFalconClient {
         return stream;
     }
 
-    private InputStream getServletInputStream(String clusters,
-                                              String sourceClusters, String properties)
-        throws FalconCLIException, UnsupportedEncodingException {
-
-        InputStream stream;
-        StringBuilder buffer = new StringBuilder();
-        if (clusters != null) {
-            buffer.append(FALCON_INSTANCE_ACTION_CLUSTERS).append('=').append(clusters).append('\n');
-        }
-        if (sourceClusters != null) {
-            buffer.append(FALCON_INSTANCE_SOURCE_CLUSTERS).append('=').append(sourceClusters).append('\n');
-        }
-        if (properties != null) {
-            buffer.append(properties);
-        }
-        stream = new ByteArrayInputStream(buffer.toString().getBytes());
-        return (buffer.length() == 0) ? null : stream;
-    }
-
     private APIResult sendEntityRequest(Entities entities, EntityType entityType,
                                      String entityName, String colo, Boolean skipDryRun,
                                      String doAsUser, String properties) throws FalconCLIException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java b/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java
new file mode 100644
index 0000000..093d6ff
--- /dev/null
+++ b/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.client;
+
+import org.apache.oozie.BaseEngineException;
+import org.apache.oozie.CoordinatorEngine;
+import org.apache.oozie.LocalOozieClientCoord;
+
+/**
+ * Client API to submit and manage Oozie Coord jobs against an Oozie
+ * instance.
+ */
+public class LocalOozieClientCoordProxy extends LocalOozieClientCoord {
+
+    private final CoordinatorEngine coordEngine;
+
+    /**
+     * Create a coordinator client for Oozie local use.
+     * <p/>
+     *
+     * @param coordEngine the engine instance to use.
+     */
+    public LocalOozieClientCoordProxy(CoordinatorEngine coordEngine) {
+        super(coordEngine);
+        this.coordEngine = coordEngine;
+    }
+
+    /**
+     * Get the info of a coordinator job and subset actions.
+     *
+     * @param jobId job Id.
+     * @param filter the status filter
+     * @param start starting index in the list of actions belonging to the job
+     * @param len number of actions to be returned
+     * @return the job info.
+     * @throws OozieClientException thrown if the job info could not be retrieved.
+     */
+    @Override
+    public CoordinatorJob getCoordJobInfo(String jobId, String filter, int start, int len) throws OozieClientException {
+        try {
+            return coordEngine.getCoordJob(jobId, filter, start, len, false);
+        } catch (BaseEngineException bex) {
+            throw new OozieClientException(bex.getErrorCode().toString(), bex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
index c2100d1..6ae92de 100644
--- a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
+++ b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
@@ -18,10 +18,11 @@
 package org.apache.oozie.client;
 
 import org.apache.oozie.BundleEngine;
+import org.apache.oozie.CoordinatorEngine;
 import org.apache.oozie.LocalOozieClient;
-import org.apache.oozie.LocalOozieClientCoord;
 import org.apache.oozie.local.LocalOozie;
 import org.apache.oozie.service.BundleEngineService;
+import org.apache.oozie.service.CoordinatorEngineService;
 import org.apache.oozie.service.Services;
 
 import java.io.PrintStream;
@@ -34,11 +35,12 @@ import java.util.Properties;
 public class LocalProxyOozieClient extends OozieClient {
 
     private static LocalOozieClientBundle localOozieClientBundle;
-    private static LocalOozieClientCoord localOozieClientCoord;
+    private static LocalOozieClientCoordProxy localOozieClientCoordProxy;
     private static LocalOozieClient localOozieClient;
     private static final BundleEngine BUNDLE_ENGINE = Services.get().
             get(BundleEngineService.class).getBundleEngine(System.getProperty("user.name"));
-
+    private static final CoordinatorEngine COORDINATOR_ENGINE = Services.get().get(CoordinatorEngineService.class).
+            getCoordinatorEngine(System.getProperty("user.name"));
 
     private LocalOozieClientBundle getLocalOozieClientBundle() {
         if (localOozieClientBundle == null) {
@@ -54,11 +56,11 @@ public class LocalProxyOozieClient extends OozieClient {
         return localOozieClient;
     }
 
-    private LocalOozieClientCoord getLocalOozieClientCoord() {
-        if (localOozieClientCoord == null) {
-            localOozieClientCoord = (LocalOozieClientCoord) LocalOozie.getCoordClient();
+    private LocalOozieClientCoordProxy getLocalOozieClientCoordProxy() {
+        if (localOozieClientCoordProxy == null) {
+            localOozieClientCoordProxy = new LocalOozieClientCoordProxy(COORDINATOR_ENGINE);
         }
-        return localOozieClientCoord;
+        return localOozieClientCoordProxy;
     }
 
     private OozieClient getClient(String jobId) {
@@ -66,7 +68,7 @@ public class LocalProxyOozieClient extends OozieClient {
             if (jobId.toUpperCase().endsWith("B")) { //checking if it's a bundle job
                 return getLocalOozieClientBundle();
             } else if (jobId.toUpperCase().endsWith("C")) { //checking if it's a coordinator job
-                return getLocalOozieClientCoord();
+                return getLocalOozieClientCoordProxy();
             } else if (jobId.toUpperCase().endsWith("W")) { //checking if it's a workflow job
                 return getLocalOozieClient();
             } else {
@@ -104,43 +106,43 @@ public class LocalProxyOozieClient extends OozieClient {
 
     @Override
     public CoordinatorAction getCoordActionInfo(String actionId) throws OozieClientException {
-        return getLocalOozieClientCoord().getCoordActionInfo(actionId);
+        return getLocalOozieClientCoordProxy().getCoordActionInfo(actionId);
     }
 
 
     @Override
     public CoordinatorJob getCoordJobInfo(final String jobId) throws OozieClientException {
-        return getLocalOozieClientCoord().getCoordJobInfo(jobId);
+        return getLocalOozieClientCoordProxy().getCoordJobInfo(jobId);
     }
 
     @Override
     public List<CoordinatorJob> getCoordJobsInfo(final String filter, final int start,
                                                  final int len) throws OozieClientException {
-        return getLocalOozieClientCoord().getCoordJobsInfo(filter, start, len);
+        return getLocalOozieClientCoordProxy().getCoordJobsInfo(filter, start, len);
     }
 
     @Override
     public CoordinatorJob getCoordJobInfo(final String jobId, final String filter,
                                           final int start, final int len) throws OozieClientException {
-        return getLocalOozieClientCoord().getCoordJobInfo(jobId, filter, start, len);
+        return getLocalOozieClientCoordProxy().getCoordJobInfo(jobId, filter, start, len);
     }
 
     @Override
     public List<CoordinatorAction> reRunCoord(final String jobId, final String rerunType,
                                               final String scope, final boolean refresh,
                                               final boolean noCleanup) throws OozieClientException {
-        return getLocalOozieClientCoord().reRunCoord(jobId, rerunType, scope, refresh, noCleanup);
+        return getLocalOozieClientCoordProxy().reRunCoord(jobId, rerunType, scope, refresh, noCleanup);
     }
 
     @Override
     public List<WorkflowJob> getJobsInfo(final String filter) throws OozieClientException {
-        return getLocalOozieClientCoord().getJobsInfo(filter);
+        return getLocalOozieClientCoordProxy().getJobsInfo(filter);
     }
 
     @Override
     public List<WorkflowJob> getJobsInfo(final String filter, final int start,
                                          final int len) throws OozieClientException {
-        return getLocalOozieClientCoord().getJobsInfo(filter, start, len);
+        return getLocalOozieClientCoordProxy().getJobsInfo(filter, start, len);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index 606f741..fea2989 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -555,10 +555,14 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
         }
     }
 
-    public InstancesResult killInstance(HttpServletRequest request,
-                                        String type, String entity, String startStr,
-                                        String endStr, String colo,
-                                        List<LifeCycle> lifeCycles) {
+    public InstancesResult killInstance(HttpServletRequest request, String type, String entity, String startStr,
+                                        String endStr, String colo, List<LifeCycle> lifeCycles) {
+        Properties props = getProperties(request);
+        return killInstance(props, type, entity, startStr, endStr, colo, lifeCycles);
+    }
+
+    public InstancesResult killInstance(Properties props, String type, String entity, String startStr,
+                                        String endStr, String colo, List<LifeCycle> lifeCycles) {
         checkColo(colo);
         checkType(type);
         try {
@@ -568,7 +572,6 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
             Pair<Date, Date> startAndEndDate = getStartAndEndDateForLifecycleOperations(
                     entityObject, startStr, endStr);
 
-            Properties props = getProperties(request);
             AbstractWorkflowEngine wfEngine = getWorkflowEngine();
             return wfEngine.killInstances(entityObject,
                     startAndEndDate.first, startAndEndDate.second, props, lifeCycles);
@@ -578,10 +581,14 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
         }
     }
 
-    public InstancesResult suspendInstance(HttpServletRequest request,
-                                           String type, String entity, String startStr,
-                                           String endStr, String colo,
-                                           List<LifeCycle> lifeCycles) {
+    public InstancesResult suspendInstance(HttpServletRequest request, String type, String entity, String startStr,
+                                           String endStr, String colo, List<LifeCycle> lifeCycles) {
+        Properties props = getProperties(request);
+        return suspendInstance(props, type, entity, startStr, endStr, colo, lifeCycles);
+    }
+
+    public InstancesResult suspendInstance(Properties props, String type, String entity, String startStr, String endStr,
+                                           String colo, List<LifeCycle> lifeCycles) {
         checkColo(colo);
         checkType(type);
         try {
@@ -591,7 +598,6 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
             Pair<Date, Date> startAndEndDate = getStartAndEndDateForLifecycleOperations(
                     entityObject, startStr, endStr);
 
-            Properties props = getProperties(request);
             AbstractWorkflowEngine wfEngine = getWorkflowEngine();
             return wfEngine.suspendInstances(entityObject,
                     startAndEndDate.first, startAndEndDate.second, props, lifeCycles);
@@ -601,10 +607,14 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
         }
     }
 
-    public InstancesResult resumeInstance(HttpServletRequest request,
-                                          String type, String entity, String startStr,
-                                          String endStr, String colo,
-                                          List<LifeCycle> lifeCycles) {
+    /**
+     * Resumes instances using properties read from the HTTP request.
+     *
+     * <p>The request is converted with getProperties(request) and the call is handed to the
+     * Properties-based overload. getProperties logs an IOException instead of throwing it, so a
+     * failed read still proceeds with whatever properties were loaded (possibly none).
+     */
+    public InstancesResult resumeInstance(HttpServletRequest request, String type, String entity, String startStr,
+                                          String endStr, String colo, List<LifeCycle> lifeCycles) {
+        return resumeInstance(getProperties(request), type, entity, startStr, endStr, colo, lifeCycles);
+    }
+
+    public InstancesResult resumeInstance(Properties props, String type, String entity, String startStr, String endStr,
+                                          String colo, List<LifeCycle> lifeCycles) {
         checkColo(colo);
         checkType(type);
         try {
@@ -614,7 +624,6 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
             Pair<Date, Date> startAndEndDate = getStartAndEndDateForLifecycleOperations(
                     entityObject, startStr, endStr);
 
-            Properties props = getProperties(request);
             AbstractWorkflowEngine wfEngine = getWorkflowEngine();
             return wfEngine.resumeInstances(entityObject,
                     startAndEndDate.first, startAndEndDate.second, props, lifeCycles);
@@ -790,9 +799,14 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
         return null;
     }
 
+    /**
+     * Re-runs instances using properties read from the HTTP request.
+     *
+     * <p>The request is converted with getProperties(request) and the call is handed to the
+     * Properties-based overload. getProperties logs an IOException instead of throwing it, so a
+     * failed read still proceeds with whatever properties were loaded (possibly none).
+     */
+    public InstancesResult reRunInstance(String type, String entity, String startStr, String endStr,
+                                         HttpServletRequest request, String colo, List<LifeCycle> lifeCycles,
+                                         Boolean isForced) {
+        return reRunInstance(type, entity, startStr, endStr, getProperties(request), colo, lifeCycles, isForced);
+    }
 
-    public InstancesResult reRunInstance(String type, String entity, String startStr,
-                                         String endStr, HttpServletRequest request,
+    public InstancesResult reRunInstance(String type, String entity, String startStr, String endStr, Properties props,
                                          String colo, List<LifeCycle> lifeCycles, Boolean isForced) {
         checkColo(colo);
         checkType(type);
@@ -803,7 +817,6 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
             Pair<Date, Date> startAndEndDate = getStartAndEndDateForLifecycleOperations(
                     entityObject, startStr, endStr);
 
-            Properties props = getProperties(request);
             AbstractWorkflowEngine wfEngine = getWorkflowEngine();
             return wfEngine.reRunInstances(entityObject,
                     startAndEndDate.first, startAndEndDate.second, props, lifeCycles, isForced);
@@ -814,14 +827,18 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
     }
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
-    private Properties getProperties(HttpServletRequest request) throws IOException {
+    private Properties getProperties(HttpServletRequest request) {
         Properties props = new Properties();
-        ServletInputStream xmlStream = request == null ? null : request.getInputStream();
-        if (xmlStream != null) {
-            if (xmlStream.markSupported()) {
-                xmlStream.mark(XML_DEBUG_LEN); // mark up to debug len
+        try {
+            ServletInputStream xmlStream = request == null ? null : request.getInputStream();
+            if (xmlStream != null) {
+                if (xmlStream.markSupported()) {
+                    xmlStream.mark(XML_DEBUG_LEN); // mark up to debug len
+                }
+                props.load(xmlStream);
             }
-            props.load(xmlStream);
+        } catch (IOException e) {
+            LOG.error("Failed to get properties from request", e);
         }
         return props;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index b5afae3..9f2b714 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -32,7 +32,10 @@ import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Validity;
 import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.FeedInstanceResult;
+import org.apache.falcon.resource.InstanceDependencyResult;
 import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.InstancesSummaryResult;
 import org.apache.falcon.util.DateUtil;
 import org.apache.falcon.workflow.WorkflowEngineFactory;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
@@ -40,9 +43,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.Properties;
 import java.util.TimeZone;
 
 /**
@@ -51,6 +56,7 @@ import java.util.TimeZone;
 public class FalconUnitClient extends AbstractFalconClient {
 
     private static final Logger LOG = LoggerFactory.getLogger(FalconUnitClient.class);
+    protected static final int XML_DEBUG_LEN = 10 * 1024;
 
     private static final String DEFAULT_ORDERBY = "status";
     private static final String DEFAULT_SORTED_ORDER = "asc";
@@ -162,7 +168,6 @@ public class FalconUnitClient extends AbstractFalconClient {
                 sortOrder, offset, numResults);
 
     }
-    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
 
     /**
@@ -237,6 +242,68 @@ public class FalconUnitClient extends AbstractFalconClient {
         return localSchedulableEntityManager.getStatus(entityType.name(), entityName, colo);
     }
 
+    /**
+     * Kills instances by delegating to localInstanceManager.killInstance.
+     *
+     * <p>clusters and sourceClusters are packed into Properties by getProperties(clusters, sourceClusters);
+     * doAsUser is accepted but not forwarded.
+     */
+    public InstancesResult killInstances(String type, String entity, String start, String end, String colo,
+                                         String clusters, String sourceClusters, List<LifeCycle> lifeCycles,
+                                         String doAsUser) throws FalconCLIException, UnsupportedEncodingException {
+        final Properties actionProps = getProperties(clusters, sourceClusters);
+        return localInstanceManager.killInstance(
+                actionProps, type, entity, start, end, colo, lifeCycles);
+    }
+
+    /**
+     * Suspends instances by delegating to localInstanceManager.suspendInstance.
+     *
+     * <p>clusters and sourceClusters are packed into Properties by getProperties(clusters, sourceClusters);
+     * doAsUser is accepted but not forwarded.
+     */
+    public InstancesResult suspendInstances(String type, String entity, String start, String end, String colo,
+                                            String clusters, String sourceClusters, List<LifeCycle> lifeCycles,
+                                            String doAsUser) throws FalconCLIException, UnsupportedEncodingException {
+        final Properties actionProps = getProperties(clusters, sourceClusters);
+        return localInstanceManager.suspendInstance(
+                actionProps, type, entity, start, end, colo, lifeCycles);
+    }
+
+    /**
+     * Resumes instances by delegating to localInstanceManager.resumeInstance.
+     *
+     * <p>clusters and sourceClusters are packed into Properties by getProperties(clusters, sourceClusters);
+     * doAsUser is accepted but not forwarded.
+     */
+    public InstancesResult resumeInstances(String type, String entity, String start, String end, String colo,
+                                           String clusters, String sourceClusters, List<LifeCycle> lifeCycles,
+                                           String doAsUser) throws FalconCLIException, UnsupportedEncodingException {
+        final Properties actionProps = getProperties(clusters, sourceClusters);
+        return localInstanceManager.resumeInstance(
+                actionProps, type, entity, start, end, colo, lifeCycles);
+    }
+
+    /**
+     * Re-runs instances by delegating to localInstanceManager.reRunInstance.
+     *
+     * <p>clusters and sourceClusters are packed into Properties by getProperties(clusters, sourceClusters).
+     * filePath and doAsUser are accepted but not used here.
+     * NOTE(review): filePath is never read, so any properties it points to are not passed on;
+     * confirm whether that is intended for the unit client.
+     */
+    public InstancesResult rerunInstances(String type, String entity, String start, String end, String filePath,
+                                          String colo, String clusters, String sourceClusters,
+                                          List<LifeCycle> lifeCycles, Boolean isForced, String doAsUser)
+        throws FalconCLIException, IOException {
+        final Properties rerunProps = getProperties(clusters, sourceClusters);
+        return localInstanceManager.reRunInstance(
+                type, entity, start, end, rerunProps, colo, lifeCycles, isForced);
+    }
+
+    /**
+     * Returns the instance summary by delegating to localInstanceManager.getSummary.
+     *
+     * <p>doAsUser is accepted but not forwarded; all other arguments are passed through unchanged.
+     */
+    public InstancesSummaryResult getSummaryOfInstances(String type, String entity, String start, String end,
+                                                        String colo, List<LifeCycle> lifeCycles, String filterBy,
+                                                        String orderBy, String sortOrder, String doAsUser)
+        throws FalconCLIException {
+        return localInstanceManager.getSummary(
+                type, entity, start, end, colo, lifeCycles, filterBy, orderBy, sortOrder);
+    }
+
+    /**
+     * Returns the feed instance listing by delegating to localInstanceManager.getListing.
+     *
+     * <p>doAsUser is accepted but not forwarded.
+     */
+    public FeedInstanceResult getFeedListing(String type, String entity, String start, String end, String colo,
+                                             String doAsUser)
+        throws FalconCLIException {
+        return localInstanceManager.getListing(
+                type, entity, start, end, colo);
+    }
+
+    /**
+     * Returns instance logs by delegating to localInstanceManager.getLogs.
+     *
+     * <p>doAsUser is accepted but not forwarded; all other arguments are passed through unchanged.
+     */
+    public InstancesResult getLogsOfInstances(String type, String entity, String start, String end, String colo,
+                                              String runId, List<LifeCycle> lifeCycles, String filterBy,
+                                              String orderBy, String sortOrder, Integer offset, Integer numResults,
+                                              String doAsUser)
+        throws FalconCLIException {
+        return localInstanceManager.getLogs(
+                type, entity, start, end, colo, runId, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults);
+    }
+
+    /**
+     * Returns the parameters of an instance by delegating to localInstanceManager.getInstanceParams.
+     *
+     * <p>doAsUser is accepted but not forwarded.
+     */
+    public InstancesResult getParamsOfInstance(String type, String entity, String start, String colo,
+                                               List<LifeCycle> lifeCycles, String doAsUser)
+        throws FalconCLIException, UnsupportedEncodingException {
+        return localInstanceManager.getInstanceParams(
+                type, entity, start, colo, lifeCycles);
+    }
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+    /**
+     * Returns the dependencies of an instance by delegating to localInstanceManager.getInstanceDependencies
+     * with the arguments unchanged.
+     */
+    public InstanceDependencyResult getInstanceDependencies(String entityType, String entityName, String instanceTime,
+                                                            String colo)
+        throws FalconCLIException {
+        return localInstanceManager.getInstanceDependencies(
+                entityType, entityName, instanceTime, colo);
+    }
+
     private boolean checkAndUpdateCluster(Entity entity, EntityType entityType, String cluster) {
         if (entityType == EntityType.FEED) {
             return checkAndUpdateFeedClusters(entity, cluster);
@@ -303,5 +370,15 @@ public class FalconUnitClient extends AbstractFalconClient {
             }
         }
     }
-}
 
+    /**
+     * Builds the Properties passed to the instance manager: FALCON_INSTANCE_ACTION_CLUSTERS is set from
+     * clusters and FALCON_INSTANCE_SOURCE_CLUSTERS from sourceClusters, each only when the value is non-empty.
+     */
+    private Properties getProperties(String clusters, String sourceClusters) {
+        final Properties instanceProps = new Properties();
+        setIfNotEmpty(instanceProps, FALCON_INSTANCE_ACTION_CLUSTERS, clusters);
+        setIfNotEmpty(instanceProps, FALCON_INSTANCE_SOURCE_CLUSTERS, sourceClusters);
+        return instanceProps;
+    }
+
+    /** Stores key=value in target unless value is null or empty. */
+    private static void setIfNotEmpty(Properties target, String key, String value) {
+        if (StringUtils.isNotEmpty(value)) {
+            target.setProperty(key, value);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java
index 1503b28..148cbf7 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java
@@ -19,9 +19,13 @@ package org.apache.falcon.unit;
 
 import org.apache.falcon.LifeCycle;
 import org.apache.falcon.resource.AbstractInstanceManager;
+import org.apache.falcon.resource.FeedInstanceResult;
+import org.apache.falcon.resource.InstanceDependencyResult;
 import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.InstancesSummaryResult;
 
 import java.util.List;
+import java.util.Properties;
 
 /**
  * A proxy implementation of the entity instance operations.
@@ -31,6 +35,27 @@ public class LocalInstanceManager extends AbstractInstanceManager {
     public LocalInstanceManager() {}
 
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+    /**
+     * Forwards a kill request to the superclass killInstance with every argument unchanged.
+     */
+    public InstancesResult killInstance(Properties properties, String type, String entity, String startStr,
+                                        String endStr, String colo, List<LifeCycle> lifeCycles) {
+        return super.killInstance(
+                properties, type, entity, startStr, endStr, colo, lifeCycles);
+    }
+
+    /**
+     * Forwards a suspend request to the superclass suspendInstance with every argument unchanged.
+     */
+    public InstancesResult suspendInstance(Properties properties, String type, String entity, String startStr,
+                                           String endStr, String colo, List<LifeCycle> lifeCycles) {
+        return super.suspendInstance(
+                properties, type, entity, startStr, endStr, colo, lifeCycles);
+    }
+
+    /**
+     * Forwards a resume request to the superclass resumeInstance with every argument unchanged.
+     */
+    public InstancesResult resumeInstance(Properties properties, String type, String entity, String startStr,
+                                          String endStr, String colo, List<LifeCycle> lifeCycles) {
+        return super.resumeInstance(
+                properties, type, entity, startStr, endStr, colo, lifeCycles);
+    }
+
+    /**
+     * Forwards a re-run request to the superclass reRunInstance with every argument unchanged.
+     */
+    public InstancesResult reRunInstance(String type, String entity, String startStr, String endStr,
+                                         Properties properties, String colo, List<LifeCycle> lifeCycles,
+                                         Boolean isForced) {
+        return super.reRunInstance(
+                type, entity, startStr, endStr, properties, colo, lifeCycles, isForced);
+    }
+
     public InstancesResult getStatusOfInstances(String type, String entity, String start, String end,
                                                 String colo, List<LifeCycle> lifeCycles, String filterBy,
                                                 String orderBy, String sortOrder, Integer offset,
@@ -38,6 +63,32 @@ public class LocalInstanceManager extends AbstractInstanceManager {
         return super.getStatus(type, entity, start, end, colo, lifeCycles, filterBy, orderBy, sortOrder,
                 offset, numResults);
     }
+
+    /**
+     * Forwards a summary request to the superclass getSummary with every argument unchanged.
+     */
+    public InstancesSummaryResult getSummary(String type, String entity, String startStr, String endStr, String colo,
+                                             List<LifeCycle> lifeCycles, String filterBy, String orderBy,
+                                             String sortOrder) {
+        return super.getSummary(
+                type, entity, startStr, endStr, colo, lifeCycles, filterBy, orderBy, sortOrder);
+    }
+
+    /**
+     * Forwards a listing request to the superclass getListing with every argument unchanged.
+     */
+    public FeedInstanceResult getListing(String type, String entity, String startStr, String endStr, String colo) {
+        return super.getListing(
+                type, entity, startStr, endStr, colo);
+    }
+
+    /**
+     * Forwards a logs request to the superclass getLogs with every argument unchanged.
+     */
+    public InstancesResult getLogs(String type, String entity, String startStr, String endStr, String colo,
+                                   String runId, List<LifeCycle> lifeCycles, String filterBy, String orderBy,
+                                   String sortOrder, Integer offset, Integer numResults) {
+        return super.getLogs(
+                type, entity, startStr, endStr, colo, runId, lifeCycles, filterBy, orderBy, sortOrder, offset,
+                numResults);
+    }
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
+    /**
+     * Forwards an instance-params request to the superclass getInstanceParams with every argument unchanged.
+     */
+    public InstancesResult getInstanceParams(String type, String entity, String startTime, String colo,
+                                             List<LifeCycle> lifeCycles) {
+        return super.getInstanceParams(
+                type, entity, startTime, colo, lifeCycles);
+    }
+
+    /**
+     * Forwards a dependency request to the superclass getInstanceDependencies with every argument unchanged.
+     */
+    public InstanceDependencyResult getInstanceDependencies(String entityType, String entityName,
+                                                            String instanceTimeString, String colo) {
+        return super.getInstanceDependencies(
+                entityType, entityName, instanceTimeString, colo);
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
index ac478f4..bd81798 100644
--- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
+++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
@@ -174,7 +174,7 @@ public class FalconUnitTestBase {
             throw new FalconException("Process not found " + processName);
         }
         String workflowPath = processEntity.getWorkflow().getPath();
-        fs.copyFromLocalFile(new Path(localWfPath), new Path(workflowPath));
+        fs.copyFromLocalFile(new Path(localWfPath), new Path(workflowPath, "workflow.xml"));
         return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances, cluster,
                 skipDryRun, properties);
     }
@@ -332,12 +332,13 @@ public class FalconUnitTestBase {
         }
     }
 
-    protected long waitForStatus(final String entityType, final String entityName, final String instanceTime) {
+    protected long waitForStatus(final String entityType, final String entityName, final String instanceTime,
+                                 final InstancesResult.WorkflowStatus instanceStatus) {
         return waitFor(WAIT_TIME, new Predicate() {
             public boolean evaluate() throws Exception {
                 InstancesResult.WorkflowStatus status = falconUnitClient.getInstanceStatus(entityType,
                         entityName, instanceTime);
-                return InstancesResult.WorkflowStatus.SUCCEEDED.equals(status);
+                return instanceStatus.equals(status);
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index 8cdbd88..2c8642d 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -24,7 +24,10 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Property;
 import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.FeedInstanceResult;
+import org.apache.falcon.resource.InstanceDependencyResult;
 import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.InstancesSummaryResult;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
@@ -54,7 +57,9 @@ public class TestFalconUnit extends FalconUnitTestBase {
     private static final String OUTPUT_FEED_NAME = "out";
     private static final String INPUT_FILE_NAME = "input.txt";
     private static final String SCHEDULE_TIME = "2015-06-20T00:00Z";
+    private static final String END_TIME = "2015-06-20T00:01Z";
     private static final String WORKFLOW = "workflow.xml";
+    private static final String SLEEP_WORKFLOW = "sleepWorkflow.xml";
 
     @Test
     public void testProcessInstanceExecution() throws Exception {
@@ -66,7 +71,7 @@ public class TestFalconUnit extends FalconUnitTestBase {
         result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 1, CLUSTER_NAME, getAbsolutePath(WORKFLOW),
                 true, "");
         assertStatus(result);
-        waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME);
+        waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.SUCCEEDED);
         InstancesResult.WorkflowStatus status = getClient().getInstanceStatus(EntityType.PROCESS.name(),
                 PROCESS_NAME, SCHEDULE_TIME);
         Assert.assertEquals(status, InstancesResult.WorkflowStatus.SUCCEEDED);
@@ -113,7 +118,7 @@ public class TestFalconUnit extends FalconUnitTestBase {
         result = scheduleProcess(PROCESS_NAME, scheduleTime, 2, CLUSTER_NAME, getAbsolutePath(WORKFLOW),
                 true, "");
         assertStatus(result);
-        waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, scheduleTime);
+        waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, scheduleTime, InstancesResult.WorkflowStatus.SUCCEEDED);
         result = getClient().suspend(EntityType.PROCESS, PROCESS_NAME, CLUSTER_NAME, null);
         assertStatus(result);
         result = getClient().getStatus(EntityType.PROCESS, PROCESS_NAME, CLUSTER_NAME, null);
@@ -139,7 +144,7 @@ public class TestFalconUnit extends FalconUnitTestBase {
         result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 10, CLUSTER_NAME, getAbsolutePath(WORKFLOW),
                 true, "");
         assertStatus(result);
-        waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME);
+        waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.SUCCEEDED);
         result = getClient().delete(EntityType.PROCESS, PROCESS_NAME, null);
         assertStatus(result);
         try {
@@ -186,10 +191,9 @@ public class TestFalconUnit extends FalconUnitTestBase {
         createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME);
         result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH);
         assertStatus(result);
-        result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 10, CLUSTER_NAME, getAbsolutePath(WORKFLOW),
-                true, "");
+        result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 1, CLUSTER_NAME, getAbsolutePath(WORKFLOW), true, "");
         assertStatus(result);
-        waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME);
+        waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.SUCCEEDED);
 
         Process process = getEntity(EntityType.PROCESS, PROCESS_NAME);
         setDummyProperty(process);
@@ -228,4 +232,101 @@ public class TestFalconUnit extends FalconUnitTestBase {
         process.getProperties().getProperties().add(property);
 
     }
+
+    /**
+     * Schedules the sleep workflow and drives the instance at SCHEDULE_TIME through suspend, resume,
+     * kill and rerun, checking the reported instance status after each operation.
+     */
+    @Test
+    public void testProcessInstanceManagementAPI1() throws Exception {
+        final String processType = EntityType.PROCESS.name();
+
+        submitClusterAndFeeds();
+        // submitting and scheduling process
+        createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME);
+        APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH);
+        assertStatus(result);
+        result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 3, CLUSTER_NAME,
+                getAbsolutePath(SLEEP_WORKFLOW), true, "");
+        assertStatus(result);
+        awaitAndAssertInstanceStatus(processType, InstancesResult.WorkflowStatus.RUNNING);
+
+        getClient().suspendInstances(processType, PROCESS_NAME, SCHEDULE_TIME, END_TIME, null,
+                CLUSTER_NAME, null, null, null);
+        awaitAndAssertInstanceStatus(processType, InstancesResult.WorkflowStatus.SUSPENDED);
+
+        getClient().resumeInstances(processType, PROCESS_NAME, SCHEDULE_TIME, END_TIME, null,
+                CLUSTER_NAME, null, null, null);
+        awaitAndAssertInstanceStatus(processType, InstancesResult.WorkflowStatus.RUNNING);
+
+        getClient().killInstances(processType, PROCESS_NAME, SCHEDULE_TIME, END_TIME, null,
+                CLUSTER_NAME, null, null, null);
+        awaitAndAssertInstanceStatus(processType, InstancesResult.WorkflowStatus.KILLED);
+
+        getClient().rerunInstances(processType, PROCESS_NAME, SCHEDULE_TIME, END_TIME, null, null,
+                CLUSTER_NAME, null, null, true, null);
+        awaitAndAssertInstanceStatus(processType, InstancesResult.WorkflowStatus.RUNNING);
+    }
+
+    /**
+     * Waits for the PROCESS_NAME instance at SCHEDULE_TIME to reach the expected status, then fetches
+     * the status once more and asserts that it equals the expected one.
+     */
+    private void awaitAndAssertInstanceStatus(String entityType, InstancesResult.WorkflowStatus expected)
+        throws Exception {
+        waitForStatus(entityType, PROCESS_NAME, SCHEDULE_TIME, expected);
+        InstancesResult.WorkflowStatus actual = getClient().getInstanceStatus(entityType, PROCESS_NAME,
+                SCHEDULE_TIME);
+        Assert.assertEquals(actual, expected);
+    }
+
+    /**
+     * Schedules the regular workflow, waits for the instance at SCHEDULE_TIME to succeed, and then
+     * exercises the read-only instance APIs: summary, logs, params and dependencies.
+     */
+    @Test
+    public void testProcessInstanceManagementAPI2() throws Exception {
+        final String processType = EntityType.PROCESS.name();
+
+        submitClusterAndFeeds();
+        // submitting and scheduling process
+        createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME);
+        APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH);
+        assertStatus(result);
+        result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 3, CLUSTER_NAME, getAbsolutePath(WORKFLOW), true, "");
+        assertStatus(result);
+        waitForStatus(processType, PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.SUCCEEDED);
+        InstancesResult.WorkflowStatus currentStatus = getClient().getInstanceStatus(processType, PROCESS_NAME,
+                SCHEDULE_TIME);
+        Assert.assertEquals(currentStatus, InstancesResult.WorkflowStatus.SUCCEEDED);
+
+        // NOTE(review): "asc" is passed in the orderBy position here and in getLogsOfInstances below;
+        // sortOrder is the following parameter. Confirm which one was intended.
+        InstancesSummaryResult summaryResult = getClient().getSummaryOfInstances(processType,
+                PROCESS_NAME, SCHEDULE_TIME, END_TIME, null, null, null, "asc", null, null);
+        Assert.assertEquals(summaryResult.getStatus(), APIResult.Status.SUCCEEDED);
+        Assert.assertNotNull(summaryResult.getInstancesSummary());
+        Assert.assertEquals(summaryResult.getInstancesSummary().length, 1);
+
+        // offset 0 and numResults 1 are autoboxed; no need for the deprecated Integer constructor
+        InstancesResult instancesResult = getClient().getLogsOfInstances(processType, PROCESS_NAME,
+                SCHEDULE_TIME, END_TIME, null, "0", null, null, "asc", null, 0, 1, null);
+        Assert.assertEquals(instancesResult.getStatus(), APIResult.Status.SUCCEEDED);
+        Assert.assertNotNull(instancesResult.getInstances());
+        Assert.assertEquals(instancesResult.getInstances().length, 1);
+
+        instancesResult = getClient().getParamsOfInstance(processType, PROCESS_NAME, SCHEDULE_TIME, null,
+                null, null);
+        Assert.assertEquals(instancesResult.getStatus(), APIResult.Status.SUCCEEDED);
+        Assert.assertNotNull(instancesResult.getInstances());
+        Assert.assertEquals(instancesResult.getInstances().length, 1);
+
+        InstanceDependencyResult dependencyResult = getClient().getInstanceDependencies(processType,
+                PROCESS_NAME, SCHEDULE_TIME, null);
+        Assert.assertEquals(dependencyResult.getStatus(), APIResult.Status.SUCCEEDED);
+        Assert.assertNotNull(dependencyResult.getDependencies());
+        Assert.assertEquals(dependencyResult.getDependencies().length, 2); //2 for input and output feed
+    }
+
+    /**
+     * Submits and schedules the input feed after creating data for SCHEDULE_TIME, then checks that the
+     * feed listing call between SCHEDULE_TIME and END_TIME reports SUCCEEDED.
+     */
+    @Test
+    public void testFeedInstanceManagementAPI() throws Exception {
+        final String feedType = EntityType.FEED.name();
+
+        // submit with default props
+        submitCluster();
+        // submitting feeds
+        APIResult result = submit(EntityType.FEED, getAbsolutePath(INPUT_FEED));
+        assertStatus(result);
+        createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME);
+        final Path feedInstancePath = new Path(getFeedPathForTS(CLUSTER_NAME, INPUT_FEED_NAME, SCHEDULE_TIME));
+        Assert.assertTrue(fs.exists(feedInstancePath));
+        result = schedule(EntityType.FEED, INPUT_FEED_NAME, CLUSTER_NAME);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+
+        FeedInstanceResult listing = getClient().getFeedListing(
+                feedType, INPUT_FEED_NAME, SCHEDULE_TIME, END_TIME, null, null);
+        Assert.assertEquals(listing.getStatus(), APIResult.Status.SUCCEEDED);
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/unit/src/test/java/org/apache/falcon/unit/examples/JavaSleepExample.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/examples/JavaSleepExample.java b/unit/src/test/java/org/apache/falcon/unit/examples/JavaSleepExample.java
new file mode 100644
index 0000000..4f4f827
--- /dev/null
+++ b/unit/src/test/java/org/apache/falcon/unit/examples/JavaSleepExample.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.unit.examples;
+
+/**
+ * Main class that sleeps for 40 seconds and prints the measured sleep time; referenced by sleepWorkflow.xml.
+ */
+public final class JavaSleepExample {
+
+    private JavaSleepExample() {}
+
+    public static void main(String[] args) throws InterruptedException {
+        final long begin = System.currentTimeMillis();
+        Thread.sleep(40000);
+        final long elapsedMs = System.currentTimeMillis() - begin;
+        System.out.println("Sleep time in ms = " + elapsedMs);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/49fbc8c9/unit/src/test/resources/sleepWorkflow.xml
----------------------------------------------------------------------
diff --git a/unit/src/test/resources/sleepWorkflow.xml b/unit/src/test/resources/sleepWorkflow.xml
new file mode 100644
index 0000000..2f6598f
--- /dev/null
+++ b/unit/src/test/resources/sleepWorkflow.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<!-- Single-action workflow: starts at java-node, which runs JavaSleepExample as a java action. -->
+<workflow-app xmlns="uri:oozie:workflow:0.2" name="java-main-wf">
+    <start to="java-node"/>
+    <action name="java-node">
+        <java>
+            <job-tracker>local</job-tracker>
+            <name-node>jail://global:00</name-node>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>default</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.unit.examples.JavaSleepExample</main-class>
+        </java>
+        <!-- success transitions to the end node; an error transitions to the kill node below -->
+        <ok to="end"/>
+        <error to="fail"/>
+    </action>
+    <!-- kill node: reports the error message of the last failed node -->
+    <kill name="fail">
+        <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+    <end name="end"/>
+</workflow-app>
\ No newline at end of file


Mime
View raw message