falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject falcon git commit: FALCON-1577 Migration of EntityManagerJerseyIT to use falcon unit (Narayan Periwal)
Date Tue, 15 Dec 2015 08:37:08 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 4445798be -> 2804c5d1a


FALCON-1577 Migration of EntityManagerJerseyIT to use falcon unit (Narayan Periwal)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2804c5d1
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2804c5d1
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2804c5d1

Branch: refs/heads/master
Commit: 2804c5d1a8be9d19c0198c66b45373bbfb9dc0e0
Parents: 4445798
Author: Pallavi Rao <pallavi.rao@inmobi.com>
Authored: Tue Dec 15 14:06:33 2015 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Tue Dec 15 14:06:33 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/falcon/unit/FalconUnitClient.java    |  19 +-
 .../apache/falcon/unit/FalconUnitTestBase.java  |   2 +-
 .../falcon/resource/EntityManagerJerseyIT.java  | 945 +++++++++----------
 .../resource/ProcessInstanceManagerIT.java      |  52 +-
 .../apache/falcon/resource/UnitTestContext.java |  46 +-
 .../org/apache/falcon/util/OozieTestUtils.java  |  54 ++
 7 files changed, 602 insertions(+), 518 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/2804c5d1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b594160..5753535 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -31,6 +31,8 @@ Trunk (Unreleased)
     FALCON-1213 Base framework of the native scheduler(Pallavi Rao)
 
   IMPROVEMENTS
