falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkat...@apache.org
Subject [2/8] FALCON-1 Create packaging and scripts to install and try Apache Falcon. Contributed by Srikanth Sundarrajan
Date Wed, 10 Jul 2013 20:05:32 GMT
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/61417357/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
new file mode 100644
index 0000000..b94cff0
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -0,0 +1,626 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.resource;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.entity.v0.process.Validity;
+import org.apache.falcon.util.BuildProperties;
+import org.apache.falcon.util.DeploymentProperties;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.BundleJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.Job.Status;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.servlet.ServletInputStream;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.xml.bind.JAXBException;
+import java.io.File;
+import java.io.InputStream;
+import java.io.StringReader;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+/**
+ * Test class for Entity REST APIs.
+ *
+ * Tests should be enabled only in local environments as they need running instance of the web server.
+ */
+public class EntityManagerJerseyIT {
+
+    @BeforeClass
+    public void prepare() throws Exception {
+        TestContext.prepare();
+    }
+
+    @Test
+    public void testUpdateCheckUser() throws Exception {
+        TestContext context = newContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        String tmpFileName = context.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName));
+        Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
+        processValidity.setEnd(new Date(new Date().getTime() + 2 * 24 * 60 * 60 * 1000));
+        File tmpFile = context.getTempFile();
+        EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
+        context.scheduleProcess(tmpFile.getAbsolutePath(), overlay);
+        context.waitForBundleStart(Status.RUNNING);
+
+        List<BundleJob> bundles = context.getBundles();
+        Assert.assertEquals(bundles.size(), 1);
+        Assert.assertEquals(bundles.get(0).getUser(), TestContext.REMOTE_USER);
+
+        ClientResponse response = context.service.path("api/entities/definition/feed/"
+                + context.outputFeedName).header(
+                "Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).get(ClientResponse.class);
+        Feed feed = (Feed) EntityType.FEED.getUnmarshaller()
+                .unmarshal(new StringReader(response.getEntity(String.class)));
+
+        //change output feed path and update feed as another user
+        feed.getLocations().getLocations().get(0).setPath("/falcon/test/output2/${YEAR}/${MONTH}/${DAY}");
+        tmpFile = context.getTempFile();
+        EntityType.FEED.getMarshaller().marshal(feed, tmpFile);
+        response = context.service.path("api/entities/update/feed/"
+                + context.outputFeedName).header("Remote-User",
+                "testuser").accept(MediaType.TEXT_XML)
+                .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath()));
+        context.assertSuccessful(response);
+
+        bundles = context.getBundles();
+        Assert.assertEquals(bundles.size(), 2);
+        Assert.assertEquals(bundles.get(0).getUser(), TestContext.REMOTE_USER);
+        Assert.assertEquals(bundles.get(1).getUser(), TestContext.REMOTE_USER);
+    }
+
+    private ThreadLocal<TestContext> contexts = new ThreadLocal<TestContext>();
+
+    private TestContext newContext() {
+        contexts.set(new TestContext());
+        return contexts.get();
+    }
+
+    @AfterMethod
+    public void cleanup() throws Exception {
+        TestContext testContext = contexts.get();
+        if (testContext != null) {
+            testContext.killOozieJobs();
+        }
+        contexts.remove();
+    }
+
+    @Test(enabled = false)
+    public void testOptionalInput() throws Exception {
+        TestContext context = newContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        String tmpFileName = context.
+                overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName));
+
+        Input in1 = process.getInputs().getInputs().get(0);
+        Input in2 = new Input();
+        in2.setFeed(in1.getFeed());
+        in2.setName("input2");
+        in2.setOptional(true);
+        in2.setPartition(in1.getPartition());
+        in2.setStart("now(-1,0)");
+        in2.setEnd("now(0,0)");
+        process.getInputs().getInputs().add(in2);
+
+        File tmpFile = context.getTempFile();
+        EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
+        context.scheduleProcess(tmpFile.getAbsolutePath(), overlay);
+        context.waitForWorkflowStart(context.processName);
+    }
+
+    @Test
+    public void testProcessDeleteAndSchedule() throws Exception {
+        //Submit process with invalid property so that coord submit fails and bundle goes to failed state
+        TestContext context = newContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        String tmpFileName = context.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName));
+        Property prop = new Property();
+        prop.setName("newProp");
+        prop.setValue("${formatTim()}");
+        process.getProperties().getProperties().add(prop);
+        File tmpFile = context.getTempFile();
+        EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
+        context.scheduleProcess(tmpFile.getAbsolutePath(), overlay);
+        context.waitForBundleStart(Status.FAILED);
+
+        //Delete and re-submit the process with correct workflow
+        ClientResponse clientRepsonse = context.service.path("api/entities/delete/process/"
+                + context.processName).header(
+                "Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).delete(ClientResponse.class);
+        context.assertSuccessful(clientRepsonse);
+        process.getWorkflow().setPath("/falcon/test/workflow");
+        tmpFile = context.getTempFile();
+        EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
+        clientRepsonse = context.service.path("api/entities/submitAndSchedule/process").
+                header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
+                .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath()));
+        context.assertSuccessful(clientRepsonse);
+
+        //Assert that new schedule creates new bundle
+        List<BundleJob> bundles = context.getBundles();
+        Assert.assertEquals(bundles.size(), 2);
+    }
+
+    @Test
+    public void testProcessInputUpdate() throws Exception {
+        TestContext context = newContext();
+        context.scheduleProcess();
+        context.waitForBundleStart(Job.Status.RUNNING);
+
+        ClientResponse response = context.service.path("api/entities/definition/process/"
+                + context.processName).header(
+                "Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).get(ClientResponse.class);
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller()
+                .unmarshal(new StringReader(response.getEntity(String.class)));
+
+        String feed3 = "f3" + System.currentTimeMillis();
+        Map<String, String> overlay = new HashMap<String, String>();
+        overlay.put("inputFeedName", feed3);
+        overlay.put("cluster", context.clusterName);
+        response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
+        context.assertSuccessful(response);
+
+        Input input = new Input();
+        input.setFeed(feed3);
+        input.setName("inputData2");
+        input.setStart("today(20,0)");
+        input.setEnd("today(20,20)");
+        process.getInputs().getInputs().add(input);
+
+        Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
+        processValidity.setEnd(new Date(new Date().getTime() + 2 * 24 * 60 * 60 * 1000));
+        File tmpFile = context.getTempFile();
+        EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
+        response = context.service.path("api/entities/update/process/"
+                + context.processName).header("Remote-User",
+                TestContext.REMOTE_USER).accept(MediaType.TEXT_XML)
+                .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath()));
+        context.assertSuccessful(response);
+
+        //Assert that update creates new bundle
+        List<BundleJob> bundles = context.getBundles();
+        Assert.assertEquals(bundles.size(), 2);
+    }
+
+    @Test
+    public void testProcessEndtimeUpdate() throws Exception {
+        TestContext context = newContext();
+        context.scheduleProcess();
+        context.waitForBundleStart(Job.Status.RUNNING);
+
+        ClientResponse response = context.service.path("api/entities/definition/process/"
+                + context.processName).header(
+                "Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).get(ClientResponse.class);
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller()
+                .unmarshal(new StringReader(response.getEntity(String.class)));
+
+        Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
+        processValidity.setEnd(new Date(new Date().getTime() + 60 * 60 * 1000));
+        File tmpFile = context.getTempFile();
+        EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
+        response = context.service.path("api/entities/update/process/" + context.processName).header("Remote-User",
+                TestContext.REMOTE_USER).accept(MediaType.TEXT_XML)
+                .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath()));
+        context.assertSuccessful(response);
+
+        //Assert that update does not create new bundle
+        List<BundleJob> bundles = context.getBundles();
+        Assert.assertEquals(bundles.size(), 1);
+    }
+
+    @Test
+    public void testStatus() throws Exception {
+        TestContext context = newContext();
+        ClientResponse response;
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        response = context.submitToFalcon(context.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+        context.assertSuccessful(response);
+
+        response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
+        context.assertSuccessful(response);
+
+        response = context.service
+                .path("api/entities/status/feed/" + overlay.get("inputFeedName"))
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).get(ClientResponse.class);
+
+        APIResult result = (APIResult) context.unmarshaller.
+                unmarshal(new StringReader(response.getEntity(String.class)));
+        Assert.assertTrue(result.getMessage().contains("SUBMITTED"));
+
+    }
+
+    @Test
+    public void testIdempotentSubmit() throws Exception {
+        TestContext context = newContext();
+        ClientResponse response;
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        response = context.submitToFalcon(context.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+        context.assertSuccessful(response);
+
+        response = context.submitToFalcon(context.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+        context.assertSuccessful(response);
+    }
+
+    @Test
+    public void testNotFoundStatus() {
+        TestContext context = newContext();
+        ClientResponse response;
+        String feed1 = "f1" + System.currentTimeMillis();
+        response = context.service
+                .path("api/entities/status/feed/" + feed1)
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
+        String status = response.getEntity(String.class);
+        Assert.assertEquals(response.getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
+    }
+
+    @Test
+    public void testVersion() {
+        TestContext context = newContext();
+        ClientResponse response;
+        response = context.service
+                .path("api/admin/version")
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
+        String status = response.getEntity(String.class);
+        Assert.assertEquals(status, "{Version:\""
+                + BuildProperties.get().getProperty("build.version") + "\",Mode:\""
+                + DeploymentProperties.get().getProperty("deploy.mode") + "\"}");
+    }
+
+    @Test
+    public void testValidate() {
+        TestContext context = newContext();
+        ServletInputStream stream = context.getServletInputStream(getClass().
+                getResourceAsStream(TestContext.SAMPLE_PROCESS_XML));
+
+        ClientResponse clientRepsonse = context.service
+                .path("api/entities/validate/process")
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
+                .post(ClientResponse.class, stream);
+
+        context.assertFailure(clientRepsonse);
+    }
+
+    @Test
+    public void testClusterValidate() throws Exception {
+        TestContext context = newContext();
+        ClientResponse clientRepsonse;
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        InputStream stream = context.getServletInputStream(
+                context.overlayParametersOverTemplate(context.CLUSTER_TEMPLATE, overlay));
+
+        clientRepsonse = context.service.path("api/entities/validate/cluster")
+                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .post(ClientResponse.class, stream);
+        context.assertSuccessful(clientRepsonse);
+    }
+
+    @Test
+    public void testClusterSubmitScheduleSuspendResumeDelete() throws Exception {
+        TestContext context = newContext();
+        ClientResponse clientRepsonse;
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        clientRepsonse = context.submitToFalcon(context.CLUSTER_TEMPLATE, overlay,
+                EntityType.CLUSTER);
+        context.assertSuccessful(clientRepsonse);
+
+        clientRepsonse = context.service
+                .path("api/entities/schedule/cluster/" + context.clusterName)
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
+                .post(ClientResponse.class);
+        context.assertFailure(clientRepsonse);
+
+        clientRepsonse = context.service
+                .path("api/entities/suspend/cluster/" + context.clusterName)
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
+                .post(ClientResponse.class);
+        context.assertFailure(clientRepsonse);
+
+        clientRepsonse = context.service
+                .path("api/entities/resume/cluster/" + context.clusterName)
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
+                .post(ClientResponse.class);
+        context.assertFailure(clientRepsonse);
+
+        clientRepsonse = context.service
+                .path("api/entities/delete/cluster/" + context.clusterName)
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).delete(ClientResponse.class);
+        context.assertSuccessful(clientRepsonse);
+    }
+
+    @Test
+    public void testSubmit() throws Exception {
+        TestContext context = newContext();
+        ClientResponse response;
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        response = context.submitToFalcon(context.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+        context.assertSuccessful(response);
+
+        response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
+        context.assertSuccessful(response);
+
+        response = context.submitToFalcon(TestContext.FEED_TEMPLATE2, overlay, EntityType.FEED);
+        context.assertSuccessful(response);
+
+        response = context.submitToFalcon(TestContext.PROCESS_TEMPLATE, overlay, EntityType.PROCESS);
+        context.assertSuccessful(response);
+    }
+
+    @Test
+    public void testGetEntityDefinition() throws Exception {
+        TestContext context = newContext();
+        ClientResponse response;
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        response = context.submitToFalcon(context.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+        context.assertSuccessful(response);
+
+        response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
+        context.assertSuccessful(response);
+
+        response = context.service
+                .path("api/entities/definition/feed/" + overlay.get("inputFeedName"))
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).get(ClientResponse.class);
+
+        String feedXML = response.getEntity(String.class);
+        try {
+            Feed result = (Feed) context.unmarshaller.
+                    unmarshal(new StringReader(feedXML));
+            Assert.assertEquals(result.getName(), overlay.get("inputFeedName"));
+        } catch (JAXBException e) {
+            Assert.fail("Reponse " + feedXML + " is not valid", e);
+        }
+    }
+
+    @Test
+    public void testInvalidGetEntityDefinition() {
+        TestContext context = newContext();
+        ClientResponse clientRepsonse = context.service
+                .path("api/entities/definition/process/sample1")
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).get(ClientResponse.class);
+        context.assertFailure(clientRepsonse);
+    }
+
+    @Test
+    public void testScheduleSuspendResume() throws Exception {
+        TestContext context = newContext();
+        context.scheduleProcess();
+
+        ClientResponse clientRepsonse = context.service
+                .path("api/entities/suspend/process/" + context.processName)
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).post(ClientResponse.class);
+        context.assertSuccessful(clientRepsonse);
+
+        clientRepsonse = context.service
+                .path("api/entities/resume/process/" + context.processName)
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).post(ClientResponse.class);
+        context.assertSuccessful(clientRepsonse);
+    }
+
+    @Test(enabled = true)
+    public void testFeedSchedule() throws Exception {
+        TestContext context = newContext();
+        ClientResponse response;
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        response = context.submitToFalcon(context.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+        context.assertSuccessful(response);
+
+        response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
+        context.assertSuccessful(response);
+
+        createTestData(context);
+        ClientResponse clientRepsonse = context.service
+                .path("api/entities/schedule/feed/" + overlay.get("inputFeedName"))
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
+                .post(ClientResponse.class);
+        context.assertSuccessful(clientRepsonse);
+    }
+
+    private List<Path> createTestData(TestContext context) throws Exception {
+        List<Path> list = new ArrayList<Path>();
+        FileSystem fs = context.cluster.getFileSystem();
+        fs.mkdirs(new Path("/user/guest"));
+        fs.setOwner(new Path("/user/guest"), TestContext.REMOTE_USER, "users");
+
+        DateFormat formatter = new SimpleDateFormat("yyyy/MM/dd/HH/mm");
+        formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
+        Date date = new Date(System.currentTimeMillis() + 3 * 3600000);
+        Path path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
+        fs.create(path).close();
+        date = new Date(date.getTime() - 3600000);
+        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
+        fs.create(path).close();
+        date = new Date(date.getTime() - 3600000);
+        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
+        fs.create(path).close();
+        date = new Date(date.getTime() - 3600000);
+        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
+        list.add(path);
+        fs.create(path).close();
+        date = new Date(date.getTime() - 3600000);
+        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
+        list.add(path);
+        fs.create(path).close();
+        date = new Date(date.getTime() - 3600000);
+        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
+        list.add(path);
+        fs.create(path).close();
+        date = new Date(date.getTime() - 3600000);
+        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
+        list.add(path);
+        fs.create(path).close();
+        date = new Date(date.getTime() - 3600000);
+        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
+        list.add(path);
+        fs.create(path).close();
+        date = new Date(date.getTime() - 3600000);
+        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
+        list.add(path);
+        fs.create(path).close();
+        date = new Date(date.getTime() - 3600000);
+        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
+        list.add(path);
+        fs.create(path).close();
+        new FsShell(context.cluster.getConf()).
+                run(new String[]{"-chown", "-R", "guest:users", "/examples/input-data/rawLogs"});
+        return list;
+    }
+
+    @Test
+    public void testDeleteDataSet() throws Exception {
+        TestContext context = newContext();
+        ClientResponse response;
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        response = context.submitToFalcon(context.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+        context.assertSuccessful(response);
+
+        response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
+        context.assertSuccessful(response);
+
+        response = context.service
+                .path("api/entities/delete/feed/" + overlay.get("inputFeedName"))
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).delete(ClientResponse.class);
+        context.assertSuccessful(response);
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        TestContext context = newContext();
+        ClientResponse response;
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        response = context.submitToFalcon(context.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+        context.assertSuccessful(response);
+
+        response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
+        context.assertSuccessful(response);
+
+        response = context.service
+                .path("api/entities/delete/cluster/" + context.clusterName)
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).delete(ClientResponse.class);
+        context.assertFailure(response);
+
+        response = context.submitToFalcon(TestContext.FEED_TEMPLATE2, overlay, EntityType.FEED);
+        context.assertSuccessful(response);
+
+        response = context.submitToFalcon(TestContext.PROCESS_TEMPLATE, overlay, EntityType.PROCESS);
+        context.assertSuccessful(response);
+
+        //Delete a referred feed
+        response = context.service
+                .path("api/entities/delete/feed/" + overlay.get("inputFeedName"))
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).delete(ClientResponse.class);
+        context.assertFailure(response);
+
+        //Delete a submitted process
+        response = context.service
+                .path("api/entities/delete/process/" + context.processName)
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).delete(ClientResponse.class);
+        context.assertSuccessful(response);
+
+        response = context.submitToFalcon(TestContext.PROCESS_TEMPLATE, overlay, EntityType.PROCESS);
+        context.assertSuccessful(response);
+
+        ClientResponse clientRepsonse = context.service
+                .path("api/entities/schedule/process/" + context.processName)
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
+                .post(ClientResponse.class);
+        context.assertSuccessful(clientRepsonse);
+
+        //Delete a scheduled process
+        response = context.service
+                .path("api/entities/delete/process/" + context.processName)
+                .header("Remote-User", TestContext.REMOTE_USER)
+                .accept(MediaType.TEXT_XML).delete(ClientResponse.class);
+        context.assertSuccessful(response);
+    }
+
+    @Test
+    public void testGetDependencies() throws Exception {
+        TestContext context = newContext();
+        ClientResponse response;
+        response = context.service
+                .path("api/entities/list/process/")
+                .header("Remote-User", TestContext.REMOTE_USER).type(MediaType.TEXT_XML)
+                .accept(MediaType.TEXT_XML).get(ClientResponse.class);
+        Assert.assertEquals(response.getStatus(), 200);
+
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        response = context.submitToFalcon(context.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+        context.assertSuccessful(response);
+
+        response = context.service
+                .path("api/entities/list/cluster/")
+                .header("Remote-User", TestContext.REMOTE_USER).type(MediaType.TEXT_XML)
+                .accept(MediaType.TEXT_XML).get(ClientResponse.class);
+        Assert.assertEquals(response.getStatus(), 200);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/61417357/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyTest.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyTest.java
deleted file mode 100644
index b2d234d..0000000
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyTest.java
+++ /dev/null
@@ -1,573 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.resource;
-
-import com.sun.jersey.api.client.ClientResponse;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Property;
-import org.apache.falcon.entity.v0.process.Validity;
-import org.apache.falcon.util.BuildProperties;
-import org.apache.falcon.util.DeploymentProperties;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsShell;
-import org.apache.hadoop.fs.Path;
-import org.apache.oozie.client.BundleJob;
-import org.apache.oozie.client.Job;
-import org.apache.oozie.client.Job.Status;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import javax.servlet.ServletInputStream;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.xml.bind.JAXBException;
-import java.io.File;
-import java.io.InputStream;
-import java.io.StringReader;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.*;
-
-/**
- * Test class for Entity REST APIs.
- *
- * Tests should be enabled only in local environments as they need running instance of the web server.
- */
-public class EntityManagerJerseyTest extends AbstractTestBase {
-
-    @Test
-    public void testUpdateCheckUser() throws Exception {
-        Map<String, String> overlay = getUniqueOverlay();
-        String tmpFileName = overlayParametersOverTemplate(PROCESS_TEMPLATE, overlay);
-        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName));
-        Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
-        processValidity.setEnd(new Date(new Date().getTime() + 2 * 24 * 60 * 60 * 1000));
-        File tmpFile = getTempFile();
-        EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
-        scheduleProcess(tmpFile.getAbsolutePath(), overlay);
-        waitForBundleStart(Status.RUNNING);
-
-        List<BundleJob> bundles = getBundles();
-        Assert.assertEquals(bundles.size(), 1);
-        Assert.assertEquals(bundles.get(0).getUser(), REMOTE_USER);
-
-        ClientResponse response = this.service.path("api/entities/definition/feed/" + outputFeedName).header(
-                "Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).get(ClientResponse.class);
-        Feed feed = (Feed) EntityType.FEED.getUnmarshaller()
-                .unmarshal(new StringReader(response.getEntity(String.class)));
-
-        //change output feed path and update feed as another user
-        feed.getLocations().getLocations().get(0).setPath("/falcon/test/output2/${YEAR}/${MONTH}/${DAY}");
-        tmpFile = getTempFile();
-        EntityType.FEED.getMarshaller().marshal(feed, tmpFile);
-        response = this.service.path("api/entities/update/feed/" + outputFeedName).header("Remote-User",
-                "testuser").accept(MediaType.TEXT_XML)
-                .post(ClientResponse.class, getServletInputStream(tmpFile.getAbsolutePath()));
-        assertSuccessful(response);
-
-        bundles = getBundles();
-        Assert.assertEquals(bundles.size(), 2);
-        Assert.assertEquals(bundles.get(0).getUser(), REMOTE_USER);
-        Assert.assertEquals(bundles.get(1).getUser(), REMOTE_USER);
-    }
-
-
-    @Test(enabled = false)
-    public void testOptionalInput() throws Exception {
-        Map<String, String> overlay = getUniqueOverlay();
-        String tmpFileName = overlayParametersOverTemplate(PROCESS_TEMPLATE, overlay);
-        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName));
-
-        Input in1 = process.getInputs().getInputs().get(0);
-        Input in2 = new Input();
-        in2.setFeed(in1.getFeed());
-        in2.setName("input2");
-        in2.setOptional(true);
-        in2.setPartition(in1.getPartition());
-        in2.setStart("now(-1,0)");
-        in2.setEnd("now(0,0)");
-        process.getInputs().getInputs().add(in2);
-
-        File tmpFile = getTempFile();
-        EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
-        scheduleProcess(tmpFile.getAbsolutePath(), overlay);
-        waitForWorkflowStart(processName);
-    }
-
-    @Test
-    public void testProcessDeleteAndSchedule() throws Exception {
-        //Submit process with invalid property so that coord submit fails and bundle goes to failed state
-        Map<String, String> overlay = getUniqueOverlay();
-        String tmpFileName = overlayParametersOverTemplate(PROCESS_TEMPLATE, overlay);
-        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName));
-        Property prop = new Property();
-        prop.setName("newProp");
-        prop.setValue("${formatTim()}");
-        process.getProperties().getProperties().add(prop);
-        File tmpFile = getTempFile();
-        EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
-        scheduleProcess(tmpFile.getAbsolutePath(), overlay);
-        waitForBundleStart(Status.FAILED);
-
-        //Delete and re-submit the process with correct workflow
-        ClientResponse clientRepsonse = this.service.path("api/entities/delete/process/" + processName).header(
-                "Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).delete(ClientResponse.class);
-        assertSuccessful(clientRepsonse);
-        process.getWorkflow().setPath("/falcon/test/workflow");
-        tmpFile = getTempFile();
-        EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
-        clientRepsonse = this.service.path("api/entities/submitAndSchedule/process").header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
-                .post(ClientResponse.class, getServletInputStream(tmpFile.getAbsolutePath()));
-        assertSuccessful(clientRepsonse);
-
-        //Assert that new schedule creates new bundle
-        List<BundleJob> bundles = getBundles();
-        Assert.assertEquals(bundles.size(), 2);
-    }
-
-    @Test
-    public void testProcessInputUpdate() throws Exception {
-        scheduleProcess();
-        waitForBundleStart(Job.Status.RUNNING);
-
-        ClientResponse response = this.service.path("api/entities/definition/process/" + processName).header(
-                "Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).get(ClientResponse.class);
-        Process process = (Process) EntityType.PROCESS.getUnmarshaller()
-                .unmarshal(new StringReader(response.getEntity(String.class)));
-
-        String feed3 = "f3" + System.currentTimeMillis();
-        Map<String, String> overlay = new HashMap<String, String>();
-        overlay.put("inputFeedName", feed3);
-        overlay.put("cluster", clusterName);
-        response = submitToFalcon(FEED_TEMPLATE1, overlay, EntityType.FEED);
-        assertSuccessful(response);
-
-        Input input = new Input();
-        input.setFeed(feed3);
-        input.setName("inputData2");
-        input.setStart("today(20,0)");
-        input.setEnd("today(20,20)");
-        process.getInputs().getInputs().add(input);
-
-        Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
-        processValidity.setEnd(new Date(new Date().getTime() + 2 * 24 * 60 * 60 * 1000));
-        File tmpFile = getTempFile();
-        EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
-        response = this.service.path("api/entities/update/process/" + processName).header("Remote-User",
-                REMOTE_USER).accept(MediaType.TEXT_XML)
-                .post(ClientResponse.class, getServletInputStream(tmpFile.getAbsolutePath()));
-        assertSuccessful(response);
-
-        //Assert that update creates new bundle
-        List<BundleJob> bundles = getBundles();
-        Assert.assertEquals(bundles.size(), 2);
-    }
-
-    @Test
-    public void testProcessEndtimeUpdate() throws Exception {
-        scheduleProcess();
-        waitForBundleStart(Job.Status.RUNNING);
-
-        ClientResponse response = this.service.path("api/entities/definition/process/" + processName).header(
-                "Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).get(ClientResponse.class);
-        Process process = (Process) EntityType.PROCESS.getUnmarshaller()
-                .unmarshal(new StringReader(response.getEntity(String.class)));
-
-        Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
-        processValidity.setEnd(new Date(new Date().getTime() + 60 * 60 * 1000));
-        File tmpFile = getTempFile();
-        EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
-        response = this.service.path("api/entities/update/process/" + processName).header("Remote-User",
-                REMOTE_USER).accept(MediaType.TEXT_XML)
-                .post(ClientResponse.class, getServletInputStream(tmpFile.getAbsolutePath()));
-        assertSuccessful(response);
-
-        //Assert that update does not create new bundle
-        List<BundleJob> bundles = getBundles();
-        Assert.assertEquals(bundles.size(), 1);
-    }
-
-    @Test
-    public void testStatus() throws Exception {
-        ClientResponse response;
-        Map<String, String> overlay = getUniqueOverlay();
-
-        response = submitToFalcon(clusterFileTemplate, overlay, EntityType.CLUSTER);
-        assertSuccessful(response);
-
-        response = submitToFalcon(FEED_TEMPLATE1, overlay, EntityType.FEED);
-        assertSuccessful(response);
-
-        response = this.service
-                .path("api/entities/status/feed/" + overlay.get("inputFeedName"))
-                .header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).get(ClientResponse.class);
-
-        APIResult result = (APIResult) unmarshaller.
-                unmarshal(new StringReader(response.getEntity(String.class)));
-        Assert.assertTrue(result.getMessage().contains("SUBMITTED"));
-
-    }
-
-    @Test
-    public void testIdempotentSubmit() throws Exception {
-        ClientResponse response;
-        Map<String, String> overlay = getUniqueOverlay();
-
-        response = submitToFalcon(clusterFileTemplate, overlay, EntityType.CLUSTER);
-        assertSuccessful(response);
-
-        response = submitToFalcon(clusterFileTemplate, overlay, EntityType.CLUSTER);
-        assertSuccessful(response);
-    }
-
-    @Test
-    public void testNotFoundStatus() {
-        ClientResponse response;
-        String feed1 = "f1" + System.currentTimeMillis();
-        response = this.service
-                .path("api/entities/status/feed/" + feed1)
-                .header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
-        String status = response.getEntity(String.class);
-        Assert.assertEquals(response.getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
-    }
-
-    @Test
-    public void testVersion() {
-        ClientResponse response;
-        response = this.service
-                .path("api/admin/version")
-                .header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
-        String status = response.getEntity(String.class);
-        Assert.assertEquals(status, "{Version:\""
-                + BuildProperties.get().getProperty("build.version") + "\",Mode:\""
-                + DeploymentProperties.get().getProperty("deploy.mode") + "\"}");
-    }
-
-    @Test
-    public void testValidate() {
-
-        ServletInputStream stream = getServletInputStream(getClass().
-                getResourceAsStream(SAMPLE_PROCESS_XML));
-
-        ClientResponse clientRepsonse = this.service
-                .path("api/entities/validate/process")
-                .header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
-                .post(ClientResponse.class, stream);
-
-        assertFailure(clientRepsonse);
-    }
-
-    @Test
-    public void testClusterValidate() throws Exception {
-        ClientResponse clientRepsonse;
-        Map<String, String> overlay = getUniqueOverlay();
-
-        InputStream stream = getServletInputStream(overlayParametersOverTemplate(
-                clusterFileTemplate, overlay));
-
-        clientRepsonse = this.service.path("api/entities/validate/cluster")
-                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
-                .header("Remote-User", REMOTE_USER)
-                .post(ClientResponse.class, stream);
-        assertSuccessful(clientRepsonse);
-    }
-
-    @Test
-    public void testClusterSubmitScheduleSuspendResumeDelete() throws Exception {
-        ClientResponse clientRepsonse;
-        Map<String, String> overlay = getUniqueOverlay();
-
-        clientRepsonse = submitToFalcon(clusterFileTemplate, overlay,
-                EntityType.CLUSTER);
-        assertSuccessful(clientRepsonse);
-
-        clientRepsonse = this.service
-                .path("api/entities/schedule/cluster/" + clusterName)
-                .header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
-                .post(ClientResponse.class);
-        assertFailure(clientRepsonse);
-
-        clientRepsonse = this.service
-                .path("api/entities/suspend/cluster/" + clusterName)
-                .header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
-                .post(ClientResponse.class);
-        assertFailure(clientRepsonse);
-
-        clientRepsonse = this.service
-                .path("api/entities/resume/cluster/" + clusterName)
-                .header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
-                .post(ClientResponse.class);
-        assertFailure(clientRepsonse);
-
-        clientRepsonse = this.service
-                .path("api/entities/delete/cluster/" + clusterName)
-                .header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).delete(ClientResponse.class);
-        assertSuccessful(clientRepsonse);
-    }
-
-    @Test
-    public void testSubmit() throws Exception {
-
-        ClientResponse response;
-        Map<String, String> overlay = getUniqueOverlay();
-
-        response = submitToFalcon(clusterFileTemplate, overlay, EntityType.CLUSTER);
-        assertSuccessful(response);
-
-        response = submitToFalcon(FEED_TEMPLATE1, overlay, EntityType.FEED);
-        assertSuccessful(response);
-
-        response = submitToFalcon(FEED_TEMPLATE2, overlay, EntityType.FEED);
-        assertSuccessful(response);
-
-        response = submitToFalcon(PROCESS_TEMPLATE, overlay, EntityType.PROCESS);
-        assertSuccessful(response);
-    }
-
-    @Test
-    public void testGetEntityDefinition() throws Exception {
-        ClientResponse response;
-        Map<String, String> overlay = getUniqueOverlay();
-
-        response = submitToFalcon(clusterFileTemplate, overlay, EntityType.CLUSTER);
-        assertSuccessful(response);
-
-        response = submitToFalcon(FEED_TEMPLATE1, overlay, EntityType.FEED);
-        assertSuccessful(response);
-
-        response = this.service
-                .path("api/entities/definition/feed/" + overlay.get("inputFeedName"))
-                .header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).get(ClientResponse.class);
-
-        String feedXML = response.getEntity(String.class);
-        try {
-            Feed result = (Feed) unmarshaller.
-                    unmarshal(new StringReader(feedXML));
-            Assert.assertEquals(result.getName(), overlay.get("inputFeedName"));
-        } catch (JAXBException e) {
-            Assert.fail("Reponse " + feedXML + " is not valid", e);
-        }
-    }
-
-    @Test
-    public void testInvalidGetEntityDefinition() {
-        ClientResponse clientRepsonse = this.service
-                .path("api/entities/definition/process/sample1")
-                .header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).get(ClientResponse.class);
-        assertFailure(clientRepsonse);
-    }
-
-    @Test
-    public void testScheduleSuspendResume() throws Exception {
-        scheduleProcess();
-
-        ClientResponse clientRepsonse = this.service
-                .path("api/entities/suspend/process/" + processName)
-                .header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).post(ClientResponse.class);
-        assertSuccessful(clientRepsonse);
-
-        clientRepsonse = this.service
-                .path("api/entities/resume/process/" + processName)
-                .header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).post(ClientResponse.class);
-        assertSuccessful(clientRepsonse);
-    }
-
-    @Test(enabled = true)
-    public void testFeedSchedule() throws Exception {
-        ClientResponse response;
-        Map<String, String> overlay = getUniqueOverlay();
-
-        response = submitToFalcon(clusterFileTemplate, overlay, EntityType.CLUSTER);
-        assertSuccessful(response);
-
-        response = submitToFalcon(FEED_TEMPLATE1, overlay, EntityType.FEED);
-        assertSuccessful(response);
-
-        createTestData();
-        ClientResponse clientRepsonse = this.service
-                .path("api/entities/schedule/feed/" + overlay.get("inputFeedName"))
-                .header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
-                .post(ClientResponse.class);
-        assertSuccessful(clientRepsonse);
-    }
-
-    private List<Path> createTestData() throws Exception {
-        List<Path> list = new ArrayList<Path>();
-        FileSystem fs = cluster.getFileSystem();
-        fs.mkdirs(new Path("/user/guest"));
-        fs.setOwner(new Path("/user/guest"), REMOTE_USER, "users");
-
-        DateFormat formatter = new SimpleDateFormat("yyyy/MM/dd/HH/mm");
-        formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
-        Date date = new Date(System.currentTimeMillis() + 3 * 3600000);
-        Path path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
-        fs.create(path).close();
-        date = new Date(date.getTime() - 3600000);
-        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
-        fs.create(path).close();
-        date = new Date(date.getTime() - 3600000);
-        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
-        fs.create(path).close();
-        date = new Date(date.getTime() - 3600000);
-        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
-        list.add(path);
-        fs.create(path).close();
-        date = new Date(date.getTime() - 3600000);
-        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
-        list.add(path);
-        fs.create(path).close();
-        date = new Date(date.getTime() - 3600000);
-        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
-        list.add(path);
-        fs.create(path).close();
-        date = new Date(date.getTime() - 3600000);
-        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
-        list.add(path);
-        fs.create(path).close();
-        date = new Date(date.getTime() - 3600000);
-        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
-        list.add(path);
-        fs.create(path).close();
-        date = new Date(date.getTime() - 3600000);
-        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
-        list.add(path);
-        fs.create(path).close();
-        date = new Date(date.getTime() - 3600000);
-        path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
-        list.add(path);
-        fs.create(path).close();
-        new FsShell(cluster.getConf()).run(new String[]{"-chown", "-R", "guest:users", "/examples/input-data/rawLogs"});
-        return list;
-    }
-
-    @Test
-    public void testDeleteDataSet() throws Exception {
-        ClientResponse response;
-        Map<String, String> overlay = getUniqueOverlay();
-
-        response = submitToFalcon(clusterFileTemplate, overlay, EntityType.CLUSTER);
-        assertSuccessful(response);
-
-        response = submitToFalcon(FEED_TEMPLATE1, overlay, EntityType.FEED);
-        assertSuccessful(response);
-
-        response = this.service
-                .path("api/entities/delete/feed/" + overlay.get("inputFeedName"))
-                .header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).delete(ClientResponse.class);
-        assertSuccessful(response);
-    }
-
-    @Test
-    public void testDelete() throws Exception {
-
-        ClientResponse response;
-        Map<String, String> overlay = getUniqueOverlay();
-
-        response = submitToFalcon(clusterFileTemplate, overlay, EntityType.CLUSTER);
-        assertSuccessful(response);
-
-        response = submitToFalcon(FEED_TEMPLATE1, overlay, EntityType.FEED);
-        assertSuccessful(response);
-
-        response = this.service
-                .path("api/entities/delete/cluster/" + clusterName)
-                .header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).delete(ClientResponse.class);
-        assertFailure(response);
-
-        response = submitToFalcon(FEED_TEMPLATE2, overlay, EntityType.FEED);
-        assertSuccessful(response);
-
-        response = submitToFalcon(PROCESS_TEMPLATE, overlay, EntityType.PROCESS);
-        assertSuccessful(response);
-
-        //Delete a referred feed
-        response = this.service
-                .path("api/entities/delete/feed/" + overlay.get("inputFeedName"))
-                .header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).delete(ClientResponse.class);
-        assertFailure(response);
-
-        //Delete a submitted process
-        response = this.service
-                .path("api/entities/delete/process/" + processName)
-                .header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).delete(ClientResponse.class);
-        assertSuccessful(response);
-
-        response = submitToFalcon(PROCESS_TEMPLATE, overlay, EntityType.PROCESS);
-        assertSuccessful(response);
-
-        ClientResponse clientRepsonse = this.service
-                .path("api/entities/schedule/process/" + processName)
-                .header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
-                .post(ClientResponse.class);
-        assertSuccessful(clientRepsonse);
-
-        //Delete a scheduled process
-        response = this.service
-                .path("api/entities/delete/process/" + processName)
-                .header("Remote-User", REMOTE_USER)
-                .accept(MediaType.TEXT_XML).delete(ClientResponse.class);
-        assertSuccessful(response);
-    }
-
-    @Test
-    public void testGetDependencies() throws Exception {
-        ClientResponse response;
-        response = this.service
-                .path("api/entities/list/process/")
-                .header("Remote-User", REMOTE_USER).type(MediaType.TEXT_XML)
-                .accept(MediaType.TEXT_XML).get(ClientResponse.class);
-        Assert.assertEquals(response.getStatus(), 200);
-
-        Map<String, String> overlay = getUniqueOverlay();
-
-        response = submitToFalcon(clusterFileTemplate, overlay, EntityType.CLUSTER);
-        assertSuccessful(response);
-
-        response = this.service
-                .path("api/entities/list/cluster/")
-                .header("Remote-User", REMOTE_USER).type(MediaType.TEXT_XML)
-                .accept(MediaType.TEXT_XML).get(ClientResponse.class);
-        Assert.assertEquals(response.getStatus(), 200);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/61417357/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java
new file mode 100644
index 0000000..ac15391
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.ExternalId;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.resource.InstancesResult.Instance;
+import org.apache.falcon.resource.InstancesResult.WorkflowStatus;
+import org.apache.falcon.workflow.engine.OozieClientFactory;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowJob;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.MediaType;
+
+/**
+ * Test class for Process Instance REST API.
+ */
+@Test(enabled = false)
+public class ProcessInstanceManagerIT {
+    private static final String START_INSTANCE = "2012-04-20T00:00Z";
+
+    protected void schedule(TestContext context) throws Exception {
+        context.scheduleProcess();
+        context.waitForProcessWFtoStart();
+    }
+
+    public void testGetRunningInstances() throws Exception {
+        TestContext context = new TestContext();
+        schedule(context);
+        InstancesResult response = context.service.path("api/instance/running/process/" + context.processName)
+                .header("Remote-User", "guest").accept(MediaType.APPLICATION_JSON).get(InstancesResult.class);
+        Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
+        Assert.assertNotNull(response.getInstances());
+        Assert.assertEquals(1, response.getInstances().length);
+        assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING);
+    }
+
+    private void assertInstance(Instance processInstance, String instance, WorkflowStatus status) {
+        Assert.assertNotNull(processInstance);
+        Assert.assertNotNull(processInstance.getInstance());
+        Assert.assertTrue(processInstance.getInstance().endsWith(instance));
+        Assert.assertEquals(processInstance.getStatus(), status);
+    }
+
+    public void testGetInstanceStatus() throws Exception {
+        TestContext context = new TestContext();
+        schedule(context);
+        InstancesResult response = context.service.path("api/instance/status/process/" + context.processName)
+                .queryParam("start", START_INSTANCE).header("Remote-User", "guest").accept(MediaType.APPLICATION_JSON)
+                .get(InstancesResult.class);
+        Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
+        Assert.assertNotNull(response.getInstances());
+        Assert.assertEquals(1, response.getInstances().length);
+        assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING);
+    }
+
+    public void testReRunInstances() throws Exception {
+        testKillInstances();
+        TestContext context = new TestContext();
+        InstancesResult response = context.service.path("api/instance/rerun/process/" + context.processName)
+                .queryParam("start", START_INSTANCE).header("Remote-User", "guest").accept(MediaType.APPLICATION_JSON)
+                .post(InstancesResult.class);
+
+        Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
+        Assert.assertNotNull(response.getInstances());
+        Assert.assertEquals(1, response.getInstances().length);
+        assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING);
+
+        waitForWorkflow(START_INSTANCE, WorkflowJob.Status.RUNNING);
+    }
+
+    public void testKillInstances() throws Exception {
+        TestContext context = new TestContext();
+        schedule(context);
+        InstancesResult response = context.service.path("api/instance/kill/process/" + context.processName)
+                .queryParam("start", START_INSTANCE).header("Remote-User", "guest").accept(MediaType.APPLICATION_JSON)
+                .post(InstancesResult.class);
+        Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
+        Assert.assertNotNull(response.getInstances());
+        Assert.assertEquals(1, response.getInstances().length);
+        assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.KILLED);
+
+        waitForWorkflow(START_INSTANCE, WorkflowJob.Status.KILLED);
+    }
+
+    public void testSuspendInstances() throws Exception {
+        TestContext context = new TestContext();
+        schedule(context);
+        InstancesResult response = context.service.path("api/instance/suspend/process/" + context.processName)
+                .queryParam("start", START_INSTANCE).header("Remote-User", "guest").accept(MediaType.APPLICATION_JSON)
+                .post(InstancesResult.class);
+        Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
+        Assert.assertNotNull(response.getInstances());
+        Assert.assertEquals(1, response.getInstances().length);
+        assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.SUSPENDED);
+
+        waitForWorkflow(START_INSTANCE, WorkflowJob.Status.SUSPENDED);
+    }
+
+    public void testResumesInstances() throws Exception {
+        testSuspendInstances();
+
+        TestContext context = new TestContext();
+        InstancesResult response = context.service.path("api/instance/resume/process/" + context.processName)
+                .queryParam("start", START_INSTANCE).header("Remote-User", "guest").accept(MediaType.APPLICATION_JSON)
+                .post(InstancesResult.class);
+        Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
+        Assert.assertNotNull(response.getInstances());
+        Assert.assertEquals(1, response.getInstances().length);
+        assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING);
+
+        waitForWorkflow(START_INSTANCE, WorkflowJob.Status.RUNNING);
+    }
+
+    private void waitForWorkflow(String instance, WorkflowJob.Status status) throws Exception {
+        TestContext context = new TestContext();
+        ExternalId extId = new ExternalId(context.processName, Tag.DEFAULT, EntityUtil.parseDateUTC(instance));
+        OozieClient ozClient = OozieClientFactory.get(
+                (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, context.clusterName));
+        String jobId = ozClient.getJobId(extId.getId());
+        WorkflowJob jobInfo = null;
+        for (int i = 0; i < 15; i++) {
+            jobInfo = ozClient.getJobInfo(jobId);
+            if (jobInfo.getStatus() == status) {
+                break;
+            }
+            System.out.println("Waiting for workflow job " + jobId + " status " + status);
+            Thread.sleep((i + 1) * 1000);
+        }
+        Assert.assertEquals(status, jobInfo.getStatus());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/61417357/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerTest.java b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerTest.java
deleted file mode 100644
index c2f6b51..0000000
--- a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.resource;
-
-import org.apache.falcon.Tag;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.ExternalId;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.resource.InstancesResult.Instance;
-import org.apache.falcon.resource.InstancesResult.WorkflowStatus;
-import org.apache.falcon.workflow.engine.OozieClientFactory;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.WorkflowJob;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import javax.ws.rs.core.MediaType;
-
-/**
- * Test class for Process Instance REST API.
- */
-@Test(enabled = false)
-public class ProcessInstanceManagerTest extends AbstractTestBase {
-    private static final String START_INSTANCE = "2012-04-20T00:00Z";
-
-    protected void schedule() throws Exception {
-        scheduleProcess();
-        waitForProcessWFtoStart();
-    }
-
-    public void testGetRunningInstances() throws Exception {
-        schedule();
-        InstancesResult response = this.service.path("api/instance/running/process/" + processName)
-                .header("Remote-User", "guest").accept(MediaType.APPLICATION_JSON).get(InstancesResult.class);
-        Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
-        Assert.assertNotNull(response.getInstances());
-        Assert.assertEquals(1, response.getInstances().length);
-        assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING);
-    }
-
-    private void assertInstance(Instance processInstance, String instance, WorkflowStatus status) {
-        Assert.assertNotNull(processInstance);
-        Assert.assertNotNull(processInstance.getInstance());
-        Assert.assertTrue(processInstance.getInstance().endsWith(instance));
-        Assert.assertEquals(processInstance.getStatus(), status);
-    }
-
-    public void testGetInstanceStatus() throws Exception {
-        schedule();
-        InstancesResult response = this.service.path("api/instance/status/process/" + processName)
-                .queryParam("start", START_INSTANCE).header("Remote-User", "guest").accept(MediaType.APPLICATION_JSON)
-                .get(InstancesResult.class);
-        Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
-        Assert.assertNotNull(response.getInstances());
-        Assert.assertEquals(1, response.getInstances().length);
-        assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING);
-    }
-
-    public void testReRunInstances() throws Exception {
-        testKillInstances();
-
-        InstancesResult response = this.service.path("api/instance/rerun/process/" + processName)
-                .queryParam("start", START_INSTANCE).header("Remote-User", "guest").accept(MediaType.APPLICATION_JSON)
-                .post(InstancesResult.class);
-
-        Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
-        Assert.assertNotNull(response.getInstances());
-        Assert.assertEquals(1, response.getInstances().length);
-        assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING);
-
-        waitForWorkflow(START_INSTANCE, WorkflowJob.Status.RUNNING);
-    }
-
-    public void testKillInstances() throws Exception {
-        schedule();
-        InstancesResult response = this.service.path("api/instance/kill/process/" + processName)
-                .queryParam("start", START_INSTANCE).header("Remote-User", "guest").accept(MediaType.APPLICATION_JSON)
-                .post(InstancesResult.class);
-        Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
-        Assert.assertNotNull(response.getInstances());
-        Assert.assertEquals(1, response.getInstances().length);
-        assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.KILLED);
-
-        waitForWorkflow(START_INSTANCE, WorkflowJob.Status.KILLED);
-    }
-
-    public void testSuspendInstances() throws Exception {
-        schedule();
-        InstancesResult response = this.service.path("api/instance/suspend/process/" + processName)
-                .queryParam("start", START_INSTANCE).header("Remote-User", "guest").accept(MediaType.APPLICATION_JSON)
-                .post(InstancesResult.class);
-        Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
-        Assert.assertNotNull(response.getInstances());
-        Assert.assertEquals(1, response.getInstances().length);
-        assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.SUSPENDED);
-
-        waitForWorkflow(START_INSTANCE, WorkflowJob.Status.SUSPENDED);
-    }
-
-    public void testResumesInstances() throws Exception {
-        testSuspendInstances();
-
-        InstancesResult response = this.service.path("api/instance/resume/process/" + processName)
-                .queryParam("start", START_INSTANCE).header("Remote-User", "guest").accept(MediaType.APPLICATION_JSON)
-                .post(InstancesResult.class);
-        Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
-        Assert.assertNotNull(response.getInstances());
-        Assert.assertEquals(1, response.getInstances().length);
-        assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING);
-
-        waitForWorkflow(START_INSTANCE, WorkflowJob.Status.RUNNING);
-    }
-
-    private void waitForWorkflow(String instance, WorkflowJob.Status status) throws Exception {
-        ExternalId extId = new ExternalId(processName, Tag.DEFAULT, EntityUtil.parseDateUTC(instance));
-        OozieClient ozClient = OozieClientFactory.get(
-                (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, clusterName));
-        String jobId = ozClient.getJobId(extId.getId());
-        WorkflowJob jobInfo = null;
-        for (int i = 0; i < 15; i++) {
-            jobInfo = ozClient.getJobInfo(jobId);
-            if (jobInfo.getStatus() == status) {
-                break;
-            }
-            System.out.println("Waiting for workflow job " + jobId + " status " + status);
-            Thread.sleep((i + 1) * 1000);
-        }
-        Assert.assertEquals(status, jobInfo.getStatus());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/61417357/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
new file mode 100644
index 0000000..9f67a80
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -0,0 +1,440 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.cluster.util.StandAloneCluster;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.engine.OozieClientFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.oozie.client.BundleJob;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.Job.Status;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowJob;
+import org.testng.Assert;
+
+import javax.servlet.ServletInputStream;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Base test class for CLI, Entity and Process Instances.
+ */
+public class TestContext {
+    public static final String FEED_TEMPLATE1 = "/feed-template1.xml";
+    public static final String FEED_TEMPLATE2 = "/feed-template2.xml";
+
+    public static final String CLUSTER_TEMPLATE = "/cluster-template.xml";
+
+    public static final String SAMPLE_PROCESS_XML = "/process-version-0.xml";
+    public static final String PROCESS_TEMPLATE = "/process-template.xml";
+
+    public static final String BASE_URL = "http://localhost:41000/falcon-webapp";
+    public static final String REMOTE_USER = System.getProperty("user.name");
+
+    protected Unmarshaller unmarshaller;
+    protected Marshaller marshaller;
+
+    protected EmbeddedCluster cluster;
+    protected WebResource service = null;
+    protected String clusterName;
+    protected String processName;
+    protected String outputFeedName;
+
+    public static final Pattern VAR_PATTERN = Pattern.compile("##[A-Za-z0-9_]*##");
+
+    public Unmarshaller getUnmarshaller() {
+        return unmarshaller;
+    }
+
+    public Marshaller getMarshaller() {
+        return marshaller;
+    }
+
+    public EmbeddedCluster getCluster() {
+        return cluster;
+    }
+
+    public WebResource getService() {
+        return service;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public String getProcessName() {
+        return processName;
+    }
+
+    public String getOutputFeedName() {
+        return outputFeedName;
+    }
+
+    public String getClusterFileTemplate() {
+        return CLUSTER_TEMPLATE;
+    }
+
+    public void scheduleProcess(String processTemplate, Map<String, String> overlay) throws Exception {
+        ClientResponse response = submitToFalcon(CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+        assertSuccessful(response);
+
+        response = submitToFalcon(FEED_TEMPLATE1, overlay, EntityType.FEED);
+        assertSuccessful(response);
+
+        response = submitToFalcon(FEED_TEMPLATE2, overlay, EntityType.FEED);
+        assertSuccessful(response);
+
+        response = submitToFalcon(processTemplate, overlay, EntityType.PROCESS);
+        assertSuccessful(response);
+        ClientResponse clientRepsonse = this.service.path("api/entities/schedule/process/" + processName)
+                .header("Remote-User", REMOTE_USER).accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML).post(
+                        ClientResponse.class);
+        assertSuccessful(clientRepsonse);
+    }
+
+    public void scheduleProcess() throws Exception {
+        scheduleProcess(PROCESS_TEMPLATE, getUniqueOverlay());
+    }
+
+    private List<WorkflowJob> getRunningJobs(String entityName) throws Exception {
+        OozieClient ozClient = OozieClientFactory.get(cluster.getCluster());
+        StringBuilder builder = new StringBuilder();
+        builder.append(OozieClient.FILTER_STATUS).append('=').append(Job.Status.RUNNING).append(';');
+        builder.append(OozieClient.FILTER_NAME).append('=').append("FALCON_PROCESS_DEFAULT_").append(entityName);
+        return ozClient.getJobsInfo(builder.toString());
+    }
+
+    public void waitForWorkflowStart(String entityName) throws Exception {
+        for (int i = 0; i < 10; i++) {
+            List<WorkflowJob> jobs = getRunningJobs(entityName);
+            if (jobs != null && !jobs.isEmpty()) {
+                return;
+            }
+
+            System.out.println("Waiting for workflow to start");
+            Thread.sleep(i * 1000);
+        }
+        throw new Exception("Workflow for " + entityName + " hasn't started in oozie");
+    }
+
+    public void waitForProcessWFtoStart() throws Exception {
+        waitForWorkflowStart(processName);
+    }
+
+    public void waitForOutputFeedWFtoStart() throws Exception {
+        waitForWorkflowStart(outputFeedName);
+    }
+
+    public void waitForBundleStart(Status status) throws Exception {
+        OozieClient ozClient = OozieClientFactory.get(cluster.getCluster());
+        List<BundleJob> bundles = getBundles();
+        if (bundles.isEmpty()) {
+            return;
+        }
+
+        String bundleId = bundles.get(0).getId();
+        for (int i = 0; i < 15; i++) {
+            Thread.sleep(i * 1000);
+            BundleJob bundle = ozClient.getBundleJobInfo(bundleId);
+            if (bundle.getStatus() == status) {
+                if (status == Status.FAILED) {
+                    return;
+                }
+
+                boolean done = false;
+                for (CoordinatorJob coord : bundle.getCoordinators()) {
+                    if (coord.getStatus() == status) {
+                        done = true;
+                    }
+                }
+                if (done) {
+                    return;
+                }
+            }
+            System.out.println("Waiting for bundle " + bundleId + " in " + status + " state");
+        }
+        throw new Exception("Bundle " + bundleId + " is not " + status + " in oozie");
+    }
+
+    public TestContext() {
+        try {
+            JAXBContext jaxbContext = JAXBContext.newInstance(APIResult.class, Feed.class, Process.class, Cluster.class,
+                    InstancesResult.class);
+            unmarshaller = jaxbContext.createUnmarshaller();
+            marshaller = jaxbContext.createMarshaller();
+
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        configure();
+    }
+
+    public void configure() {
+        StartupProperties.get().setProperty(
+                "application.services",
+                StartupProperties.get().getProperty("application.services")
+                        .replace("org.apache.falcon.service.ProcessSubscriberService", ""));
+        String store = StartupProperties.get().getProperty("config.store.uri");
+        StartupProperties.get().setProperty("config.store.uri", store + System.currentTimeMillis());
+        ClientConfig config = new DefaultClientConfig();
+        Client client = Client.create(config);
+        this.service = client.resource(UriBuilder.fromUri(BASE_URL).build());
+    }
+
+    public void setCluster(String file) throws Exception {
+        cluster = StandAloneCluster.newCluster(file);
+        clusterName = cluster.getCluster().getName();
+    }
+
+    /**
+     * Converts a InputStream into ServletInputStream.
+     *
+     * @param fileName
+     * @return ServletInputStream
+     * @throws java.io.IOException
+     */
+    public ServletInputStream getServletInputStream(String fileName) throws IOException {
+        return getServletInputStream(new FileInputStream(fileName));
+    }
+
+    public ServletInputStream getServletInputStream(final InputStream stream) {
+        return new ServletInputStream() {
+
+            @Override
+            public int read() throws IOException {
+                return stream.read();
+            }
+        };
+    }
+
+    public ClientResponse submitAndSchedule(String template, Map<String, String> overlay, EntityType entityType)
+        throws Exception {
+        String tmpFile = overlayParametersOverTemplate(template, overlay);
+        ServletInputStream rawlogStream = getServletInputStream(tmpFile);
+
+        return this.service.path("api/entities/submitAndSchedule/" + entityType.name().toLowerCase())
+                .header("Remote-User", "testuser").accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
+                .post(ClientResponse.class, rawlogStream);
+    }
+
+    public ClientResponse submitToFalcon(String template, Map<String, String> overlay, EntityType entityType)
+        throws IOException {
+        String tmpFile = overlayParametersOverTemplate(template, overlay);
+        if (entityType == EntityType.CLUSTER) {
+            try {
+                cluster = StandAloneCluster.newCluster(tmpFile);
+                clusterName = cluster.getCluster().getName();
+            } catch (Exception e) {
+                throw new IOException("Unable to setup cluster info", e);
+            }
+        }
+        return submitFileToFalcon(entityType, tmpFile);
+    }
+
+    private ClientResponse submitFileToFalcon(EntityType entityType, String tmpFile) throws IOException {
+
+        ServletInputStream rawlogStream = getServletInputStream(tmpFile);
+
+        return this.service.path("api/entities/submit/" + entityType.name().toLowerCase()).header("Remote-User",
+                "testuser")
+                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML).post(ClientResponse.class, rawlogStream);
+    }
+
+    public void assertRequestId(ClientResponse clientRepsonse) {
+        String response = clientRepsonse.getEntity(String.class);
+        try {
+            APIResult result = (APIResult) unmarshaller.unmarshal(new StringReader(response));
+            Assert.assertNotNull(result.getRequestId());
+        } catch (JAXBException e) {
+            Assert.fail("Reponse " + response + " is not valid");
+        }
+    }
+
+    public void assertStatus(ClientResponse clientRepsonse, APIResult.Status status) {
+        String response = clientRepsonse.getEntity(String.class);
+        try {
+            APIResult result = (APIResult) unmarshaller.unmarshal(new StringReader(response));
+            Assert.assertEquals(result.getStatus(), status);
+        } catch (JAXBException e) {
+            Assert.fail("Reponse " + response + " is not valid");
+        }
+    }
+
+    public void assertFailure(ClientResponse clientRepsonse) {
+        Assert.assertEquals(clientRepsonse.getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
+        assertStatus(clientRepsonse, APIResult.Status.FAILED);
+    }
+
+    public void assertSuccessful(ClientResponse clientRepsonse) {
+        Assert.assertEquals(clientRepsonse.getStatus(), Response.Status.OK.getStatusCode());
+        assertStatus(clientRepsonse, APIResult.Status.SUCCEEDED);
+    }
+
+    public String overlayParametersOverTemplate(String template, Map<String, String> overlay) throws IOException {
+        File tmpFile = getTempFile();
+        OutputStream out = new FileOutputStream(tmpFile);
+
+        InputStreamReader in;
+        if (getClass().getResourceAsStream(template) == null) {
+            in = new FileReader(template);
+        } else {
+            in = new InputStreamReader(getClass().getResourceAsStream(template));
+        }
+        BufferedReader reader = new BufferedReader(in);
+        String line;
+        while ((line = reader.readLine()) != null) {
+            Matcher matcher = VAR_PATTERN.matcher(line);
+            while (matcher.find()) {
+                String variable = line.substring(matcher.start(), matcher.end());
+                line = line.replace(variable, overlay.get(variable.substring(2, variable.length() - 2)));
+                matcher = VAR_PATTERN.matcher(line);
+            }
+            out.write(line.getBytes());
+            out.write("\n".getBytes());
+        }
+        reader.close();
+        out.close();
+        return tmpFile.getAbsolutePath();
+    }
+
+    public File getTempFile() throws IOException {
+        File target = new File("webapp/target");
+        if (!target.exists()) {
+            target = new File("target");
+        }
+
+        return File.createTempFile("test", ".xml", target);
+    }
+
+    public List<BundleJob> getBundles() throws Exception {
+        List<BundleJob> bundles = new ArrayList<BundleJob>();
+        if (clusterName == null) {
+            return bundles;
+        }
+
+        OozieClient ozClient = OozieClientFactory.get(cluster.getCluster());
+        return ozClient.getBundleJobsInfo("name=FALCON_PROCESS_" + processName, 0, 10);
+    }
+
+    public boolean killOozieJobs() throws Exception {
+        if (cluster == null) {
+            return true;
+        }
+
+        OozieClient ozClient = OozieClientFactory.get(cluster.getCluster());
+        List<BundleJob> bundles = getBundles();
+        if (bundles != null) {
+            for (BundleJob bundle : bundles) {
+                ozClient.kill(bundle.getId());
+            }
+        }
+        return false;
+    }
+
+    public Map<String, String> getUniqueOverlay() throws FalconException {
+        Map<String, String> overlay = new HashMap<String, String>();
+        long time = System.currentTimeMillis();
+        clusterName = "cluster" + time;
+        overlay.put("cluster", clusterName);
+        overlay.put("inputFeedName", "in" + time);
+        //only feeds with future dates can be scheduled
+        Date endDate = new Date(System.currentTimeMillis() + 15 * 60 * 1000);
+        overlay.put("feedEndDate", SchemaHelper.formatDateUTC(endDate));
+        overlay.put("outputFeedName", "out" + time);
+        processName = "p" + time;
+        overlay.put("processName", processName);
+        outputFeedName = "out" + time;
+        return overlay;
+    }
+
+    public static void prepare() throws Exception {
+
+        Map<String, String> overlay = new HashMap<String, String>();
+        overlay.put("cluster", RandomStringUtils.randomAlphabetic(5));
+        TestContext context = new TestContext();
+        String file = context.
+                overlayParametersOverTemplate(context.CLUSTER_TEMPLATE, overlay);
+        EmbeddedCluster cluster = StandAloneCluster.newCluster(file);
+
+        cleanupStore();
+
+        // setup dependent workflow and lipath in hdfs
+        FileSystem fs = FileSystem.get(cluster.getConf());
+        fs.mkdirs(new Path("/falcon"), new FsPermission((short) 511));
+
+        Path wfParent = new Path("/falcon/test");
+        fs.delete(wfParent, true);
+        Path wfPath = new Path(wfParent, "workflow");
+        fs.mkdirs(wfPath);
+        fs.copyFromLocalFile(false, true, new Path(TestContext.class.getResource("/fs-workflow.xml").getPath()),
+                new Path(wfPath,
+                        "workflow.xml"));
+        fs.mkdirs(new Path(wfParent, "input/2012/04/20/00"));
+        Path outPath = new Path(wfParent, "output");
+        fs.mkdirs(outPath);
+        fs.setPermission(outPath, new FsPermission((short) 511));
+    }
+
+    public static void cleanupStore() throws Exception {
+        for (EntityType type : EntityType.values()) {
+            for (String name : ConfigurationStore.get().getEntities(type)) {
+                ConfigurationStore.get().remove(type, name);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/61417357/webapp/src/test/resources/client.properties
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/client.properties b/webapp/src/test/resources/client.properties
index 12ae0ac..6ecaa87 100644
--- a/webapp/src/test/resources/client.properties
+++ b/webapp/src/test/resources/client.properties
@@ -16,4 +16,4 @@
 # limitations under the License.
 #
 
-falcon.url=http://localhost:15000/
\ No newline at end of file
+falcon.url=http://localhost:41000/
\ No newline at end of file


Mime
View raw message