falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject [3/3] falcon git commit: FALCON-1447 Integration Tests for native scheduler. Contributed by Pavan Kumar Kolamuri.
Date Tue, 15 Dec 2015 12:24:06 GMT
FALCON-1447 Integration Tests for native scheduler. Contributed by Pavan Kumar Kolamuri.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/4591ffb6
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/4591ffb6
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/4591ffb6

Branch: refs/heads/master
Commit: 4591ffb61f0820c674b0f84f5b18721f7cbc39e3
Parents: 47f8a60
Author: Ajay Yadava <ajaynsit@gmail.com>
Authored: Tue Dec 15 17:05:12 2015 +0530
Committer: Ajay Yadava <ajaynsit@gmail.com>
Committed: Tue Dec 15 17:53:17 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../oozie/client/LocalProxyOozieClient.java     |   8 +-
 .../falcon/resource/AbstractEntityManager.java  |   3 +-
 scheduler/pom.xml                               |  18 +++
 .../workflow/engine/FalconWorkflowEngine.java   |   6 +-
 .../falcon/state/AbstractSchedulerTestBase.java |   4 +-
 .../apache/falcon/unit/FalconUnitClient.java    |   4 +-
 .../unit/LocalSchedulableEntityManager.java     |   4 +-
 unit/src/main/resources/oozie-site.xml          |  47 +++++++
 .../apache/falcon/unit/FalconUnitTestBase.java  |   6 +-
 .../unit/examples/JavaHelloWorldExample.java    |  33 +++++
 webapp/pom.xml                                  |   8 ++
 .../AbstractSchedulerManagerJerseyIT.java       | 123 +++++++++++++++++
 .../falcon/resource/EntityManagerJerseyIT.java  |   2 +-
 .../EntitySchedulerManagerJerseyIT.java         | 117 +++++++++++++++++
 .../InstanceSchedulerManagerJerseyIT.java       | 131 +++++++++++++++++++
 .../resource/ProcessInstanceManagerIT.java      |   2 +-
 .../apache/falcon/resource/UnitTestContext.java |   8 +-
 .../src/test/resources/helloworldworkflow.xml   |  39 ++++++
 .../local-process-noinputs-template.xml         |  42 ++++++
 webapp/src/test/resources/runtime.properties    |   2 +
 webapp/src/test/resources/startup.properties    |  31 ++++-
 22 files changed, 620 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4c2e42c..ce346a8 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -35,6 +35,8 @@ Trunk (Unreleased)
     FALCON-1213 Base framework of the native scheduler(Pallavi Rao)
 
   IMPROVEMENTS