+    FALCON-1577 Migration of EntityManagerJerseyIT to use falcon unit (Narayan Periwal via Pallavi Rao)
+
     FALCON-1658 MySql Support for Native Scheduler(Pavan Kumar Kolamuri via Ajay Yadava)
 
     FALCON-1656 Improve FeedHelper:getRetentionFrequency method(Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/2804c5d1/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 9eb4277..f34a90c 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -43,6 +43,7 @@ import org.apache.falcon.util.DateUtil;
 import org.apache.falcon.workflow.WorkflowEngineFactory;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +51,9 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.TimeZone;
 
@@ -354,14 +357,16 @@ public class FalconUnitClient extends AbstractFalconClient {
     public String getVersion(String doAsUser) throws FalconCLIException {
         AdminResource resource = new AdminResource();
         AdminResource.PropertyList propertyList = resource.getVersion();
-        StringBuilder properties = new StringBuilder();
-        for(AdminResource.Property property : propertyList.properties) {
-            if (properties.length() > 1) {
-                properties.append(",");
-            }
-            properties.append(property.key).append(":").append(property.value);
+        Map<String, String> version = new LinkedHashMap<>();
+        List<String> list = new ArrayList<>();
+        for (AdminResource.Property property : propertyList.properties) {
+            Map<String, String> map = new LinkedHashMap<>();
+            map.put("key", property.key);
+            map.put("value", property.value);
+            list.add(JSONValue.toJSONString(map));
         }
-        return properties.toString();
+        version.put("properties", list.toString());
+        return version.toString();
     }
 
     private boolean checkAndUpdateCluster(Entity entity, EntityType entityType, String cluster) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/2804c5d1/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
index 2a73516..83afac7 100644
--- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
+++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
@@ -94,7 +94,7 @@ public class FalconUnitTestBase {
     private static final String DEFAULT_COLO = "local";
     private static final String CLUSTER = "cluster";
     private static final String COLO = "colo";
-    private static final String CLUSTER_TEMPLATE = "/local-cluster-template.xml";
+    protected static final String CLUSTER_TEMPLATE = "/local-cluster-template.xml";
     protected static final String STAGING_PATH = "/projects/falcon/staging";
     protected static final String WORKING_PATH = "/projects/falcon/working";
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/2804c5d1/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index abe5bdd..439d148 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -18,11 +18,12 @@
 package org.apache.falcon.resource;
 
 import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import org.apache.falcon.cli.FalconCLI;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.client.FalconCLIException;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.feed.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Location;
@@ -32,10 +33,14 @@ import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Property;
 import org.apache.falcon.entity.v0.process.Validity;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.unit.FalconUnit;
+import org.apache.falcon.unit.FalconUnitTestBase;
 import org.apache.falcon.util.BuildProperties;
 import org.apache.falcon.util.DeploymentProperties;
 import org.apache.falcon.util.FalconTestUtil;
 import org.apache.falcon.util.OozieTestUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
@@ -46,19 +51,13 @@ import org.apache.oozie.client.Job;
 import org.apache.oozie.client.Job.Status;
 import org.apache.oozie.client.OozieClient;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import javax.servlet.ServletInputStream;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.xml.bind.JAXBException;
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringReader;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -81,32 +80,46 @@ import java.util.regex.Pattern;
  * Tests should be enabled only in local environments as they need running instance of the web server.
  */
 @Test(groups = {"exhaustive"})
-public class EntityManagerJerseyIT {
+public class EntityManagerJerseyIT extends FalconUnitTestBase {
+
+    private static final String START_INSTANCE = "2012-04-20T00:00Z";
+    private static final String SLEEP_WORKFLOW = "sleepWorkflow.xml";
+    private static final String LOCAL_MODE = "local";
+    private static final String IT_RUN_MODE = "it.run.mode";
 
     @BeforeClass
-    public void prepare() throws Exception {
-        TestContext.prepare();
+    @Override
+    public void setup() throws FalconException, IOException {
+        String version = System.getProperty("project.version");
+        String buildDir = System.getProperty("project.build.directory");
+        System.setProperty("falcon.libext", buildDir + "/../../unit/target/falcon-unit-" + version + ".jar");
+        super.setup();
     }
 
-    @AfterClass
-    public void tearDown() throws Exception {
-        TestContext.deleteEntitiesFromStore();
+    @AfterMethod
+    @Override
+    public void cleanUpActionXml() throws IOException, FalconException {
+        //Needed since oozie writes action xml to current directory.
+        FileUtils.deleteQuietly(new File("action.xml"));
+        FileUtils.deleteQuietly(new File(".action.xml.crc"));
+        contexts.remove();
+    }
+
+    private ThreadLocal<UnitTestContext> contexts = new ThreadLocal<UnitTestContext>();
+
+    private UnitTestContext newContext() throws FalconException, IOException {
+        contexts.set(new UnitTestContext());
+        return contexts.get();
     }
 
     static void assertLibs(FileSystem fs, Path path) throws IOException {
         FileStatus[] libs = fs.listStatus(path);
         Assert.assertNotNull(libs);
-        Assert.assertEquals(libs.length, 1);
-        Assert.assertTrue(libs[0].getPath().getName().startsWith("falcon-hadoop-dependencies"));
     }
 
-    private Entity getDefinition(TestContext context, EntityType type, String name) throws Exception {
-        ClientResponse response =
-                context.service.path("api/entities/definition/" + type.name().toLowerCase() + "/" + name)
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML)
-                .get(ClientResponse.class);
-        return (Entity) type.getUnmarshaller().unmarshal(new StringReader(response.getEntity(String.class)));
+    private Entity getDefinition(EntityType type, String name) throws Exception {
+        Entity entity = falconUnitClient.getDefinition(type.name(), name, null);
+        return entity;
     }
 
     private void updateEndtime(Process process) {
@@ -114,17 +127,61 @@ public class EntityManagerJerseyIT {
         processValidity.setEnd(new Date(new Date().getTime() + 2 * 24 * 60 * 60 * 1000));
     }
 
+    private void submitCluster(UnitTestContext context) throws IOException, FalconCLIException {
+        String mode = System.getProperty(IT_RUN_MODE);
+        if (StringUtils.isNotEmpty(mode) && mode.toLowerCase().equals(LOCAL_MODE)) {
+            submitCluster(context.colo, context.clusterName, null);
+        } else {
+            String tmpFile = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, context.overlay);
+            submit(EntityType.CLUSTER, tmpFile);
+        }
+    }
+
+    private APIResult submitFeed(String template, Map<String, String> overlay) throws IOException, FalconCLIException {
+        String tmpFile = TestContext.overlayParametersOverTemplate(template, overlay);
+        APIResult result = falconUnitClient.submit(EntityType.FEED.name(), tmpFile, null);
+        return result;
+    }
+
+    private void submitProcess(String template, Map<String, String> overlay) throws IOException, FalconCLIException {
+        String tmpFile = TestContext.overlayParametersOverTemplate(template, overlay);
+        APIResult result = falconUnitClient.submit(EntityType.PROCESS.name(), tmpFile, null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+    }
+
+    private void scheduleProcess(UnitTestContext context) throws FalconCLIException, IOException, FalconException {
+        String scheduleTime = START_INSTANCE;
+        APIResult result = scheduleProcess(context.getProcessName(), scheduleTime, 1, context.getClusterName(),
+                getAbsolutePath(SLEEP_WORKFLOW), true, "");
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+    }
+
+    private void schedule(UnitTestContext context) throws Exception {
+        submitCluster(context);
+        context.prepare();
+        submitFeeds(context.overlay);
+        submitProcess(UnitTestContext.PROCESS_TEMPLATE, context.overlay);
+        scheduleProcess(context);
+    }
+
+    private void submitFeeds(Map<String, String> overlay) throws IOException, FalconCLIException {
+        String tmpFile = TestContext.overlayParametersOverTemplate(UnitTestContext.FEED_TEMPLATE1, overlay);
+        APIResult result = falconUnitClient.submit(EntityType.FEED.name(), tmpFile, null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+        tmpFile = TestContext.overlayParametersOverTemplate(UnitTestContext.FEED_TEMPLATE2, overlay);
+        result = falconUnitClient.submit(EntityType.FEED.name(), tmpFile, null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+    }
+
     @Test
     public void testLibExtensions() throws Exception {
-        TestContext context = newContext();
-        Map<String, String> overlay = context.getUniqueOverlay();
-        ClientResponse response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
-        context.assertSuccessful(response);
-        FileSystem fs = context.getCluster().getFileSystem();
+        UnitTestContext context = newContext();
+        submitCluster(context);
+        FileSystem fs = FalconUnit.getFileSystem();
         assertLibs(fs, new Path("/projects/falcon/working/libext/FEED/retention"));
         assertLibs(fs, new Path("/projects/falcon/working/libext/PROCESS"));
 
-        String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+        String tmpFileName = TestContext.overlayParametersOverTemplate(UnitTestContext.FEED_TEMPLATE1, context.overlay);
         Feed feed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(new File(tmpFileName));
         Location location = new Location();
         location.setPath("fsext://global:00/falcon/test/input/${YEAR}/${MONTH}/${DAY}/${HOUR}");
@@ -133,63 +190,44 @@ public class EntityManagerJerseyIT {
         cluster.setLocations(new Locations());
         feed.getClusters().getClusters().get(0).getLocations().getLocations().add(location);
 
-        File tmpFile = TestContext.getTempFile();
+        File tmpFile = UnitTestContext.getTempFile();
         EntityType.FEED.getMarshaller().marshal(feed, tmpFile);
-        response = context.submitAndSchedule(tmpFileName, overlay, EntityType.FEED);
-        context.assertSuccessful(response);
-    }
 
-    private ClientResponse update(TestContext context, Entity entity,
-                                  Date endTime, Boolean skipDryRun) throws Exception {
-        File tmpFile = TestContext.getTempFile();
-        entity.getEntityType().getMarshaller().marshal(entity, tmpFile);
-        WebResource resource = context.service.path("api/entities/update/"
-                + entity.getEntityType().name().toLowerCase() + "/" + entity.getName());
-        if (endTime != null) {
-            resource = resource.queryParam("effective", SchemaHelper.formatDateUTC(endTime));
-        }
-        if (null != skipDryRun) {
-            resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
-        }
-        return resource.header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML)
-                .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath()));
-    }
-
-    private ClientResponse touch(TestContext context, Entity entity, Boolean skipDryRun) {
-        WebResource resource = context.service.path("api/entities/touch/"
-                + entity.getEntityType().name().toLowerCase() + "/" + entity.getName());
-        if (null != skipDryRun) {
-            resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
-        }
-        return resource
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
-                .post(ClientResponse.class);
+        APIResult result = falconUnitClient.submitAndSchedule(EntityType.FEED.name(), tmpFile.getAbsolutePath(), true,
+                null, null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
     }
 
     @Test
     public void testUpdateCheckUser() throws Exception {
-        TestContext context = newContext();
-        Map<String, String> overlay = context.getUniqueOverlay();
-        String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+        UnitTestContext context = newContext();
+        String tmpFileName = TestContext.overlayParametersOverTemplate(UnitTestContext.PROCESS_TEMPLATE,
+                context.overlay);
         Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName));
         updateEndtime(process);
-        File tmpFile = TestContext.getTempFile();
+        File tmpFile = UnitTestContext.getTempFile();
         EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
-        context.scheduleProcess(tmpFile.getAbsolutePath(), overlay);
-        OozieTestUtils.waitForBundleStart(context, Status.RUNNING);
+        submitCluster(context);
+        context.prepare();
+        submitFeeds(context.overlay);
+        submitProcess(tmpFile.getAbsolutePath(), context.overlay);
+        scheduleProcess(context.getProcessName(), context.getClusterName(), getAbsolutePath(SLEEP_WORKFLOW));
+        waitForStatus(EntityType.PROCESS.name(), context.getProcessName(), START_INSTANCE,
+                InstancesResult.WorkflowStatus.RUNNING);
 
         List<BundleJob> bundles = OozieTestUtils.getBundles(context);
         Assert.assertEquals(bundles.size(), 1);
         Assert.assertEquals(bundles.get(0).getUser(), TestContext.REMOTE_USER);
 
-        Feed feed = (Feed) getDefinition(context, EntityType.FEED, context.outputFeedName);
+        Feed feed = (Feed) getDefinition(EntityType.FEED, context.outputFeedName);
 
         //change output feed path and update feed as another user
         feed.getLocations().getLocations().get(0).setPath("/falcon/test/output2/${YEAR}/${MONTH}/${DAY}");
-        ClientResponse response = update(context, feed, null, false);
-        context.assertSuccessful(response);
+        tmpFile = TestContext.getTempFile();
+        feed.getEntityType().getMarshaller().marshal(feed, tmpFile);
+        APIResult result = falconUnitClient.update(EntityType.FEED.name(), feed.getName(),
+                tmpFile.getAbsolutePath(), true, null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
 
         bundles = OozieTestUtils.getBundles(context);
         Assert.assertEquals(bundles.size(), 2);
@@ -197,27 +235,9 @@ public class EntityManagerJerseyIT {
         Assert.assertEquals(bundles.get(1).getUser(), TestContext.REMOTE_USER);
     }
 
-    private ThreadLocal<TestContext> contexts = new ThreadLocal<TestContext>();
-
-    private TestContext newContext() {
-        contexts.set(new TestContext());
-        return contexts.get();
-    }
-
-    @AfterMethod
-    public void cleanup() throws Exception {
-        TestContext testContext = contexts.get();
-        if (testContext != null) {
-            OozieTestUtils.killOozieJobs(testContext);
-        }
-
-        contexts.remove();
-    }
-
     public void testOptionalInput() throws Exception {
-        TestContext context = newContext();
-        Map<String, String> overlay = context.getUniqueOverlay();
-        String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+        UnitTestContext context = newContext();
+        String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, context.overlay);
         Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName));
 
         Input in1 = process.getInputs().getInputs().get(0);
@@ -232,14 +252,14 @@ public class EntityManagerJerseyIT {
 
         File tmpFile = TestContext.getTempFile();
         EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
-        context.scheduleProcess(tmpFile.getAbsolutePath(), overlay);
+        schedule(context);
     }
 
     public void testDryRun() throws Exception {
         //Schedule of invalid process should fail because of dryRun, and should pass when dryrun is skipped
-        TestContext context = newContext();
-        Map<String, String> overlay = context.getUniqueOverlay();
-        String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+        UnitTestContext context = newContext();
+        String tmpFileName = TestContext.overlayParametersOverTemplate(UnitTestContext.PROCESS_TEMPLATE,
+                context.overlay);
         Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName));
         Property prop = new Property();
         prop.setName("newProp");
@@ -248,10 +268,14 @@ public class EntityManagerJerseyIT {
         File tmpFile = TestContext.getTempFile();
         EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
 
-        ClientResponse response = context.validate(tmpFile.getAbsolutePath(), overlay, EntityType.PROCESS);
-        context.assertFailure(response);
-
-        context.scheduleProcess(tmpFile.getAbsolutePath(), overlay, false);
+        try {
+            falconUnitClient.validate(EntityType.PROCESS.name(), tmpFile.getAbsolutePath(),
+                    true, null);
+            Assert.fail("Exception should be Thrown");
+        } catch (Exception e) {
+            //ignore
+        }
+        schedule(context);
 
         //Fix the process and then submitAndSchedule should succeed
         Iterator<Property> itr = process.getProperties().getProperties().iterator();
@@ -264,37 +288,43 @@ public class EntityManagerJerseyIT {
         tmpFile = TestContext.getTempFile();
         process.setName("process" + System.currentTimeMillis());
         EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
-        response = context.submitAndSchedule(tmpFile.getAbsolutePath(), overlay, EntityType.PROCESS);
-        context.assertSuccessful(response);
-
-        //Update with invalid property should fail again
-        process.getProperties().getProperties().add(prop);
-        updateEndtime(process);
-        response = update(context, process, null, null);
-        context.assertFailure(response);
+        APIResult result = falconUnitClient.submitAndSchedule(EntityType.PROCESS.name(),
+                tmpFile.getAbsolutePath(), true, null, null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
 
         // update where dryrun is disabled should succeed.
-        response = update(context, process, null, true);
-        context.assertSuccessful(response);
-
+        tmpFile = TestContext.getTempFile();
+        process.getEntityType().getMarshaller().marshal(process, tmpFile);
+        result = falconUnitClient.update(EntityType.PROCESS.name(), process.getName(),
+                tmpFile.getAbsolutePath(), true, null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
     }
 
     @Test
     public void testUpdateSuspendedEntity() throws Exception {
-        TestContext context = newContext();
-        context.scheduleProcess();
-        OozieTestUtils.waitForBundleStart(context, Job.Status.RUNNING);
+        UnitTestContext context = newContext();
+        schedule(context);
+        waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE,
+                InstancesResult.WorkflowStatus.RUNNING);
 
         //Suspend entity
-        Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName);
-        ClientResponse response = suspend(context, process);
-        context.assertSuccessful(response);
+        Process process = (Process) getDefinition(EntityType.PROCESS, context.processName);
+        APIResult result = falconUnitClient.suspend(EntityType.PROCESS, process.getName(), context.colo,
+                null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+        result = falconUnitClient.getStatus(EntityType.PROCESS, context.processName, context.clusterName,
+                null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+        Assert.assertEquals(result.getMessage(), "SUSPENDED");
 
         process.getProperties().getProperties().get(0).setName("newprop");
         Date endTime = getEndTime();
         process.getClusters().getClusters().get(0).getValidity().setEnd(endTime);
-        response = update(context, process, endTime, null);
-        context.assertSuccessful(response);
+        File tmpFile = TestContext.getTempFile();
+        process.getEntityType().getMarshaller().marshal(process, tmpFile);
+        result = falconUnitClient.update(EntityType.PROCESS.name(), context.getProcessName(),
+                tmpFile.getAbsolutePath(), true, null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
 
         //Since the process endtime = update effective time, it shouldn't create new bundle
         List<BundleJob> bundles = OozieTestUtils.getBundles(context);
@@ -306,24 +336,26 @@ public class EntityManagerJerseyIT {
 
     @Test
     public void testProcessInputUpdate() throws Exception {
-        TestContext context = newContext();
-        context.scheduleProcess();
-        OozieTestUtils.waitForBundleStart(context, Job.Status.RUNNING);
+        UnitTestContext context = newContext();
+
+        schedule(context);
+        waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE,
+                InstancesResult.WorkflowStatus.RUNNING);
         List<BundleJob> bundles = OozieTestUtils.getBundles(context);
         Assert.assertEquals(bundles.size(), 1);
-        OozieClient ozClient = OozieTestUtils.getOozieClient(context.getCluster().getCluster());
+        OozieClient ozClient = OozieTestUtils.getOozieClient(context);
         String bundle = bundles.get(0).getId();
         String coordId = ozClient.getBundleJobInfo(bundle).getCoordinators().get(0).getId();
 
-        Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName);
+        Process process = (Process) getDefinition(EntityType.PROCESS, context.processName);
 
         String feed3 = "f3" + System.currentTimeMillis();
         Map<String, String> overlay = new HashMap<String, String>();
         overlay.put("inputFeedName", feed3);
         overlay.put("cluster", context.clusterName);
         overlay.put("user", System.getProperty("user.name"));
-        ClientResponse response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
-        context.assertSuccessful(response);
+        submitFeed(UnitTestContext.FEED_TEMPLATE1, overlay);
+
 
         Input input = new Input();
         input.setFeed(feed3);
@@ -334,8 +366,11 @@ public class EntityManagerJerseyIT {
 
         updateEndtime(process);
         Date endTime = getEndTime();
-        response = update(context, process, endTime, null);
-        context.assertSuccessful(response);
+        File tmpFile = TestContext.getTempFile();
+        process.getEntityType().getMarshaller().marshal(process, tmpFile);
+        APIResult result = falconUnitClient.update(EntityType.PROCESS.name(), context.getProcessName(),
+                tmpFile.getAbsolutePath(), true, null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
 
         //Assert that update creates new bundle and old coord is running
         bundles = OozieTestUtils.getBundles(context);
@@ -360,16 +395,20 @@ public class EntityManagerJerseyIT {
         Assert.assertEquals(coord.getStartTime(), endTime);
     }
 
+    @Test
     public void testProcessEndtimeUpdate() throws Exception {
-        TestContext context = newContext();
-        context.scheduleProcess();
+        UnitTestContext context = newContext();
+        schedule(context);
         OozieTestUtils.waitForBundleStart(context, Job.Status.RUNNING);
 
-        Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName);
+        Process process = (Process) getDefinition(EntityType.PROCESS, context.processName);
 
         updateEndtime(process);
-        ClientResponse response = update(context, process, null, null);
-        context.assertSuccessful(response);
+        File tmpFile = TestContext.getTempFile();
+        process.getEntityType().getMarshaller().marshal(process, tmpFile);
+        APIResult result = falconUnitClient.update(EntityType.PROCESS.name(), context.getProcessName(),
+                tmpFile.getAbsolutePath(), true, null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
 
         //Assert that update does not create new bundle
         List<BundleJob> bundles = OozieTestUtils.getBundles(context);
@@ -378,26 +417,28 @@ public class EntityManagerJerseyIT {
 
     @Test
     public void testTouchEntity() throws Exception {
-        TestContext context = newContext();
-        context.scheduleProcess();
+        UnitTestContext context = newContext();
+        schedule(context);
         OozieTestUtils.waitForBundleStart(context, Job.Status.RUNNING);
         List<BundleJob> bundles = OozieTestUtils.getBundles(context);
         Assert.assertEquals(bundles.size(), 1);
-        OozieClient ozClient = OozieTestUtils.getOozieClient(context.getCluster().getCluster());
+        OozieClient ozClient = OozieTestUtils.getOozieClient(context);
         String bundle = bundles.get(0).getId();
         String coordId = ozClient.getBundleJobInfo(bundle).getCoordinators().get(0).getId();
 
         //Update end time of process required for touch
-        Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName);
+        Process process = (Process) getDefinition(EntityType.PROCESS, context.processName);
         updateEndtime(process);
-        ClientResponse response = update(context, process, null, null);
-        context.assertSuccessful(response);
+        File tmpFile = TestContext.getTempFile();
+        process.getEntityType().getMarshaller().marshal(process, tmpFile);
+        APIResult result = falconUnitClient.update(EntityType.PROCESS.name(), context.getProcessName(),
+                tmpFile.getAbsolutePath(), true, null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
         bundles = OozieTestUtils.getBundles(context);
         Assert.assertEquals(bundles.size(), 1);
 
-        //Calling force update
-        response = touch(context, process, true);
-        context.assertSuccessful(response);
+        result = falconUnitClient.touch(EntityType.PROCESS.name(), context.getProcessName(), context.colo, true, null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
         OozieTestUtils.waitForBundleStart(context, Status.PREP, Status.RUNNING);
 
         //Assert that touch creates new bundle and old coord is running
@@ -421,101 +462,72 @@ public class EntityManagerJerseyIT {
     }
 
     public void testStatus() throws Exception {
-        TestContext context = newContext();
-        ClientResponse response;
-        Map<String, String> overlay = context.getUniqueOverlay();
-
-        response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
-        context.assertSuccessful(response);
+        UnitTestContext context = newContext();
 
-        response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
-        context.assertSuccessful(response);
+        submitCluster(context);
 
-        response = context.service
-                .path("api/entities/status/feed/" + overlay.get("inputFeedName"))
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML)
-                .get(ClientResponse.class);
-
-        APIResult result = (APIResult) context.unmarshaller.
-                unmarshal(new StringReader(response.getEntity(String.class)));
-        Assert.assertTrue(result.getMessage().contains("SUBMITTED"));
+        context.prepare();
+        submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay);
 
+        APIResult result = falconUnitClient.getStatus(EntityType.FEED, context.overlay.get("inputFeedName"),
+                context.colo, null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+        Assert.assertEquals(result.getMessage(), "SUBMITTED");
     }
 
     public void testIdempotentSubmit() throws Exception {
-        TestContext context = newContext();
-        ClientResponse response;
-        Map<String, String> overlay = context.getUniqueOverlay();
+        UnitTestContext context = newContext();
 
-        response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
-        context.assertSuccessful(response);
+        submitCluster(context);
 
-        response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
-        context.assertSuccessful(response);
+        submitCluster(context);
     }
 
-    public void testNotFoundStatus() {
-        TestContext context = newContext();
-        ClientResponse response;
+    public void testNotFoundStatus() throws FalconException, IOException, FalconCLIException {
         String feed1 = "f1" + System.currentTimeMillis();
-        response = context.service
-                .path("api/entities/status/feed/" + feed1)
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_PLAIN)
-                .get(ClientResponse.class);
-
-        Assert.assertEquals(response.getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
+        try {
+            falconUnitClient.getStatus(EntityType.FEED, feed1, null, null);
+            Assert.fail("Exception should be Thrown");
+        } catch (Exception e) {
+            //ignore
+        }
     }
 
-    public void testVersion() {
-        TestContext context = newContext();
-        ClientResponse response;
-        response = context.service
-                .path("api/admin/version")
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.APPLICATION_JSON)
-                .get(ClientResponse.class);
-        String json = response.getEntity(String.class);
+    public void testVersion() throws FalconException, IOException, FalconCLIException {
+        String json = falconUnitClient.getVersion(null);
         String buildVersion = BuildProperties.get().getProperty("build.version");
         String deployMode = DeploymentProperties.get().getProperty("deploy.mode");
         Assert.assertTrue(Pattern.matches(
-                ".*\\{\\s*\"key\"\\s*:\\s*\"Version\"\\s*,\\s*\"value\"\\s*:\\s*\""
-                        + buildVersion + "\"\\s*}.*", json),
+                        ".*\\{\\s*\"key\"\\s*:\\s*\"Version\"\\s*,\\s*\"value\"\\s*:\\s*\""
+                                + buildVersion + "\"\\s*}.*", json),
                 "No build.version found in /api/admin/version");
         Assert.assertTrue(Pattern.matches(
-                ".*\\{\\s*\"key\"\\s*:\\s*\"Mode\"\\s*,\\s*\"value\"\\s*:\\s*\""
-                        + deployMode + "\"\\s*}.*", json),
+                        ".*\\{\\s*\"key\"\\s*:\\s*\"Mode\"\\s*,\\s*\"value\"\\s*:\\s*\""
+                                + deployMode + "\"\\s*}.*", json),
                 "No deploy.mode found in /api/admin/version");
     }
 
-    public void testValidate() {
-        TestContext context = newContext();
-        ServletInputStream stream = context.getServletInputStream(getClass().
-                getResourceAsStream(TestContext.SAMPLE_PROCESS_XML));
-
-        ClientResponse clientResponse = context.service
-                .path("api/entities/validate/process")
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
-                .post(ClientResponse.class, stream);
-
-        context.assertFailure(clientResponse);
+    public void testValidate() throws FalconException, IOException {
+        UnitTestContext context = newContext();
+        try {
+            falconUnitClient.validate(EntityType.PROCESS.name(),
+                    UnitTestContext.class.getResource(UnitTestContext.SAMPLE_PROCESS_XML).getPath(), true, null);
+            Assert.fail("Exception should be Thrown");
+        } catch (Exception e) {
+            //ignore
+        }
     }
 
     public void testClusterValidate() throws Exception {
-        TestContext context = newContext();
-        ClientResponse clientResponse;
-        Map<String, String> overlay = context.getUniqueOverlay();
-
-        InputStream stream = context.getServletInputStream(
-                TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay));
-
-        clientResponse = context.service.path("api/entities/validate/cluster")
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
-                .post(ClientResponse.class, stream);
-        context.assertSuccessful(clientResponse);
+        UnitTestContext context = newContext();
+
+        String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, context.overlay);
+        File tmpFile = new File(tmpFileName);
+        fs.mkdirs(new Path(STAGING_PATH), HadoopClientFactory.ALL_PERMISSION);
+        fs.mkdirs(new Path(WORKING_PATH), HadoopClientFactory.READ_EXECUTE_PERMISSION);
+        APIResult result = falconUnitClient.validate(EntityType.CLUSTER.name(), tmpFile.getAbsolutePath(),
+                true, null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
     }
 
     ClientResponse suspend(TestContext context, Entity entity) {
@@ -531,106 +543,91 @@ public class EntityManagerJerseyIT {
     }
 
     public void testClusterSubmitScheduleSuspendResumeDelete() throws Exception {
-        TestContext context = newContext();
-        ClientResponse clientResponse;
-        Map<String, String> overlay = context.getUniqueOverlay();
-
-        clientResponse = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay,
-                EntityType.CLUSTER);
-        context.assertSuccessful(clientResponse);
-
-        clientResponse = context.service
-                .path("api/entities/schedule/cluster/" + context.clusterName)
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
-                .post(ClientResponse.class);
-        context.assertFailure(clientResponse);
-
-        clientResponse = suspend(context, EntityType.CLUSTER, context.clusterName);
-        context.assertFailure(clientResponse);
-
-        clientResponse = context.service
-                .path("api/entities/resume/cluster/" + context.clusterName)
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
-                .post(ClientResponse.class);
-        context.assertFailure(clientResponse);
-
-        clientResponse = context.service
-                .path("api/entities/delete/cluster/" + context.clusterName)
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML)
-                .delete(ClientResponse.class);
-        context.assertSuccessful(clientResponse);
+        UnitTestContext context = newContext();
+
+        submitCluster(context);
+
+        try {
+            falconUnitClient.schedule(EntityType.CLUSTER, context.clusterName, null, true, null, null);
+            Assert.fail("Exception should be Thrown");
+        } catch (Exception e) {
+            //ignore
+        }
+
+        try {
+            falconUnitClient.suspend(EntityType.CLUSTER, context.clusterName, context.colo, null);
+            Assert.fail("Exception should be Thrown");
+        } catch (Exception e) {
+            //ignore
+        }
+
+        try {
+            falconUnitClient.resume(EntityType.CLUSTER, context.clusterName, context.colo, null);
+            Assert.fail("Exception should be Thrown");
+        } catch (Exception e) {
+            //ignore
+        }
+
+        APIResult result = falconUnitClient.delete(EntityType.CLUSTER, context.getClusterName(), null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
     }
 
     public void testSubmit() throws Exception {
-        TestContext context = newContext();
+        UnitTestContext context = newContext();
         ClientResponse response;
-        Map<String, String> overlay = context.getUniqueOverlay();
 
-        response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
-        context.assertSuccessful(response);
+        submitCluster(context);
 
-        response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
-        context.assertSuccessful(response);
+        submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay);
 
-        response = context.submitToFalcon(TestContext.FEED_TEMPLATE2, overlay, EntityType.FEED);
-        context.assertSuccessful(response);
+        submitFeed(UnitTestContext.FEED_TEMPLATE2, context.overlay);
 
-        response = context.submitToFalcon(TestContext.PROCESS_TEMPLATE, overlay, EntityType.PROCESS);
-        context.assertSuccessful(response);
+        submitProcess(UnitTestContext.PROCESS_TEMPLATE, context.overlay);
     }
 
     @Test
     public void testDuplicateSubmitCommands() throws Exception {
-        TestContext context = newContext();
-        Map<String, String> overlay = context.getUniqueOverlay();
+        UnitTestContext context = newContext();
 
-        context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+        submitCluster(context);
 
         ExecutorService service = Executors.newSingleThreadExecutor();
         ExecutorService duplicateService = Executors.newSingleThreadExecutor();
 
-        Future<ClientResponse> future = service.submit(new SubmitCommand(context, overlay));
-        Future<ClientResponse> duplicateFuture = duplicateService.submit(new SubmitCommand(context, overlay));
-
-        ClientResponse response = future.get();
-        ClientResponse duplicateSubmitThreadResponse = duplicateFuture.get();
+        Future<APIResult> future = service.submit(new SubmitCommand(context, context.overlay));
+        Future<APIResult> duplicateFuture = duplicateService.submit(new SubmitCommand(context, context.overlay));
 
         // since there are duplicate threads for submits, there is no guarantee which request will succeed.
-        testDuplicateCommandsResponse(context, response, duplicateSubmitThreadResponse);
+        try {
+            APIResult response = future.get();
+            APIResult duplicateSubmitThreadResponse = duplicateFuture.get();
+            Assert.fail("Exception should be Thrown");
+        } catch (Exception e) {
+            //ignore
+        }
     }
 
     @Test
     public void testDuplicateDeleteCommands() throws Exception {
-        TestContext context = newContext();
-        Map<String, String> overlay = context.getUniqueOverlay();
-        context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
-        context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
+        UnitTestContext context = newContext();
+        Map<String, String> overlay = context.overlay;
+        submitCluster(context);
+        submitFeed(UnitTestContext.FEED_TEMPLATE1, overlay);
 
         ExecutorService service = Executors.newFixedThreadPool(2);
 
-        Future<ClientResponse> future = service.submit(new DeleteCommand(context, overlay.get("inputFeedName"),
+        Future<APIResult> future = service.submit(new DeleteCommand(context, overlay.get("inputFeedName"),
                 "feed"));
-        Future<ClientResponse> duplicateFuture = service.submit(new DeleteCommand(context,
+        Future<APIResult> duplicateFuture = service.submit(new DeleteCommand(context,
                 overlay.get("inputFeedName"), "feed"));
 
-        ClientResponse response = future.get();
-        ClientResponse duplicateSubmitThreadResponse = duplicateFuture.get();
-
-        // since there are two threads for deletion, there is no guarantee which request will succeed.
-        testDuplicateCommandsResponse(context, response, duplicateSubmitThreadResponse);
-    }
-
-    private void testDuplicateCommandsResponse(TestContext context, ClientResponse response,
-                                               ClientResponse duplicateSubmitThreadResponse) {
-        if (response.getStatus() == Response.Status.OK.getStatusCode()) {
-            context.assertSuccessful(response);
-            context.assertFailure(duplicateSubmitThreadResponse);
-        } else {
-            context.assertFailure(response);
-            context.assertSuccessful(duplicateSubmitThreadResponse);
+        // since there are two threads for deletion, there is no guarantee which request will succeed.
+        try {
+            APIResult response = future.get();
+            APIResult duplicateSubmitThreadResponse = duplicateFuture.get();
+            Assert.fail("Exception should be Thrown");
+        } catch (Exception e) {
+            //ignore
         }
     }
 
@@ -643,105 +640,144 @@ public class EntityManagerJerseyIT {
     }
 
     private void scheduleAndDeleteProcess(boolean withDoAs) throws Exception {
-        TestContext context = newContext();
+        UnitTestContext context = newContext();
+        submitCluster(context);
+        context.prepare();
+        submitFeeds(context.overlay);
         ClientResponse clientResponse;
-        Map<String, String> overlay = context.getUniqueOverlay();
-        String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+        String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, context.overlay);
         Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName));
         updateEndtime(process);
         File tmpFile = TestContext.getTempFile();
         EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
+        submitProcess(tmpFile.getAbsolutePath(), context.overlay);
         if (withDoAs) {
-            context.scheduleProcess(tmpFile.getAbsolutePath(), overlay, null, FalconTestUtil.TEST_USER_2, null);
+            falconUnitClient.schedule(EntityType.PROCESS, context.getProcessName(), context.getClusterName(), false,
+                    FalconTestUtil.TEST_USER_2, null);
         } else {
-            context.scheduleProcess(tmpFile.getAbsolutePath(), overlay, null, "", "key1:value1");
+            falconUnitClient.schedule(EntityType.PROCESS, context.getProcessName(), context.getClusterName(), false, "",
+                    "key1:value1");
         }
         OozieTestUtils.waitForBundleStart(context, Status.RUNNING);
 
-        WebResource resource = context.service.path("api/entities/delete/process/" + context.processName);
-
+        APIResult result;
         if (withDoAs) {
-            resource = resource.queryParam(FalconCLI.DO_AS_OPT, FalconTestUtil.TEST_USER_2);
+            result = falconUnitClient.delete(EntityType.PROCESS, context.getProcessName(), FalconTestUtil.TEST_USER_2);
+        } else {
+            result = falconUnitClient.delete(EntityType.PROCESS, context.getProcessName(), null);
         }
-
-        //Delete a scheduled process
-        clientResponse = resource
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML)
-                .delete(ClientResponse.class);
-        context.assertSuccessful(clientResponse);
+        assertStatus(result);
     }
 
     public void testGetEntityDefinition() throws Exception {
-        TestContext context = newContext();
-        ClientResponse response;
-        Map<String, String> overlay = context.getUniqueOverlay();
+        UnitTestContext context = newContext();
 
-        response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
-        context.assertSuccessful(response);
+        submitCluster(context);
 
-        response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
-        context.assertSuccessful(response);
+        context.prepare();
+        APIResult result = submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
 
-        response = context.service
-                .path("api/entities/definition/feed/" + overlay.get("inputFeedName"))
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML)
-                .get(ClientResponse.class);
+        Feed feed = (Feed) falconUnitClient.getDefinition(EntityType.FEED.name(),
+                context.overlay.get("inputFeedName"), null);
+        Assert.assertEquals(feed.getName(), context.overlay.get("inputFeedName"));
+    }
 
-        String feedXML = response.getEntity(String.class);
+    public void testInvalidGetEntityDefinition() throws FalconException, IOException, FalconCLIException {
         try {
-            Feed result = (Feed) context.unmarshaller.
-                    unmarshal(new StringReader(feedXML));
-            Assert.assertEquals(result.getName(), overlay.get("inputFeedName"));
-        } catch (JAXBException e) {
-            Assert.fail("Reponse " + feedXML + " is not valid", e);
+            falconUnitClient.getDefinition(EntityType.PROCESS.name(), "sample1", null);
+            Assert.fail("Exception should be Thrown");
+        } catch (Exception e) {
+            //ignore
         }
     }
 
-    public void testInvalidGetEntityDefinition() {
-        TestContext context = newContext();
-        ClientResponse clientResponse = context.service
-                .path("api/entities/definition/process/sample1")
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML)
-                .get(ClientResponse.class);
-        context.assertFailure(clientResponse);
-    }
-
     public void testScheduleSuspendResume() throws Exception {
-        TestContext context = newContext();
-        context.scheduleProcess();
-
-        ClientResponse clientResponse = suspend(context, EntityType.PROCESS, context.processName);
-        context.assertSuccessful(clientResponse);
-
-        clientResponse = context.service
-                .path("api/entities/resume/process/" + context.processName)
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML)
-                .post(ClientResponse.class);
-        context.assertSuccessful(clientResponse);
+        UnitTestContext context = newContext();
+
+        schedule(context);
+        waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE,
+                InstancesResult.WorkflowStatus.RUNNING);
+
+        APIResult result = falconUnitClient.suspend(EntityType.PROCESS, context.getProcessName(),
+                context.colo, null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+        result = falconUnitClient.getStatus(EntityType.PROCESS, context.processName, context.clusterName,
+                null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+        Assert.assertEquals(result.getMessage(), "SUSPENDED");
+
+        result = falconUnitClient.resume(EntityType.PROCESS, context.getProcessName(),
+                context.colo, null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+        result = falconUnitClient.getStatus(EntityType.PROCESS, context.processName, context.clusterName,
+                null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+        Assert.assertEquals(result.getMessage(), "RUNNING");
     }
 
     public void testFeedSchedule() throws Exception {
-        TestContext context = newContext();
-        ClientResponse response;
-        Map<String, String> overlay = context.getUniqueOverlay();
+        UnitTestContext context = newContext();
 
-        response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
-        context.assertSuccessful(response);
+        submitCluster(context);
 
-        response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
-        context.assertSuccessful(response);
+        context.prepare();
+        APIResult result = submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
 
-        createTestData(context);
-        ClientResponse clientResponse = context.service
-                .path("api/entities/schedule/feed/" + overlay.get("inputFeedName"))
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
-                .post(ClientResponse.class);
-        context.assertSuccessful(clientResponse);
+        createTestData();
+        result = falconUnitClient.schedule(EntityType.FEED, context.overlay.get("inputFeedName"), null, true, null,
+                null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+    }
+
+    static List<Path> createTestData() throws Exception {
+        List<Path> list = new ArrayList<Path>();
+        fs.mkdirs(new Path("/user/guest"));
+        fs.setOwner(new Path("/user/guest"), TestContext.REMOTE_USER, "users");
+
+        DateFormat formatter = new SimpleDateFormat("yyyy/MM/dd/HH/mm");
+        formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
+        Date date = new Date(System.currentTimeMillis() + 3 * 3600000);
+        Path path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
+        fs.create(path).close();
+        date = new Date(date.getTime() - 3600000);
+        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
+        fs.create(path).close();
+        date = new Date(date.getTime() - 3600000);
+        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
+        fs.create(path).close();
+        date = new Date(date.getTime() - 3600000);
+        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
+        list.add(path);
+        fs.create(path).close();
+        date = new Date(date.getTime() - 3600000);
+        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
+        list.add(path);
+        fs.create(path).close();
+        date = new Date(date.getTime() - 3600000);
+        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
+        list.add(path);
+        fs.create(path).close();
+        date = new Date(date.getTime() - 3600000);
+        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
+        list.add(path);
+        fs.create(path).close();
+        date = new Date(date.getTime() - 3600000);
+        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
+        list.add(path);
+        fs.create(path).close();
+        date = new Date(date.getTime() - 3600000);
+        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
+        list.add(path);
+        fs.create(path).close();
+        date = new Date(date.getTime() - 3600000);
+        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
+        list.add(path);
+        fs.create(path).close();
+        new FsShell(new Configuration()).run(new String[] {
+            "-chown", "-R", "guest:users", "/examples/input-data/rawLogs", });
+        return list;
     }
 
     static List<Path> createTestData(TestContext context) throws Exception {
@@ -795,135 +831,87 @@ public class EntityManagerJerseyIT {
     }
 
     public void testDeleteDataSet() throws Exception {
-        TestContext context = newContext();
-        ClientResponse response;
-        Map<String, String> overlay = context.getUniqueOverlay();
+        UnitTestContext context = newContext();
 
-        response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
-        context.assertSuccessful(response);
+        submitCluster(context);
 
-        response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
-        context.assertSuccessful(response);
+        context.prepare();
+        APIResult result = submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
 
-        response = context.service
-                .path("api/entities/delete/feed/" + overlay.get("inputFeedName"))
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML)
-                .delete(ClientResponse.class);
-        context.assertSuccessful(response);
+        result = falconUnitClient.delete(EntityType.FEED, context.overlay.get("inputFeedName"), null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
     }
 
     public void testDelete() throws Exception {
-        TestContext context = newContext();
-        ClientResponse response;
-        Map<String, String> overlay = context.getUniqueOverlay();
+        UnitTestContext context = newContext();
 
-        response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
-        context.assertSuccessful(response);
+        submitCluster(context);
 
-        response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
-        context.assertSuccessful(response);
+        context.prepare();
+        APIResult result = submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
 
-        response = context.service
-                .path("api/entities/delete/cluster/" + context.clusterName)
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML)
-                .delete(ClientResponse.class);
-        context.assertFailure(response);
+        try {
+            falconUnitClient.delete(EntityType.CLUSTER, context.getClusterName(), null);
+            Assert.fail("Exception should be Thrown");
+        } catch (Exception e) {
+            //ignore
+        }
 
-        response = context.submitToFalcon(TestContext.FEED_TEMPLATE2, overlay, EntityType.FEED);
-        context.assertSuccessful(response);
+        result = submitFeed(UnitTestContext.FEED_TEMPLATE2, context.overlay);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
 
-        response = context.submitToFalcon(TestContext.PROCESS_TEMPLATE, overlay, EntityType.PROCESS);
-        context.assertSuccessful(response);
+        submitProcess(UnitTestContext.PROCESS_TEMPLATE, context.overlay);
 
         //Delete a referred feed
-        response = context.service
-                .path("api/entities/delete/feed/" + overlay.get("inputFeedName"))
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML)
-                .delete(ClientResponse.class);
-        context.assertFailure(response);
+        try {
+            falconUnitClient.delete(EntityType.FEED, context.overlay.get("inputFeedName"), null);
+            Assert.fail("Exception should be Thrown");
+        } catch (Exception e) {
+            //ignore
+        }
 
         //Delete a submitted process
-        response = context.service
-                .path("api/entities/delete/process/" + context.processName)
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML)
-                .delete(ClientResponse.class);
-        context.assertSuccessful(response);
-
-        response = context.submitToFalcon(TestContext.PROCESS_TEMPLATE, overlay, EntityType.PROCESS);
-        context.assertSuccessful(response);
-
-        ClientResponse clientResponse = context.service
-                .path("api/entities/schedule/process/" + context.processName)
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
-                .post(ClientResponse.class);
-        context.assertSuccessful(clientResponse);
+        result = falconUnitClient.delete(EntityType.PROCESS, context.getProcessName(), null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+
+        submitProcess(UnitTestContext.PROCESS_TEMPLATE, context.overlay);
+
+        result = falconUnitClient.schedule(EntityType.PROCESS, context.getProcessName(), null, true, null,
+                null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
 
         //Delete a scheduled process
-        response = context.service
-                .path("api/entities/delete/process/" + context.processName)
-                .header("Cookie", context.getAuthenticationToken())
-                .accept(MediaType.TEXT_XML)
-                .delete(ClientResponse.class);
-        context.assertSuccessful(response);
+        result = falconUnitClient.delete(EntityType.PROCESS, context.getProcessName(), null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
     }
 
     @Test
     public void testGetEntityList() throws Exception {
-        TestContext context = newContext();
-        ClientResponse response;
-        response = context.service
-                .path("api/entities/list/process/")
-                .header("Cookie", context.getAuthenticationToken())
-                .type(MediaType.TEXT_XML)
-                .accept(MediaType.TEXT_XML)
-                .get(ClientResponse.class);
-        Assert.assertEquals(response.getStatus(), 200);
-
-        EntityList result = response.getEntity(EntityList.class);
+        EntityList result = falconUnitClient.getEntityList(EntityType.PROCESS.name(), "", "", null, null,
+                null, null, null, new Integer(0), new Integer(1), null);
         Assert.assertNotNull(result);
         for (EntityList.EntityElement entityElement : result.getElements()) {
             Assert.assertNull(entityElement.status); // status is null
         }
 
-        response = context.service
-                .path("api/entities/list/cluster/")
-                .header("Cookie", context.getAuthenticationToken())
-                .type(MediaType.TEXT_XML)
-                .accept(MediaType.TEXT_XML)
-                .get(ClientResponse.class);
-        Assert.assertEquals(response.getStatus(), 200);
-        result = response.getEntity(EntityList.class);
+        result = falconUnitClient.getEntityList(EntityType.CLUSTER.name(), "", "", null, null,
+                null, null, null, new Integer(0), new Integer(1), null);
         Assert.assertNotNull(result);
         for (EntityList.EntityElement entityElement : result.getElements()) {
             Assert.assertNull(entityElement.status); // status is null
         }
 
-        response = context.service
-                .path("api/entities/list/feed,process/")
-                .header("Cookie", context.getAuthenticationToken())
-                .type(MediaType.TEXT_XML)
-                .accept(MediaType.TEXT_XML)
-                .get(ClientResponse.class);
-        Assert.assertEquals(response.getStatus(), 200);
-        result = response.getEntity(EntityList.class);
+        result = falconUnitClient.getEntityList(EntityType.FEED.name() + "," + EntityType.PROCESS.name(),
+                "", "", null, null, null, null, null, new Integer(0), new Integer(1), null);
         Assert.assertNotNull(result);
         for (EntityList.EntityElement entityElement : result.getElements()) {
             Assert.assertNull(entityElement.status); // status is null
         }
 
-        response = context.service
-                .path("api/entities/list/")
-                .header("Cookie", context.getAuthenticationToken())
-                .type(MediaType.TEXT_XML)
-                .accept(MediaType.TEXT_XML)
-                .get(ClientResponse.class);
-        Assert.assertEquals(response.getStatus(), 200);
-        result = response.getEntity(EntityList.class);
+        result = falconUnitClient.getEntityList(null, "", "", null, null, null, null, null, new Integer(0),
+                new Integer(1), null);
         Assert.assertNotNull(result);
         for (EntityList.EntityElement entityElement : result.getElements()) {
             Assert.assertNull(entityElement.status); // status is null
@@ -932,21 +920,21 @@ public class EntityManagerJerseyIT {
 
     @Test
     public void testDuplicateUpdateCommands() throws Exception {
-        TestContext context = newContext();
-        context.scheduleProcess();
+        UnitTestContext context = newContext();
+        schedule(context);
         OozieTestUtils.waitForBundleStart(context, Job.Status.RUNNING);
         List<BundleJob> bundles = OozieTestUtils.getBundles(context);
         Assert.assertEquals(bundles.size(), 1);
 
-        Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName);
+        Process process = (Process) getDefinition(EntityType.PROCESS, context.processName);
 
         String feed3 = "f3" + System.currentTimeMillis();
         Map<String, String> overlay = new HashMap<String, String>();
         overlay.put("inputFeedName", feed3);
         overlay.put("cluster", context.clusterName);
         overlay.put("user", System.getProperty("user.name"));
-        ClientResponse response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
-        context.assertSuccessful(response);
+        APIResult result = submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
 
         Input input = new Input();
         input.setFeed(feed3);
@@ -958,13 +946,19 @@ public class EntityManagerJerseyIT {
         updateEndtime(process);
         Date endTime = getEndTime();
         ExecutorService service =  Executors.newSingleThreadExecutor();
-        Future<ClientResponse> future = service.submit(new UpdateCommand(context, process, endTime));
-        response = update(context, process, endTime, false);
-        ClientResponse duplicateUpdateThreadResponse = future.get();
+        ExecutorService duplicateService = Executors.newSingleThreadExecutor();
 
-        // since there are duplicate threads for updates, there is no guarantee which request will succeed
-        testDuplicateCommandsResponse(context, response, duplicateUpdateThreadResponse);
+        Future<APIResult> future = service.submit(new UpdateCommand(context, process, endTime));
+        Future<APIResult> duplicateFuture = duplicateService.submit(new UpdateCommand(context, process, endTime));
 
+        // since there are duplicate threads for updates, there is no guarantee which request will succeed
+        try {
+            future.get();
+            duplicateFuture.get();
+            Assert.fail("Exception should be Thrown");
+        } catch (Exception e) {
+            // expected: at least one of the two concurrent updates must fail, so the exception is ignored
+        }
     }
 
     public Date getEndTime() {
@@ -977,38 +971,38 @@ public class EntityManagerJerseyIT {
         return cal.getTime();
     }
 
-    class UpdateCommand implements Callable<ClientResponse> {
-        private TestContext context;
+    class UpdateCommand implements Callable<APIResult> {
+        private UnitTestContext context;
         private Process process;
         private Date endTime;
 
-        public TestContext getContext() {
+        public UnitTestContext getContext() {
             return context;
         }
         public Process getProcess() {
             return process;
         }
-        public Date getEndTime() {
-            return endTime;
-        }
 
-        public UpdateCommand(TestContext context, Process process, Date endTime) {
+        public UpdateCommand(UnitTestContext context, Process process, Date endTime) {
             this.context = context;
             this.process = process;
             this.endTime = endTime;
         }
 
         @Override
-        public ClientResponse call() throws Exception {
-            return update(context, process, endTime, false);
+        public APIResult call() throws Exception {
+            File tmpFile = TestContext.getTempFile();
+            process.getEntityType().getMarshaller().marshal(process, tmpFile);
+            return falconUnitClient.update(EntityType.PROCESS.name(), context.getProcessName(),
+                    tmpFile.getAbsolutePath(), true, null);
         }
     }
 
-    class SubmitCommand implements Callable<ClientResponse> {
+    class SubmitCommand implements Callable<APIResult> {
         private Map<String, String> overlay;
-        private TestContext context;
+        private UnitTestContext context;
 
-        public TestContext getContext() {
+        public UnitTestContext getContext() {
             return context;
         }
 
@@ -1016,36 +1010,35 @@ public class EntityManagerJerseyIT {
             return overlay;
         }
 
-        public SubmitCommand(TestContext context, Map<String, String> overlay) {
+        public SubmitCommand(UnitTestContext context, Map<String, String> overlay) {
             this.context = context;
             this.overlay = overlay;
         }
 
         @Override
-        public ClientResponse call() throws Exception {
-            return context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
+        public APIResult call() throws Exception {
+            return submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay);
         }
     }
 
-    class DeleteCommand implements Callable<ClientResponse> {
-        private TestContext context;
+    class DeleteCommand implements Callable<APIResult> {
+        private UnitTestContext context;
         private String entityName;
         private String entityType;
 
-        public TestContext getContext() {
+        public UnitTestContext getContext() {
             return context;
         }
 
-        public DeleteCommand(TestContext context, String entityName, String entityType) {
+        public DeleteCommand(UnitTestContext context, String entityName, String entityType) {
             this.context = context;
             this.entityName = entityName;
             this.entityType = entityType;
         }
 
         @Override
-        public ClientResponse call() throws Exception {
-            return context.deleteFromFalcon(entityName, entityType);
+        public APIResult call() throws Exception {
+            return falconUnitClient.delete(EntityType.valueOf(entityType), entityName, null);
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/2804c5d1/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java
index 6458b59..769d059 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java
@@ -20,6 +20,7 @@ package org.apache.falcon.resource;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.client.FalconCLIException;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.resource.InstancesResult.Instance;
 import org.apache.falcon.resource.InstancesResult.WorkflowStatus;
@@ -34,6 +35,7 @@ import org.testng.annotations.Test;
 import javax.ws.rs.core.MediaType;
 import java.io.File;
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * Test class for Process Instance REST API.
@@ -41,6 +43,7 @@ import java.io.IOException;
 public class ProcessInstanceManagerIT extends FalconUnitTestBase {
 
     private static final String START_INSTANCE = "2012-04-20T00:00Z";
+    private static final String SLEEP_WORKFLOW = "sleepWorkflow.xml";
 
     @BeforeClass
     @Override
@@ -59,6 +62,37 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase {
         FileUtils.deleteQuietly(new File(".action.xml.crc"));
     }
 
+    private void submitFeeds(Map<String, String> overlay) throws IOException, FalconCLIException {
+        String tmpFile = TestContext.overlayParametersOverTemplate(UnitTestContext.FEED_TEMPLATE1, overlay);
+        APIResult result = falconUnitClient.submit(EntityType.FEED.name(), tmpFile, null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+        tmpFile = TestContext.overlayParametersOverTemplate(UnitTestContext.FEED_TEMPLATE2, overlay);
+        result = falconUnitClient.submit(EntityType.FEED.name(), tmpFile, null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+    }
+
+
+    private void submitProcess(String template, Map<String, String> overlay) throws IOException, FalconCLIException {
+        String tmpFile = TestContext.overlayParametersOverTemplate(template, overlay);
+        APIResult result = falconUnitClient.submit(EntityType.PROCESS.name(), tmpFile, null);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+    }
+
+    private void schedule(UnitTestContext context) throws FalconCLIException, IOException, FalconException {
+        String scheduleTime = START_INSTANCE;
+        APIResult result = scheduleProcess(context.getProcessName(), scheduleTime, 1, context.getClusterName(),
+                getAbsolutePath(SLEEP_WORKFLOW), true, "");
+        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+    }
+
+    private void scheduleProcess(UnitTestContext context) throws Exception {
+        submitCluster(context.colo, context.clusterName, null);
+        context.prepare();
+        submitFeeds(context.overlay);
+        submitProcess(UnitTestContext.PROCESS_TEMPLATE, context.overlay);
+        schedule(context);
+    }
+
     protected void schedule(TestContext context) throws Exception {
         CurrentUser.authenticate(System.getProperty("user.name"));
         schedule(context, 1);
@@ -120,8 +154,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase {
     @Test
     public void testGetInstanceStatus() throws Exception {
         UnitTestContext context = new UnitTestContext();
-        submitCluster(context.colo, context.clusterName, null);
-        context.scheduleProcess();
+        scheduleProcess(context);
         waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING);
         String endTime = "2012-04-20T00:01Z";
         InstancesResult response = context.getClient().getStatusOfInstances(EntityType.PROCESS.name(),
@@ -135,8 +168,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase {
     @Test
     public void testGetInstanceStatusPagination() throws Exception {
         UnitTestContext context = new UnitTestContext();
-        submitCluster(context.colo, context.clusterName, null);
-        context.scheduleProcess();
+        scheduleProcess(context);
         waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING);
         String endTime = "2012-04-20T00:02Z";
         InstancesResult response = context.getClient().getStatusOfInstances(EntityType.PROCESS.name(),
@@ -151,8 +183,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase {
     @Test
     public void testKillInstances() throws Exception {
         UnitTestContext context = new UnitTestContext();
-        submitCluster(context.colo, context.clusterName, null);
-        context.scheduleProcess();
+        scheduleProcess(context);
         waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING);
         String endTime = "2012-04-20T00:01Z";
         context.getClient().killInstances(EntityType.PROCESS.name(), context.processName, START_INSTANCE, endTime,
@@ -178,8 +209,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase {
     @Test
     public void testReRunInstances() throws Exception {
         UnitTestContext context = new UnitTestContext();
-        submitCluster(context.colo, context.clusterName, null);
-        context.scheduleProcess();
+        scheduleProcess(context);
         waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING);
         String endTime = "2012-04-20T00:01Z";
         context.getClient().killInstances(EntityType.PROCESS.name(), context.processName, START_INSTANCE, endTime,
@@ -208,8 +238,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase {
     @Test
     public void testSuspendInstances() throws Exception {
         UnitTestContext context = new UnitTestContext();
-        submitCluster(context.colo, context.clusterName, null);
-        context.scheduleProcess();
+        scheduleProcess(context);
         waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING);
         String endTime = "2012-04-20T00:01Z";
         context.getClient().suspendInstances(EntityType.PROCESS.name(), context.processName, START_INSTANCE,
@@ -228,8 +257,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase {
     @Test
     public void testResumesInstances() throws Exception {
         UnitTestContext context = new UnitTestContext();
-        submitCluster(context.colo, context.clusterName, null);
-        context.scheduleProcess();
+        scheduleProcess(context);
         waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING);
         String endTime = "2012-04-20T00:01Z";
         context.getClient().suspendInstances(EntityType.PROCESS.name(), context.processName, START_INSTANCE,

http://git-wip-us.apache.org/repos/asf/falcon/blob/2804c5d1/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java b/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java
index 1d49353..b222305 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java
@@ -20,7 +20,6 @@ package org.apache.falcon.resource;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.unit.FalconUnit;
 import org.apache.falcon.unit.FalconUnitClient;
@@ -28,8 +27,8 @@ import org.apache.falcon.util.DeploymentUtil;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.testng.Assert;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Date;
 import java.util.HashMap;
@@ -43,6 +42,7 @@ public class UnitTestContext {
     public static final String FEED_TEMPLATE1 = "/feed-template1.xml";
     public static final String FEED_TEMPLATE2 = "/feed-template2.xml";
     public static final String PROCESS_TEMPLATE = "/process-template.xml";
+    public static final String SAMPLE_PROCESS_XML = "/process-version-0.xml";
 
     protected String colo;
     protected String clusterName;
@@ -54,7 +54,7 @@ public class UnitTestContext {
     private static FalconUnitClient client;
     private static FileSystem fs;
     protected static ConfigurationStore configStore;
-    private Map<String, String> overlay;
+    protected Map<String, String> overlay;
 
     public UnitTestContext() throws FalconException, IOException {
         client = FalconUnit.getClient();
@@ -63,6 +63,14 @@ public class UnitTestContext {
         overlay = getUniqueOverlay();
     }
 
+    public String getProcessName() {
+        return processName;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
     public static FalconUnitClient getClient() {
         return client;
     }
@@ -79,7 +87,7 @@ public class UnitTestContext {
         }
     }
 
-    private void prepare() throws Exception {
+    protected void prepare() throws Exception {
         mkdir(fs, new Path("/falcon"), new FsPermission((short) 511));
 
         Path wfParent = new Path("/falcon/test");
@@ -96,28 +104,22 @@ public class UnitTestContext {
         mkdir(fs, outPath, new FsPermission((short) 511));
     }
 
-    public void scheduleProcess() throws Exception {
-        scheduleProcess(PROCESS_TEMPLATE, overlay);
+    public static File getTempFile() throws IOException {
+        return getTempFile("test", ".xml");
     }
 
-    public void scheduleProcess(String processTemplate, Map<String, String> uniqueOverlay) throws
-            Exception {
-        prepare();
-
-        String tmpFile = TestContext.overlayParametersOverTemplate(FEED_TEMPLATE1, uniqueOverlay);
-        APIResult result = client.submit(EntityType.FEED.name(), tmpFile, null);
-        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
-
-        tmpFile = TestContext.overlayParametersOverTemplate(FEED_TEMPLATE2, uniqueOverlay);
-        result = client.submit(EntityType.FEED.name(), tmpFile, null);
-        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+    public static File getTempFile(String prefix, String suffix) throws IOException {
+        return getTempFile("target", prefix, suffix);
+    }
 
-        tmpFile = TestContext.overlayParametersOverTemplate(processTemplate, uniqueOverlay);
-        result = client.submit(EntityType.PROCESS.name(), tmpFile, null);
-        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    public static File getTempFile(String path, String prefix, String suffix) throws IOException {
+        File f = new File(path);
+        if (!f.exists()) {
+            f.mkdirs();
+        }
 
-        result = client.schedule(EntityType.PROCESS, processName, clusterName, true, null, null);
-        Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
+        return File.createTempFile(prefix, suffix, f);
     }
 
     public Map<String, String> getUniqueOverlay() throws FalconException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/2804c5d1/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java b/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java
index 804b2ed..056c0a1 100644
--- a/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java
+++ b/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java
@@ -24,6 +24,7 @@ import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.logging.JobLogMover;
 import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.resource.UnitTestContext;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.falcon.workflow.engine.OozieClientFactory;
 import org.apache.falcon.workflow.engine.OozieWorkflowEngine;
@@ -54,6 +55,10 @@ public final class OozieTestUtils {
         return getOozieClient(context.getCluster().getCluster());
     }
 
+    public static OozieClient getOozieClient(UnitTestContext context) throws FalconException {
+        return OozieClientFactory.get(context.getClusterName());
+    }
+
     public static OozieClient getOozieClient(Cluster cluster) throws FalconException {
         return OozieClientFactory.get(cluster);
     }
@@ -68,6 +73,16 @@ public final class OozieTestUtils {
         return ozClient.getBundleJobsInfo("name=FALCON_PROCESS_" + context.getProcessName(), 0, 10);
     }
 
+    public static List<BundleJob> getBundles(UnitTestContext context) throws Exception {
+        List<BundleJob> bundles = new ArrayList<BundleJob>();
+        if (context.getClusterName() == null) {
+            return bundles;
+        }
+
+        OozieClient ozClient = OozieClientFactory.get(context.getClusterName());
+        return ozClient.getBundleJobsInfo("name=FALCON_PROCESS_" + context.getProcessName(), 0, 10);
+    }
+
     public static boolean killOozieJobs(TestContext context) throws Exception {
         if (context.getCluster() == null) {
             return true;
@@ -133,6 +148,15 @@ public final class OozieTestUtils {
         waitForBundleStart(context, bundles.get(0).getId(), status);
     }
 
+    public static void waitForBundleStart(UnitTestContext context, Job.Status... status) throws Exception {
+        List<BundleJob> bundles = getBundles(context);
+        if (bundles.isEmpty()) {
+            return;
+        }
+
+        waitForBundleStart(context, bundles.get(0).getId(), status);
+    }
+
     public static void waitForBundleStart(TestContext context, String bundleId, Job.Status... status) throws Exception {
         OozieClient ozClient = getOozieClient(context);
         Set<Job.Status> statuses = new HashSet<Job.Status>(Arrays.asList(status));
@@ -162,6 +186,36 @@ public final class OozieTestUtils {
         throw new Exception("Bundle " + bundleId + " is not " + statuses + ". Last seen status " + bundleStatus);
     }
 
+    public static void waitForBundleStart(UnitTestContext context, String bundleId, Job.Status... status) throws
+            Exception {
+        OozieClient ozClient = getOozieClient(context);
+        Set<Job.Status> statuses = new HashSet<Job.Status>(Arrays.asList(status));
+
+        Status bundleStatus = null;
+        for (int i = 0; i < 15; i++) {
+            Thread.sleep(i * 1000);
+            BundleJob bundle = ozClient.getBundleJobInfo(bundleId);
+            bundleStatus = bundle.getStatus();
+            if (statuses.contains(bundleStatus)) {
+                if (statuses.contains(Job.Status.FAILED) || statuses.contains(Job.Status.KILLED)) {
+                    return;
+                }
+
+                boolean done = false;
+                for (CoordinatorJob coord : bundle.getCoordinators()) {
+                    if (statuses.contains(coord.getStatus())) {
+                        done = true;
+                    }
+                }
+                if (done) {
+                    return;
+                }
+            }
+            System.out.println("Waiting for bundle " + bundleId + " in " + statuses + " state");
+        }
+        throw new Exception("Bundle " + bundleId + " is not " + statuses + ". Last seen status " + bundleStatus);
+    }
+
     public static WorkflowJob getWorkflowJob(Cluster cluster, String filter) throws Exception {
         OozieClient ozClient = getOozieClient(cluster);
 


Mime
View raw message