+    FALCON-1447 Integration Tests for native scheduler(Pavan Kumar Kolamuri via Ajay Yadava)
+
     FALCON-1617 Enable SLA monitoring for instances in past(Narayan Periwal via Ajay Yadava)
 
     FALCON-1577 Migration of EntityManagerJerseyIT to use falcon unit (Narayan Periwal via
Pallavi Rao)

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
index f6e87c4..81f4c54 100644
--- a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
+++ b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
@@ -90,7 +90,13 @@ public class LocalProxyOozieClient extends OozieClient {
     }
 
     public String run(Properties conf) throws OozieClientException {
-        return getLocalOozieClientBundle().run(conf);
+        if (conf.getProperty("oozie.wf.application.path") != null) {
+            return getLocalOozieClient().run(conf);
+        } else if (conf.getProperty("oozie.coord.application.path") != null) {
+            return getLocalOozieClientCoordProxy().run(conf);
+        } else {
+            return getLocalOozieClientBundle().run(conf);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 2f97c0d..5a6d2dc 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -530,7 +530,7 @@ public abstract class AbstractEntityManager {
         return new String(data);
     }
 
-    private enum EntityStatus {
+    protected enum EntityStatus {
         SUBMITTED, SUSPENDED, RUNNING, COMPLETED
     }
 
@@ -553,7 +553,6 @@ public abstract class AbstractEntityManager {
         } catch (FalconWebException e) {
             throw e;
         } catch (Exception e) {
-
             LOG.error("Unable to get status for entity {} ({})", entity, type, e);
             throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/scheduler/pom.xml
----------------------------------------------------------------------
diff --git a/scheduler/pom.xml b/scheduler/pom.xml
index c934b3e..72568f0 100644
--- a/scheduler/pom.xml
+++ b/scheduler/pom.xml
@@ -214,6 +214,24 @@
                 </executions>
             </plugin>
 
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <excludes>
+                        <exclude>**/log4j.xml</exclude>
+                    </excludes>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
         </plugins>
     </build>
 </project>

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index 2c45fbd..c19cada 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -175,13 +175,17 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
             states = new ArrayList<>();
             states.add(InstanceState.STATE.SUSPENDED);
             break;
-        case STATUS:
         case PARAMS:
             // Applicable only for running and finished jobs.
             states = InstanceState.getRunningStates();
             states.addAll(InstanceState.getTerminalStates());
             states.add(InstanceState.STATE.SUSPENDED);
             break;
+        case STATUS:
+            states = InstanceState.getActiveStates();
+            states.addAll(InstanceState.getTerminalStates());
+            states.add(InstanceState.STATE.SUSPENDED);
+            break;
         default:
             throw new IllegalArgumentException("Unhandled action " + action);
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
index 48c1426..a8be06d 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
@@ -30,7 +30,7 @@ import java.io.File;
 import java.io.IOException;
 
 /**
- * TestBase for tests in scheduler.
+ * TestBase for tests using Falcon Native Scheduler.
  */
 public class AbstractSchedulerTestBase extends AbstractTestBase {
     private static final String DB_BASE_DIR = "target/test-data/falcondb";
@@ -56,7 +56,7 @@ public class AbstractSchedulerTestBase extends AbstractTestBase {
         fs.delete(new Path(DB_BASE_DIR), true);
     }
 
-    protected void createDB(String file) {
+    public void createDB(String file) {
         File sqlFile = new File(file);
         String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" };
         int result = execDBCLICommands(argsCreate);

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index f34a90c..e86e3e8 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -128,8 +128,8 @@ public class FalconUnitClient extends AbstractFalconClient {
     }
 
     @Override
-    public APIResult delete(EntityType entityType, String entityName, String doAsUser) {
-        return localSchedulableEntityManager.delete(entityType, entityName, doAsUser);
+    public APIResult delete(EntityType entityType, String entityName, String colo) {
+        return localSchedulableEntityManager.delete(entityType, entityName, colo);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
index 0065c71..d30b028 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
@@ -49,11 +49,11 @@ public class LocalSchedulableEntityManager extends AbstractSchedulableEntityMana
         return super.getStatus(type, entity, colo);
     }
 
-    public APIResult delete(EntityType entityType, String entityName, String doAsUser) {
+    public APIResult delete(EntityType entityType, String entityName, String colo) {
         if (entityType == null) {
             throw new IllegalStateException("Entity-Type cannot be null");
         }
-        return super.delete(entityType.name(), entityName, doAsUser);
+        return super.delete(entityType.name(), entityName, colo);
     }
 
     public APIResult validate(String entityType, String filePath, Boolean skipDryRun,

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/unit/src/main/resources/oozie-site.xml
----------------------------------------------------------------------
diff --git a/unit/src/main/resources/oozie-site.xml b/unit/src/main/resources/oozie-site.xml
index 23d41eb..bb7015d 100644
--- a/unit/src/main/resources/oozie-site.xml
+++ b/unit/src/main/resources/oozie-site.xml
@@ -167,4 +167,51 @@
         <name>oozie.service.coord.check.maximum.frequency</name>
         <value>false</value>
     </property>
+
+    <!-- Required to Notify Falcon -->
+    <property>
+        <name>oozie.services.ext</name>
+        <value>
+            org.apache.oozie.service.JMSAccessorService,
+            org.apache.oozie.service.JMSTopicService,
+            org.apache.oozie.service.EventHandlerService
+        </value>
+    </property>
+    <property>
+        <name>oozie.service.EventHandlerService.event.listeners</name>
+        <value>
+            org.apache.oozie.jms.JMSJobEventListener
+        </value>
+    </property>
+    <property>
+        <name>oozie.jms.producer.connection.properties</name>
+        <value>
+            java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#vm://localhost:61616
+        </value>
+    </property>
+    <property>
+        <name>oozie.service.JMSTopicService.topic.name</name>
+        <value>
+            WORKFLOW=ENTITY.TOPIC, COORDINATOR=ENTITY.TOPIC
+        </value>
+        <description>
+            Topic options are ${username} or a fixed string which can be specified as default
or for a
+            particular job type.
+            For e.g To have a fixed string topic for workflows, coordinators and bundles,
+            specify in the following comma-separated format: {jobtype1}={some_string1}, {jobtype2}={some_string2}
+            where job type can be WORKFLOW, COORDINATOR or BUNDLE.
+            Following example defines topic for workflow job, workflow action, coordinator
job, coordinator action,
+            bundle job and bundle action
+            WORKFLOW=workflow,
COORDINATOR=coordinator,
+            BUNDLE=bundle
+            For jobs with no defined topic, default topic will be ${username}
+        </description>
+    </property>
+    <property>
+        <name>oozie.service.JMSTopicService.topic.prefix</name>
+        <value>FALCON.</value>
+        <description>
+            This can be used to append a prefix to the topic in oozie.service.JMSTopicService.topic.name.
For eg: oozie.
+        </description>
+    </property>
 </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
index 83afac7..382e0c9 100644
--- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
+++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
@@ -92,8 +92,8 @@ public class FalconUnitTestBase {
 
     private static final String DEFAULT_CLUSTER = "local";
     private static final String DEFAULT_COLO = "local";
-    private static final String CLUSTER = "cluster";
-    private static final String COLO = "colo";
+    protected static final String CLUSTER = "cluster";
+    protected static final String COLO = "colo";
     protected static final String CLUSTER_TEMPLATE = "/local-cluster-template.xml";
     protected static final String STAGING_PATH = "/projects/falcon/staging";
     protected static final String WORKING_PATH = "/projects/falcon/working";
@@ -105,7 +105,7 @@ public class FalconUnitTestBase {
     protected static ConfigurationStore configStore;
 
     @BeforeClass
-    public void setup() throws FalconException, IOException {
+    public void setup() throws Exception {
         FalconUnit.start(true);
         falconUnitClient = FalconUnit.getClient();
         fs = (JailedFileSystem) FalconUnit.getFileSystem();

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/unit/src/test/java/org/apache/falcon/unit/examples/JavaHelloWorldExample.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/examples/JavaHelloWorldExample.java
b/unit/src/test/java/org/apache/falcon/unit/examples/JavaHelloWorldExample.java
new file mode 100644
index 0000000..1481c4b
--- /dev/null
+++ b/unit/src/test/java/org/apache/falcon/unit/examples/JavaHelloWorldExample.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.unit.examples;
+
+import java.io.IOException;
+
+/**
+ * Hello World Example for Unit Tests. This is used in JavaAction in Falcon Unit Tests to
check
+ * whether workflow succeeded or not.
+ */
+public final class JavaHelloWorldExample {
+
+    private JavaHelloWorldExample() {}
+
+    public static void main(String[] args) throws IOException {
+        System.out.println("Hello World");
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 428f67e..05616c5 100644
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -140,6 +140,14 @@
 
         <dependency>
             <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-scheduler</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
             <artifactId>falcon-retention</artifactId>
         </dependency>
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
new file mode 100644
index 0000000..f5bcc54
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.client.FalconCLIException;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.state.AbstractSchedulerTestBase;
+import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.unit.FalconUnitTestBase;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for tests using Native Scheduler.
+ */
+public class AbstractSchedulerManagerJerseyIT extends FalconUnitTestBase {
+
+    public static final String PROCESS_TEMPLATE = "/local-process-noinputs-template.xml";
+    public static final String PROCESS_NAME = "processName";
+    protected static final String START_INSTANCE = "2012-04-20T00:00Z";
+    private static FalconJPAService falconJPAService = FalconJPAService.get();
+    private static final String DB_BASE_DIR = "target/test-data/falcondb";
+    protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
+    protected static String url = "jdbc:derby:"+ dbLocation +";create=true";
+    protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
+    protected LocalFileSystem localFS = new LocalFileSystem();
+
+
+    @BeforeClass
+    public void setup() throws Exception {
+        Configuration localConf = new Configuration();
+        localFS.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);
+        cleanupDB();
+        localFS.mkdirs(new Path(DB_BASE_DIR));
+        updateStartUpProps();
+        falconJPAService.init();
+        createDB();
+        super.setup();
+    }
+
+    private void updateStartUpProps() {
+        StartupProperties.get().setProperty("workflow.engine.impl",
+                "org.apache.falcon.workflow.engine.FalconWorkflowEngine");
+        StartupProperties.get().setProperty("dag.engine.impl",
+                "org.apache.falcon.workflow.engine.OozieDAGEngine");
+        String[] listeners = StartupProperties.get().getProperty("configstore.listeners").split(",");
+        List<String> configListeners = new ArrayList<>(Arrays.asList(listeners));
+        configListeners.remove("org.apache.falcon.service.SharedLibraryHostingService");
+        configListeners.add("org.apache.falcon.state.store.jdbc.JDBCStateStore");
+        StartupProperties.get().setProperty("configstore.listeners", StringUtils.join(configListeners,
","));
+        StartupProperties.get().getProperty("falcon.state.store.impl",
+                "org.apache.falcon.state.store.jdbc.JDBCStateStore");
+    }
+
+    protected void submitProcess(Map<String, String> overlay) throws IOException, FalconCLIException
{
+        String tmpFile = TestContext.overlayParametersOverTemplate(PROCESS_TEMPLATE, overlay);
+        APIResult result = submit(EntityType.PROCESS, tmpFile);
+        assertStatus(result);
+    }
+
+    protected void scheduleProcess(String processName, String cluster,
+                                   String startTime, int noOfInstances) throws FalconCLIException
{
+        APIResult result = falconUnitClient.schedule(EntityType.PROCESS, processName, startTime,
noOfInstances,
+                cluster, true, null);
+        assertStatus(result);
+    }
+
+    protected void setupProcessExecution(UnitTestContext context,
+                                         Map<String, String> overlay, int numInstances)
throws Exception {
+        String colo = overlay.get(COLO);
+        String cluster = overlay.get(CLUSTER);
+        submitCluster(colo, cluster, null);
+        context.prepare();
+        submitProcess(overlay);
+
+        String processName = overlay.get(PROCESS_NAME);
+        scheduleProcess(processName, cluster, START_INSTANCE, numInstances);
+    }
+
+    private void createDB() throws Exception {
+        AbstractSchedulerTestBase abstractSchedulerTestBase = new AbstractSchedulerTestBase();
+        StartupProperties.get().setProperty(FalconJPAService.URL, url);
+        abstractSchedulerTestBase.createDB(DB_SQL_FILE);
+    }
+
+    @AfterClass
+    public void cleanup() throws Exception {
+        super.cleanup();
+        cleanupDB();
+    }
+
+    private void cleanupDB() throws IOException {
+        localFS.delete(new Path(DB_BASE_DIR), true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index 439d148..258bb1a 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -89,7 +89,7 @@ public class EntityManagerJerseyIT extends FalconUnitTestBase {
 
     @BeforeClass
     @Override
-    public void setup() throws FalconException, IOException {
+    public void setup() throws Exception {
         String version = System.getProperty("project.version");
         String buildDir = System.getProperty("project.build.directory");
         System.setProperty("falcon.libext", buildDir + "/../../unit/target/falcon-unit-"
+ version + ".jar");

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/src/test/java/org/apache/falcon/resource/EntitySchedulerManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntitySchedulerManagerJerseyIT.java
b/webapp/src/test/java/org/apache/falcon/resource/EntitySchedulerManagerJerseyIT.java
new file mode 100644
index 0000000..35119f0
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntitySchedulerManagerJerseyIT.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.resource;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+
+/**
+ * Test cases for Entity operations using Falcon Native Scheduler.
+ */
+public class EntitySchedulerManagerJerseyIT extends AbstractSchedulerManagerJerseyIT {
+
+    @Test
+    public void testEntitySubmitAndSchedule() throws Exception {
+        UnitTestContext context = new UnitTestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        String colo = overlay.get(COLO);
+        String cluster = overlay.get(CLUSTER);
+        submitCluster(colo, cluster, null);
+        context.prepare();
+
+        submitProcess(overlay);
+
+        String processName = overlay.get(PROCESS_NAME);
+        APIResult result = falconUnitClient.getStatus(EntityType.PROCESS, overlay.get(PROCESS_NAME),
cluster, null);
+        assertStatus(result);
+        Assert.assertEquals(AbstractEntityManager.EntityStatus.SUBMITTED.name(), result.getMessage());
+
+        scheduleProcess(processName, cluster, START_INSTANCE, 1);
+
+        result = falconUnitClient.getStatus(EntityType.PROCESS, processName, cluster, null);
+        assertStatus(result);
+        Assert.assertEquals(AbstractEntityManager.EntityStatus.RUNNING.name(), result.getMessage());
+
+    }
+
+    @Test
+    public void testEntitySuspendResume() throws Exception {
+        UnitTestContext context = new UnitTestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        String colo = overlay.get(COLO);
+        String cluster = overlay.get(CLUSTER);
+        submitCluster(colo, cluster, null);
+        context.prepare();
+
+        submitProcess(overlay);
+
+        String processName = overlay.get(PROCESS_NAME);
+        APIResult result = falconUnitClient.getStatus(EntityType.PROCESS, processName, cluster,
null);
+        assertStatus(result);
+        Assert.assertEquals(AbstractEntityManager.EntityStatus.SUBMITTED.name(), result.getMessage());
+
+        scheduleProcess(processName, cluster, START_INSTANCE, 1);
+
+        result = falconUnitClient.suspend(EntityType.PROCESS, processName, cluster, null);
+        assertStatus(result);
+
+        result = falconUnitClient.getStatus(EntityType.PROCESS, processName, cluster, null);
+        assertStatus(result);
+        Assert.assertEquals(AbstractEntityManager.EntityStatus.SUSPENDED.name(), result.getMessage());
+
+        result = falconUnitClient.resume(EntityType.PROCESS, processName, cluster, null);
+        assertStatus(result);
+
+        result = falconUnitClient.getStatus(EntityType.PROCESS, processName, cluster, null);
+        assertStatus(result);
+        Assert.assertEquals(AbstractEntityManager.EntityStatus.RUNNING.name(), result.getMessage());
+
+    }
+
+
+    @Test
+    public void testProcessDelete() throws Exception {
+        UnitTestContext context = new UnitTestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        String colo = overlay.get(COLO);
+        String cluster = overlay.get(CLUSTER);
+        submitCluster(colo, cluster, null);
+        context.prepare();
+
+        submitProcess(overlay);
+
+        String processName = overlay.get(PROCESS_NAME);
+        APIResult result = falconUnitClient.getStatus(EntityType.PROCESS, processName, cluster,
null);
+        assertStatus(result);
+        Assert.assertEquals(AbstractEntityManager.EntityStatus.SUBMITTED.name(), result.getMessage());
+
+        scheduleProcess(processName, cluster, START_INSTANCE, 1);
+
+        result = falconUnitClient.getStatus(EntityType.PROCESS, processName, cluster, null);
+        assertStatus(result);
+
+        result = falconUnitClient.delete(EntityType.PROCESS, processName, cluster);
+        assertStatus(result);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
new file mode 100644
index 0000000..7959b63
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.client.FalconCLIException;
+import org.apache.falcon.entity.v0.EntityType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Tests for Instance operations using Falcon Native Scheduler.
+ */
+public class InstanceSchedulerManagerJerseyIT extends AbstractSchedulerManagerJerseyIT {
+
+
+    private static final String END_TIME = "2012-04-21T00:00Z";
+    private static final String HELLO_WORLD_WORKFLOW = "helloworldworkflow.xml";
+
+
+    @Test
+    public void testProcessInstanceExecution() throws Exception {
+        UnitTestContext context = new UnitTestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        String colo = overlay.get(COLO);
+        String cluster = overlay.get(CLUSTER);
+
+        submitCluster(colo, cluster, null);
+        context.prepare(HELLO_WORLD_WORKFLOW);
+        submitProcess(overlay);
+
+        String processName = overlay.get(PROCESS_NAME);
+        scheduleProcess(processName, cluster, START_INSTANCE, 1);
+
+        waitForStatus(EntityType.PROCESS.toString(), processName,
+                START_INSTANCE, InstancesResult.WorkflowStatus.SUCCEEDED);
+
+        InstancesResult.WorkflowStatus status = getClient().getInstanceStatus(EntityType.PROCESS.name(),
+                processName, START_INSTANCE);
+        Assert.assertEquals(status, InstancesResult.WorkflowStatus.SUCCEEDED);
+
+    }
+
+    @Test
+    public void testKillInstances() throws Exception {
+        UnitTestContext context = new UnitTestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        setupProcessExecution(context, overlay, 1);
+
+        String processName = overlay.get(PROCESS_NAME);
+        String colo = overlay.get(COLO);
+
+        waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE,
+                InstancesResult.WorkflowStatus.RUNNING);
+
+        InstancesResult result = falconUnitClient.killInstances(EntityType.PROCESS.toString(),
+                processName, START_INSTANCE, END_TIME, colo, null, null, null, null);
+        assertStatus(result);
+
+        InstancesResult.WorkflowStatus status = getClient().getInstanceStatus(EntityType.PROCESS.name(),
+                processName, START_INSTANCE);
+        Assert.assertEquals(status, InstancesResult.WorkflowStatus.KILLED);
+
+
+    }
+
+    @Test
+    public void testSuspendResumeInstances() throws Exception {
+        UnitTestContext context = new UnitTestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        setupProcessExecution(context, overlay, 1);
+
+        String processName = overlay.get(PROCESS_NAME);
+        String colo = overlay.get(COLO);
+
+        waitForStatus(EntityType.PROCESS.toString(), processName,
+                START_INSTANCE, InstancesResult.WorkflowStatus.RUNNING);
+
+        falconUnitClient.suspendInstances(EntityType.PROCESS.toString(), processName, START_INSTANCE,
+                END_TIME, colo, null, null, null, null);
+
+        InstancesResult.WorkflowStatus status = getClient().getInstanceStatus(EntityType.PROCESS.name(),
+                processName, START_INSTANCE);
+        Assert.assertEquals(status, InstancesResult.WorkflowStatus.SUSPENDED);
+
+        falconUnitClient.resumeInstances(EntityType.PROCESS.toString(), processName, START_INSTANCE,
+                END_TIME, colo, null, null, null, null);
+        status = getClient().getInstanceStatus(EntityType.PROCESS.name(),
+                processName, START_INSTANCE);
+        Assert.assertEquals(status, InstancesResult.WorkflowStatus.RUNNING);
+    }
+
+    @Test
+    public void testListInstances() throws Exception {
+        UnitTestContext context = new UnitTestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        setupProcessExecution(context, overlay, 4);
+
+        String processName = overlay.get(PROCESS_NAME);
+        String colo = overlay.get(COLO);
+
+        waitForStatus(EntityType.PROCESS.toString(), processName,
+                START_INSTANCE, InstancesResult.WorkflowStatus.RUNNING);
+
+        InstancesResult result = falconUnitClient.getStatusOfInstances(EntityType.PROCESS.toString(),
processName,
+                START_INSTANCE, "2012-04-23T00:00Z", colo, null, null, null, null, 0, 3,
null);
+        Assert.assertEquals(3, result.getInstances().length);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java
b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java
index 769d059..f94bd8c 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java
@@ -47,7 +47,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase {
 
     @BeforeClass
     @Override
-    public void setup() throws FalconException, IOException {
+    public void setup() throws Exception {
         String version = System.getProperty("project.version");
         String buildDir = System.getProperty("project.build.directory");
         System.setProperty("falcon.libext", buildDir + "/../../unit/target/falcon-unit-"
+ version + ".jar");

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java b/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java
index b222305..1d3167b 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java
@@ -87,7 +87,7 @@ public class UnitTestContext {
         }
     }
 
-    protected void prepare() throws Exception {
+    protected void prepare(String workflow) throws Exception {
         mkdir(fs, new Path("/falcon"), new FsPermission((short) 511));
 
         Path wfParent = new Path("/falcon/test");
@@ -96,7 +96,7 @@ public class UnitTestContext {
         mkdir(fs, wfPath);
         mkdir(fs, new Path("/falcon/test/workflow/lib"));
         fs.copyFromLocalFile(false, true,
-                new Path(TestContext.class.getResource("/sleepWorkflow.xml").getPath()),
+                new Path(TestContext.class.getResource("/" + workflow).getPath()),
                 new Path(wfPath, "workflow.xml"));
         mkdir(fs, new Path(wfParent, "input/2012/04/20/00"));
         mkdir(fs, new Path(wfParent, "input/2012/04/21/00"));
@@ -104,6 +104,10 @@ public class UnitTestContext {
         mkdir(fs, outPath, new FsPermission((short) 511));
     }
 
+    protected void prepare() throws Exception {
+        prepare("sleepWorkflow.xml");
+    }
+
     public static File getTempFile() throws IOException {
         return getTempFile("test", ".xml");
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/src/test/resources/helloworldworkflow.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/helloworldworkflow.xml b/webapp/src/test/resources/helloworldworkflow.xml
new file mode 100644
index 0000000..354bb34
--- /dev/null
+++ b/webapp/src/test/resources/helloworldworkflow.xml
@@ -0,0 +1,39 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<workflow-app xmlns="uri:oozie:workflow:0.2" name="java-main-wf">
+    <start to="java-node"/>
+    <action name="java-node">
+        <java>
+            <job-tracker>local</job-tracker>
+            <name-node>jail://global:00</name-node>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>default</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.unit.examples.JavaHelloWorldExample</main-class>
+        </java>
+        <ok to="end"/>
+        <error to="fail"/>
+    </action>
+    <kill name="fail">
+        <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+    <end name="end"/>
+</workflow-app>

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/src/test/resources/local-process-noinputs-template.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/local-process-noinputs-template.xml b/webapp/src/test/resources/local-process-noinputs-template.xml
new file mode 100644
index 0000000..aabdc6a
--- /dev/null
+++ b/webapp/src/test/resources/local-process-noinputs-template.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<process name="##processName##" xmlns="uri:falcon:process:0.1">
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting</tags>
+    <pipelines>testPipeline,dataReplicationPipeline</pipelines>
+    <clusters>
+        <cluster name="##src.cluster.name##">
+            <validity end="##processEndDate##" start="2012-04-20T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <parallel>2</parallel>
+    <order>FIFO</order>
+    <frequency>days(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <properties>
+        <property name="fileTime" value="${formatTime(dateOffset(instanceTime(), 1, 'DAY'),
'yyyy-MMM-dd')}"/>
+        <property name="user" value="${user()}"/>
+        <property name="baseTime" value="${today(0,0)}"/>
+        <property name="sundayThisWeek" value="${currentWeek('SUN', 0, 0)}"/>
+    </properties>
+    <workflow engine="oozie" path="##workflow.path##" lib="##workflow.lib.path##"/>
+    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+</process>

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/src/test/resources/runtime.properties
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/runtime.properties b/webapp/src/test/resources/runtime.properties
index 1da0ca7..fec9e44 100644
--- a/webapp/src/test/resources/runtime.properties
+++ b/webapp/src/test/resources/runtime.properties
@@ -47,4 +47,6 @@
 
 *.falcon.service.ProxyUserService.proxyuser.#USER#.groups=*
 
+*.falcon.jms.notification.enabled=true
+
 ######### Proxyuser Configuration End #########

http://git-wip-us.apache.org/repos/asf/falcon/blob/4591ffb6/webapp/src/test/resources/startup.properties
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/startup.properties b/webapp/src/test/resources/startup.properties
index 756f315..bc88534 100644
--- a/webapp/src/test/resources/startup.properties
+++ b/webapp/src/test/resources/startup.properties
@@ -20,7 +20,6 @@
 
 ######### Implementation classes #########
 ## DONT MODIFY UNLESS SURE ABOUT CHANGE ##
-
 *.workflow.engine.impl=org.apache.falcon.workflow.engine.OozieWorkflowEngine
 *.oozie.process.workflow.builder=org.apache.falcon.workflow.OozieProcessWorkflowBuilder
 *.oozie.feed.workflow.builder=org.apache.falcon.workflow.OozieFeedWorkflowBuilder
@@ -33,11 +32,17 @@
 *.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
                         org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
                         org.apache.falcon.service.ProcessSubscriberService,\
+                        org.apache.falcon.state.store.service.FalconJPAService,\
                         org.apache.falcon.entity.store.ConfigurationStore,\
                         org.apache.falcon.rerun.service.RetryService,\
                         org.apache.falcon.rerun.service.LateRunService,\
                         org.apache.falcon.metadata.MetadataMappingService,\
-                        org.apache.falcon.service.ProxyUserService
+                        org.apache.falcon.service.ProxyUserService,\
+                        org.apache.falcon.notification.service.impl.JobCompletionService,\
+                        org.apache.falcon.notification.service.impl.SchedulerService,\
+                        org.apache.falcon.notification.service.impl.AlarmService,\
+                        org.apache.falcon.notification.service.impl.DataAvailabilityService,\
+                        org.apache.falcon.execution.FalconExecutionService
 
 ##### Falcon Configuration Store Change listeners #####
 *.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
@@ -46,6 +51,7 @@
                         org.apache.falcon.entity.store.FeedLocationStore,\
                         org.apache.falcon.service.SharedLibraryHostingService
 
+
 ##### JMS MQ Broker Implementation class #####
 *.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
 
@@ -140,4 +146,23 @@ debug.libext.process.paths=${falcon.libext}
 *.falcon.graph.serialize.path=${user.dir}/target/graphdb
 *.falcon.graph.preserve.history=false
 *.falcon.graph.transaction.retry.count=3
-*.falcon.graph.transaction.retry.delay=5
\ No newline at end of file
+*.falcon.graph.transaction.retry.delay=5
+
+######## StateStore Properties #####
+*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+*.falcon.statestore.jdbc.url=jdbc:derby:target/test-data/data.db;create=true
+*.falcon.statestore.jdbc.username=sa
+*.falcon.statestore.jdbc.password=
+*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+# Maximum number of active connections that can be allocated from this pool at the same time.
+*.falcon.statestore.pool.max.active.conn=10
+*.falcon.statestore.connection.properties=
+# Indicates the interval (in milliseconds) between eviction runs.
+*.falcon.statestore.validate.db.connection.eviction.interval=300000
+# The number of objects to examine during each run of the idle object evictor thread.
+*.falcon.statestore.validate.db.connection.eviction.num=10
+# Creates Falcon DB.
+# If set to true, it creates the DB schema if it does not exist. If the DB schema exists
is a NOP.
+# If set to false, it does not create the DB schema. If the DB schema does not exist it fails
start up.
+*.falcon.statestore.create.db.schema=true
\ No newline at end of file


Mime
View raw message