falcon-commits mailing list archives

From: ajayyad...@apache.org
Subject: [2/3] falcon git commit: FALCON-1297 Falcon Unit which supports Submit and Schedule of jobs. Contributed by Pavan Kumar Kolamuri.
Date: Tue, 04 Aug 2015 12:01:52 GMT
FALCON-1297 Falcon Unit which supports Submit and Schedule of jobs. Contributed by Pavan Kumar Kolamuri.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3f00d051
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3f00d051
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3f00d051

Branch: refs/heads/master
Commit: 3f00d05171e39c75713d1f6d7ff00cfbab16bf89
Parents: 5a3e1d6
Author: Ajay Yadava <ajaynsit@gmail.com>
Authored: Tue Aug 4 17:13:47 2015 +0530
Committer: Ajay Yadava <ajaynsit@gmail.com>
Committed: Tue Aug 4 17:13:47 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   8 +-
 .../falcon/client/AbstractFalconClient.java     |  53 +++
 .../org/apache/falcon/client/FalconClient.java  |   2 +-
 .../entity/parser/ClusterEntityParser.java      |  13 +-
 .../falcon/entity/store/ConfigurationStore.java |  54 ++-
 .../java/org/apache/falcon/util/DateUtil.java   |  39 ++
 .../falcon/workflow/util/OozieConstants.java    |  33 ++
 .../apache/falcon/hadoop/JailedFileSystem.java  |   2 +-
 .../apache/falcon/oozie/OozieEntityBuilder.java |   8 +-
 .../workflow/engine/OozieClientFactory.java     |  20 +-
 .../workflow/engine/OozieWorkflowEngine.java    |  21 +-
 .../oozie/client/LocalOozieClientBundle.java    | 382 +++++++++++++++++++
 .../oozie/client/LocalProxyOozieClient.java     | 188 +++++++++
 pom.xml                                         |   1 +
 unit/pom.xml                                    | 106 +++++
 .../java/org/apache/falcon/unit/FalconUnit.java | 215 +++++++++++
 .../apache/falcon/unit/FalconUnitClient.java    | 250 ++++++++++++
 .../apache/falcon/unit/FalconUnitHelper.java    | 100 +++++
 .../unit/LocalFalconClientProtocolProvider.java |  62 +++
 ...op.mapreduce.protocol.ClientProtocolProvider |  18 +
 unit/src/main/resources/core-site.xml           |  38 ++
 unit/src/main/resources/deploy.properties       |  21 +
 .../main/resources/localoozie-log4j.properties  |  34 ++
 unit/src/main/resources/log4j.xml               |  91 +++++
 unit/src/main/resources/mapred-site.xml         |  35 ++
 unit/src/main/resources/oozie-site.xml          | 170 +++++++++
 unit/src/main/resources/startup.properties      | 129 +++++++
 .../apache/falcon/unit/FalconUnitTestBase.java  | 317 +++++++++++++++
 .../org/apache/falcon/unit/TestFalconUnit.java  |  58 +++
 .../falcon/unit/examples/JavaExample.java       |  65 ++++
 unit/src/test/resources/cluster-template.xml    |  36 ++
 unit/src/test/resources/infeed.xml              |  39 ++
 unit/src/test/resources/input.txt               |  18 +
 unit/src/test/resources/outfeed.xml             |  39 ++
 unit/src/test/resources/process.xml             |  50 +++
 unit/src/test/resources/workflow.xml            |  43 +++
 .../falcon/resource/EntityManagerJerseyIT.java  |   6 +-
 .../resource/ProcessInstanceManagerIT.java      |   3 +-
 .../org/apache/falcon/util/OozieTestUtils.java  |  17 +-
 39 files changed, 2722 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 50ce4d2..e1eae4f 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,11 +4,15 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-1297 Falcon Unit which supports Submit and Schedule of jobs(Pavan Kumar Kolamuri via Ajay Yadava)
+
     FALCON-1039 Add instance dependency API in falcon (Ajay Yadava)
 
     FALCON-796 Enable users to triage data processing issues through falcon (Ajay Yadava)
     
   IMPROVEMENTS
+    FALCON-1322 Add prefix in runtime.properties(Sandeep Samudrala via Ajay Yadava)
+
     FALCON-1317 Inconsistent JSON serialization(Ajay Yadava)
 
     FALCON-1324 Pagination API breaks backward compatibility(Ajay Yadava).
@@ -150,9 +154,7 @@ Release Version: 0.6.1
    FALCON-822 Add reverse look up API (Ajay Yadava via Suhas Vasu)
 
   IMPROVEMENTS
-   FALCON-1322 Add prefix in runtime.properties(Sandeep Samudrala via Ajay Yadava)
-
-   FALCON-1280 Update docs/license licenses with right copyright 
+   FALCON-1280 Update docs/license licenses with right copyright
    information (Shaik Idris Ali)
 
    FALCON-1276 Verify licensing in html5-ui module. 

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
new file mode 100644
index 0000000..bb6d8c9
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.client;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.resource.APIResult;
+
+import java.io.IOException;
+
+/**
+ * Abstract Client API to submit and manage Falcon Entities (Cluster, Feed, Process) jobs
+ * against a Falcon instance.
+ */
+public abstract class AbstractFalconClient {
+
+    /**
+     * Submit a new entity. Entities can be of type feed, process or data end
+     * points. Entity definitions are validated structurally against schema and
+     * subsequently for other rules before they are admitted into the system.
+     * @param entityType
+     * @param filePath
+     * @return
+     * @throws FalconCLIException
+     */
+    public abstract APIResult submit(String entityType, String filePath) throws FalconCLIException,
+            IOException;
+
+    /**
+     * Schedules a submitted process entity immediately.
+     * @param entityType
+     * @param entityName
+     * @param colo
+     * @return
+     * @throws FalconCLIException
+     */
+    public abstract APIResult schedule(EntityType entityType, String entityName, String colo) throws FalconCLIException;
+
+}
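
The two methods above are the entire contract that both the existing HTTP client and the new in-process client have to satisfy. A minimal caller sketch against this abstraction (illustrative only: the entity-type string, the file path, the process name and the null colo argument are assumptions, not part of this patch):

    import org.apache.falcon.client.AbstractFalconClient;
    import org.apache.falcon.client.FalconCLIException;
    import org.apache.falcon.entity.v0.EntityType;
    import org.apache.falcon.resource.APIResult;

    import java.io.IOException;

    public final class SubmitAndScheduleSketch {
        private SubmitAndScheduleSketch() {}

        // Works against any AbstractFalconClient: the HTTP-backed FalconClient
        // or the in-process FalconUnitClient this patch introduces.
        public static APIResult submitAndSchedule(AbstractFalconClient client)
            throws FalconCLIException, IOException {
            // Submit validates the definition and admits the entity into the system.
            client.submit("process", "/tmp/sample-process.xml");
            // Schedule hands the admitted entity to the workflow engine.
            return client.schedule(EntityType.PROCESS, "sample-process", null);
        }
    }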

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index d507371..9649e10 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -74,7 +74,7 @@ import java.util.Properties;
  * Client API to submit and manage Falcon Entities (Cluster, Feed, Process) jobs
  * against an Falcon instance.
  */
-public class FalconClient {
+public class FalconClient extends AbstractFalconClient {
 
     public static final String WS_HEADER_PREFIX = "header:";
     public static final String USER = System.getProperty("user.name");

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index 59b0910..5756f84 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -34,6 +34,7 @@ import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowEngineFactory;
+import org.apache.falcon.workflow.util.OozieConstants;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.jms.ConnectionFactory;
 import java.io.IOException;
+import java.net.URI;
 
 /**
  * Parser that parses cluster entity definition.
@@ -92,7 +94,12 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
     private void validateScheme(Cluster cluster, Interfacetype interfacetype)
         throws ValidationException {
         final String endpoint = ClusterHelper.getInterface(cluster, interfacetype).getEndpoint();
-        if (new Path(endpoint).toUri().getScheme() == null) {
+        URI uri = new Path(endpoint).toUri();
+        if (uri.getScheme() == null) {
+            if (Interfacetype.WORKFLOW == interfacetype
+                    && uri.toString().equals(OozieConstants.LOCAL_OOZIE)) {
+                return;
+            }
             throw new ValidationException("Cannot get valid scheme for interface: "
                     + interfacetype + " of cluster: " + cluster.getName());
         }
@@ -146,7 +153,9 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
     protected void validateWorkflowInterface(Cluster cluster) throws ValidationException {
         final String workflowUrl = ClusterHelper.getOozieUrl(cluster);
         LOG.info("Validating workflow interface: {}", workflowUrl);
-
+        if (OozieConstants.LOCAL_OOZIE.equals(workflowUrl)) {
+            return;
+        }
         try {
             if (!WorkflowEngineFactory.getWorkflowEngine().isAlive(cluster)) {
                 throw new ValidationException("Unable to reach Workflow server:" + workflowUrl);
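
With the two early returns above, a cluster entity may declare a scheme-less "localoozie" workflow endpoint and still pass validation. A hedged sketch of such an interface element (only the endpoint value comes from this patch; the surrounding element layout and the version attribute value are assumptions based on the usual cluster definition):

    <interfaces>
        <!-- readonly, write, execute and messaging interfaces unchanged -->
        <interface type="workflow" endpoint="localoozie" version="4.1.0"/>
    </interfaces>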

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index b5f531a..7b53ebb 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -61,6 +61,7 @@ public final class ConfigurationStore implements FalconService {
     private static final Logger LOG = LoggerFactory.getLogger(ConfigurationStore.class);
     private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT");
     private static final String UTF_8 = CharEncoding.UTF_8;
+    private final boolean shouldPersist;
 
     private static final FsPermission STORE_PERMISSION =
             new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
@@ -93,17 +94,20 @@ public final class ConfigurationStore implements FalconService {
         return STORE;
     }
 
-    private final FileSystem fs;
-    private final Path storePath;
+    private FileSystem fs;
+    private Path storePath;
 
     private ConfigurationStore() {
         for (EntityType type : EntityType.values()) {
             dictionary.put(type, new ConcurrentHashMap<String, Entity>());
         }
 
-        String uri = StartupProperties.get().getProperty("config.store.uri");
-        storePath = new Path(uri);
-        fs = initializeFileSystem();
+        shouldPersist = Boolean.parseBoolean(StartupProperties.get().getProperty("config.store.persist", "true"));
+        if (shouldPersist) {
+            String uri = StartupProperties.get().getProperty("config.store.uri");
+            storePath = new Path(uri);
+            fs = initializeFileSystem();
+        }
     }
 
     /**
@@ -140,24 +144,26 @@ public final class ConfigurationStore implements FalconService {
             registerListener(listener);
         }
 
-        try {
-            for (EntityType type : ENTITY_LOAD_ORDER) {
-                ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
-                FileStatus[] files = fs.globStatus(new Path(storePath, type.name() + Path.SEPARATOR + "*"));
-                if (files != null) {
-                    for (FileStatus file : files) {
-                        String fileName = file.getPath().getName();
-                        String encodedEntityName = fileName.substring(0, fileName.length() - 4); // drop
-                        // ".xml"
-                        String entityName = URLDecoder.decode(encodedEntityName, UTF_8);
-                        Entity entity = restore(type, entityName);
-                        entityMap.put(entityName, entity);
-                        onReload(entity);
+        if (shouldPersist) {
+            try {
+                for (EntityType type : ENTITY_LOAD_ORDER) {
+                    ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
+                    FileStatus[] files = fs.globStatus(new Path(storePath, type.name() + Path.SEPARATOR + "*"));
+                    if (files != null) {
+                        for (FileStatus file : files) {
+                            String fileName = file.getPath().getName();
+                            String encodedEntityName = fileName.substring(0, fileName.length() - 4); // drop
+                            // ".xml"
+                            String entityName = URLDecoder.decode(encodedEntityName, UTF_8);
+                            Entity entity = restore(type, entityName);
+                            entityMap.put(entityName, entity);
+                            onReload(entity);
+                        }
                     }
                 }
+            } catch (IOException e) {
+                throw new FalconException("Unable to restore configurations", e);
             }
-        } catch (IOException e) {
-            throw new FalconException("Unable to restore configurations", e);
         }
     }
 
@@ -261,7 +267,7 @@ public final class ConfigurationStore implements FalconService {
                 return (T) updatesInProgress.get();
             }
             T entity = (T) entityMap.get(name);
-            if (entity == NULL) { // Object equality being checked
+            if (entity == NULL && shouldPersist) { // Object equality being checked
                 try {
                     entity = this.restore(type, name);
                 } catch (IOException e) {
@@ -322,6 +328,9 @@ public final class ConfigurationStore implements FalconService {
      * @throws FalconException
      */
     private void persist(EntityType type, Entity entity) throws IOException, FalconException {
+        if (!shouldPersist) {
+            return;
+        }
         OutputStream out = fs
                 .create(new Path(storePath,
                         type + Path.SEPARATOR + URLEncoder.encode(entity.getName(), UTF_8) + ".xml"));
@@ -344,6 +353,9 @@ public final class ConfigurationStore implements FalconService {
      * @throws IOException If any error in accessing the storage
      */
     private void archive(EntityType type, String name) throws IOException {
+        if (!shouldPersist) {
+            return;
+        }
         Path archivePath = new Path(storePath, "archive" + Path.SEPARATOR + type);
         HadoopClientFactory.mkdirs(fs, archivePath, STORE_PERMISSION);
         fs.rename(new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(name, UTF_8) + ".xml"),
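
All of the guards above key off a single new startup property, config.store.persist, which defaults to true so that server deployments keep writing entity XML to the configured store. An embedded deployment such as Falcon Unit would switch it off; a sketch of the override (the "*." domain prefix follows the usual startup.properties convention and is an assumption here):

    # Keep the configuration store purely in memory; nothing is written to or
    # restored from config.store.uri.
    *.config.store.persist=false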

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/common/src/main/java/org/apache/falcon/util/DateUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/DateUtil.java b/common/src/main/java/org/apache/falcon/util/DateUtil.java
new file mode 100644
index 0000000..e736340
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/util/DateUtil.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.TimeZone;
+
+/**
+ * Helper to get date operations.
+ */
+public final class DateUtil {
+
+    private DateUtil() {}
+
+    public static Date getNextMinute(Date time) throws Exception {
+        Calendar insCal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        insCal.setTime(time);
+
+        insCal.add(Calendar.MINUTE, 1);
+        return insCal.getTime();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/common/src/main/java/org/apache/falcon/workflow/util/OozieConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/util/OozieConstants.java b/common/src/main/java/org/apache/falcon/workflow/util/OozieConstants.java
new file mode 100644
index 0000000..05f248e
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/workflow/util/OozieConstants.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.workflow.util;
+
+/**
+ * Oozie Constants used across multiple modules.
+ */
+public final class OozieConstants {
+    /**
+     * Constant for the oozie running in local.
+     */
+    public static final String LOCAL_OOZIE = "localoozie";
+
+    private OozieConstants() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java b/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
index 7156bbd..d5b2eb3 100644
--- a/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
+++ b/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
@@ -58,7 +58,7 @@ public class JailedFileSystem extends FileSystem {
             throw new IOException("Incomplete Jail URI, no jail base: "+ name);
         }
         basePath = new Path(conf.get("jail.base", System.getProperty("hadoop.tmp.dir",
-                        System.getProperty("user.dir") + "/webapp/target/tmp-hadoop-"
+                        System.getProperty("user.dir") + "/target/falcon/tmp-hadoop-"
                                 + System.getProperty("user.name"))) + "/jail-fs/" + base).toUri().getPath();
         this.uri = URI.create(name.getScheme()+"://"+name.getAuthority());
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
index 9ca0ac1..9a6b14c 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -26,6 +26,7 @@ import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.cluster.Property;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Output;
@@ -38,6 +39,7 @@ import org.apache.falcon.service.FalconPathFilter;
 import org.apache.falcon.service.SharedLibraryHostingService;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
+import org.apache.falcon.workflow.util.OozieConstants;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -168,8 +170,10 @@ public abstract class OozieEntityBuilder<T extends Entity> {
         properties.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster));
         properties.setProperty(AbstractWorkflowEngine.JOB_TRACKER, ClusterHelper.getMREndPoint(cluster));
         properties.setProperty("colo.name", cluster.getColo());
-
-        properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true");
+        final String endpoint = ClusterHelper.getInterface(cluster, Interfacetype.WORKFLOW).getEndpoint();
+        if (!OozieConstants.LOCAL_OOZIE.equals(endpoint)) {
+            properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true");
+        }
         properties.setProperty("falcon.libpath",
                 ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath()  + "/lib");
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
index 622238a..ae5c5fa 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
@@ -23,7 +23,11 @@ import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.workflow.util.OozieConstants;
+import org.apache.oozie.client.LocalProxyOozieClient;
+import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.ProxyOozieClient;
+import org.apache.oozie.local.LocalOozie;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,13 +37,12 @@ import org.slf4j.LoggerFactory;
 public final class OozieClientFactory {
 
     private static final Logger LOG = LoggerFactory.getLogger(OozieClientFactory.class);
-    private static final String LOCAL_OOZIE = "local";
 
     private static volatile boolean localInitialized = false;
 
     private OozieClientFactory() {}
 
-    public static synchronized ProxyOozieClient get(Cluster cluster)
+    public static synchronized OozieClient get(Cluster cluster)
         throws FalconException {
 
         assert cluster != null : "Cluster cant be null";
@@ -48,28 +51,27 @@ public final class OozieClientFactory {
         return getClientRef(oozieUrl);
     }
 
-    public static ProxyOozieClient get(String clusterName) throws FalconException {
+    public static OozieClient get(String clusterName) throws FalconException {
         return get((Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, clusterName));
     }
 
-    private static ProxyOozieClient getClientRef(String oozieUrl)
+    private static OozieClient getClientRef(String oozieUrl)
         throws FalconException {
 
-        if (LOCAL_OOZIE.equals(oozieUrl)) {
+        if (OozieConstants.LOCAL_OOZIE.equals(oozieUrl)) {
             return getLocalOozieClient();
         } else {
             return new ProxyOozieClient(oozieUrl);
         }
     }
 
-    private static ProxyOozieClient getLocalOozieClient() throws FalconException {
+    private static OozieClient getLocalOozieClient() throws FalconException {
         try {
             if (!localInitialized) {
-                //LocalOozie.start();
+                LocalOozie.start();
                 localInitialized = true;
             }
-            //return LocalOozie.getClient();
-            return null;
+            return new LocalProxyOozieClient();
         } catch (Exception e) {
             throw new FalconException(e);
         }
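
Because the factory now hands back the OozieClient base type, callers no longer need to know whether they received the remote proxy or the embedded client. A minimal lookup sketch (the cluster name "local-cluster" is a hypothetical placeholder; the cluster is assumed to be registered in the ConfigurationStore):

    import org.apache.falcon.FalconException;
    import org.apache.falcon.workflow.engine.OozieClientFactory;
    import org.apache.oozie.client.OozieClient;
    import org.apache.oozie.client.OozieClientException;

    public final class OozieClientLookupSketch {
        private OozieClientLookupSketch() {}

        public static String workflowStatus(String jobId)
            throws FalconException, OozieClientException {
            // Returns a LocalProxyOozieClient when the cluster's workflow endpoint
            // is "localoozie", a ProxyOozieClient otherwise.
            OozieClient client = OozieClientFactory.get("local-cluster");
            return client.getJobInfo(jobId).getStatus().toString();
        }
    }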

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 4085b8f..2f3dc6f 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -58,7 +58,6 @@ import org.apache.oozie.client.Job;
 import org.apache.oozie.client.Job.Status;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.OozieClientException;
-import org.apache.oozie.client.ProxyOozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.rest.RestConstants;
@@ -210,7 +209,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     private void dryRunInternal(Cluster cluster, Path buildPath) throws FalconException {
         BUNDLEAPP bundle = OozieBundleBuilder.unmarshal(cluster, buildPath);
-        ProxyOozieClient client = OozieClientFactory.get(cluster.getName());
+        OozieClient client = OozieClientFactory.get(cluster.getName());
         for (COORDINATOR coord : bundle.getCoordinator()) {
             Properties props = new Properties();
             props.setProperty(OozieClient.COORDINATOR_APP_PATH, coord.getAppPath());
@@ -396,7 +395,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     private void killBundle(String clusterName, BundleJob job) throws FalconException {
-        ProxyOozieClient client = OozieClientFactory.get(clusterName);
+        OozieClient client = OozieClientFactory.get(clusterName);
         try {
             //kill all coords
             for (CoordinatorJob coord : job.getCoordinators()) {
@@ -459,7 +458,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             List<Instance> runInstances = new ArrayList<Instance>();
 
             for (String cluster : clusters) {
-                ProxyOozieClient client = OozieClientFactory.get(cluster);
+                OozieClient client = OozieClientFactory.get(cluster);
                 List<String> wfNames = EntityUtil.getWorkflowNames(entity);
                 List<WorkflowJob> wfs = getRunningWorkflows(cluster, wfNames);
                 if (wfs != null) {
@@ -615,7 +614,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             }
 
             List<BundleJob> bundles = entry.getValue();
-            ProxyOozieClient client = OozieClientFactory.get(cluster);
+            OozieClient client = OozieClientFactory.get(cluster);
             List<CoordinatorJob> applicableCoords = getApplicableCoords(client, start, end,
                     bundles, lifeCycles);
             long unscheduledInstances = 0;
@@ -901,7 +900,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         for (Map.Entry<String, List<BundleJob>> entry : bundlesMap.entrySet()) {
             String cluster = entry.getKey();
             List<BundleJob> bundles = entry.getValue();
-            ProxyOozieClient client = OozieClientFactory.get(cluster);
+            OozieClient client = OozieClientFactory.get(cluster);
             List<CoordinatorJob> applicableCoords =
                 getApplicableCoords(client, start, end, bundles, lifeCycles);
             List<CoordinatorAction> actions = new ArrayList<CoordinatorAction>();
@@ -947,7 +946,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return coord.getAppName().contains(LifeCycle.EVICTION.getTag().name());
     }
 
-    private void addCoordAction(ProxyOozieClient client, List<CoordinatorAction> actions, String actionId) {
+    private void addCoordAction(OozieClient client, List<CoordinatorAction> actions, String actionId) {
         CoordinatorAction coordActionInfo = null;
         try {
             coordActionInfo = client.getCoordActionInfo(actionId);
@@ -984,7 +983,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
-    private List<CoordinatorJob> getApplicableCoords(ProxyOozieClient client, Date start, Date end,
+    private List<CoordinatorJob> getApplicableCoords(OozieClient client, Date start, Date end,
                                                      List<BundleJob> bundles,
                                                      List<LifeCycle> lifeCycles) throws FalconException {
         List<CoordinatorJob> applicableCoords = new ArrayList<CoordinatorJob>();
@@ -1323,7 +1322,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     @Override
     public void reRun(String cluster, String jobId, Properties props, boolean isForced) throws FalconException {
 
-        ProxyOozieClient client = OozieClientFactory.get(cluster);
+        OozieClient client = OozieClientFactory.get(cluster);
         try {
             WorkflowJob jobInfo = client.getJobInfo(jobId);
             Properties jobprops = OozieUtils.toProperties(jobInfo.getConf());
@@ -1385,7 +1384,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     @Override
     public String getWorkflowStatus(String cluster, String jobId) throws FalconException {
 
-        ProxyOozieClient client = OozieClientFactory.get(cluster);
+        OozieClient client = OozieClientFactory.get(cluster);
         try {
             if (jobId.endsWith("-W")) {
                 WorkflowJob jobInfo = client.getJobInfo(jobId);
@@ -1489,7 +1488,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
         // assert that its really changed
         try {
-            ProxyOozieClient client = OozieClientFactory.get(cluster);
+            OozieClient client = OozieClientFactory.get(cluster);
             CoordinatorJob coord = client.getCoordJobInfo(id);
             for (int counter = 0; counter < 3; counter++) {
                 Date intendedPauseTime = (StringUtils.isEmpty(pauseTime) ? null : SchemaHelper.parseDateUTC(pauseTime));

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientBundle.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientBundle.java b/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientBundle.java
new file mode 100644
index 0000000..93b4337
--- /dev/null
+++ b/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientBundle.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.client;
+
+import org.apache.oozie.BaseEngineException;
+import org.apache.oozie.BundleEngine;
+import org.apache.oozie.BundleEngineException;
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.BundleJobInfo;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.util.XConfiguration;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * Client API to submit and manage Oozie bundle jobs against an Oozie
+ * instance.
+ */
+public class LocalOozieClientBundle extends OozieClient {
+
+    private final BundleEngine bundleEngine;
+
+    /**
+     * Create a bundle client for Oozie local use.
+     * <p/>
+     *
+     * @param bundleEngine the engine instance to use.
+     */
+    public LocalOozieClientBundle(BundleEngine bundleEngine) {
+        this.bundleEngine = bundleEngine;
+    }
+
+    /**
+     * Return the Oozie URL of the bundle client instance.
+     * <p/>
+     * This URL is the base URL for the Oozie system, without protocol
+     * versioning.
+     *
+     * @return the Oozie URL of the bundle client instance.
+     */
+    @Override
+    public String getOozieUrl() {
+        return "localoozie";
+    }
+
+    /**
+     * Return the Oozie URL used by the client and server for WS communications.
+     * <p/>
+     * This URL is the original URL plus the versioning element path.
+     *
+     * @return the Oozie URL used by the client and server for communication.
+     * @throws OozieClientException thrown if the client
+     *                              and the server are not protocol compatible.
+     */
+    @Override
+    public String getProtocolUrl() throws OozieClientException {
+        return "localoozie";
+    }
+
+    /**
+     * Validate that the Oozie client and server instances are protocol
+     * compatible.
+     *
+     * @throws OozieClientException thrown if the client
+     *                              and the server are not protocol compatible.
+     */
+    @Override
+    public synchronized void validateWSVersion() throws OozieClientException {
+    }
+
+    /**
+     * Create an empty configuration with just the {@link #USER_NAME} set to the
+     * JVM user name and the {@link #GROUP_NAME} set to 'other'.
+     *
+     * @return an empty configuration.
+     */
+    @Override
+    public Properties createConfiguration() {
+        Properties conf = new Properties();
+        if (bundleEngine != null) {
+            conf.setProperty(USER_NAME, bundleEngine.getUser());
+        }
+        return conf;
+    }
+
+    /**
+     * Set a HTTP header to be used in the WS requests by the bundle
+     * instance.
+     *
+     * @param name  header name.
+     * @param value header value.
+     */
+    @Override
+    public void setHeader(String name, String value) {
+    }
+
+    /**
+     * Get the value of a set HTTP header from the bundle instance.
+     *
+     * @param name header name.
+     * @return header value, <code>null</code> if not set.
+     */
+    @Override
+    public String getHeader(String name) {
+        return null;
+    }
+
+    /**
+     * Remove a HTTP header from the bundle client instance.
+     *
+     * @param name header name.
+     */
+    @Override
+    public void removeHeader(String name) {
+    }
+
+    /**
+     * Return an iterator with all the header names set in the bundle
+     * instance.
+     *
+     * @return header names.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public Iterator<String> getHeaderNames() {
+        return Collections.EMPTY_SET.iterator();
+    }
+
+    /**
+     * Submit a bundle job.
+     *
+     * @param conf job configuration.
+     * @return the job Id.
+     * @throws OozieClientException thrown if the job
+     *                              could not be submitted.
+     */
+    @Override
+    public String submit(Properties conf) throws OozieClientException {
+        try {
+            return bundleEngine.submitJob(new XConfiguration(conf), false);
+        } catch (BundleEngineException ex) {
+            throw new OozieClientException(ex.getErrorCode().toString(), ex);
+        }
+    }
+
+    /**
+     * Start a bundle job.
+     *
+     * @param jobId job Id.
+     * @throws OozieClientException thrown if the job
+     *                              could not be started.
+     */
+    @Override
+    @Deprecated
+    public void start(String jobId) throws OozieClientException {
+        try {
+            bundleEngine.start(jobId);
+        } catch (BundleEngineException ex) {
+            throw new OozieClientException(ex.getErrorCode().toString(), ex);
+        } catch (BaseEngineException bex) {
+            throw new OozieClientException(bex.getErrorCode().toString(), bex);
+        }
+    }
+
+    /**
+     * Submit and start a bundle job.
+     *
+     * @param conf job configuration.
+     * @return the job Id.
+     * @throws OozieClientException thrown if the job
+     *                              could not be submitted.
+     */
+    @Override
+    public String run(Properties conf) throws OozieClientException {
+        try {
+            return bundleEngine.submitJob(new XConfiguration(conf), true);
+        } catch (BundleEngineException ex) {
+            throw new OozieClientException(ex.getErrorCode().toString(), ex);
+        }
+    }
+
+    /**
+     * Rerun a workflow job.
+     *
+     * @param jobId job Id to rerun.
+     * @param conf  configuration information for the rerun.
+     * @throws OozieClientException thrown if the job
+     *                              could not be started.
+     */
+    @Override
+    @Deprecated
+    public void reRun(String jobId, Properties conf) throws OozieClientException {
+        throw new OozieClientException(ErrorCode.E0301.toString(), "no-op");
+    }
+
+    /**
+     * Rerun bundle coordinators.
+     *
+     * @param jobId      bundle jobId
+     * @param coordScope rerun scope for coordinator jobs
+     * @param dateScope  rerun scope for date
+     * @param refresh    true if -refresh is given in command option
+     * @param noCleanup  true if -nocleanup is given in command option
+     * @throws OozieClientException
+     */
+    @Override
+    public Void reRunBundle(String jobId, String coordScope, String dateScope, boolean refresh,
+                            boolean noCleanup) throws OozieClientException {
+        try {
+            new BundleEngine().reRun(jobId, coordScope, dateScope, refresh, noCleanup);
+        } catch (BaseEngineException e) {
+            throw new OozieClientException(e.getErrorCode().toString(), e);
+        }
+        return null;
+    }
+
+    /**
+     * Suspend a bundle job.
+     *
+     * @param jobId job Id.
+     * @throws OozieClientException thrown if the job
+     *                              could not be suspended.
+     */
+    @Override
+    public void suspend(String jobId) throws OozieClientException {
+        try {
+            bundleEngine.suspend(jobId);
+        } catch (BundleEngineException ex) {
+            throw new OozieClientException(ex.getErrorCode().toString(), ex);
+        }
+    }
+
+    /**
+     * Resume a bundle job.
+     *
+     * @param jobId job Id.
+     * @throws OozieClientException thrown if the job
+     *                              could not be resumed.
+     */
+    @Override
+    public void resume(String jobId) throws OozieClientException {
+        try {
+            bundleEngine.resume(jobId);
+        } catch (BundleEngineException ex) {
+            throw new OozieClientException(ex.getErrorCode().toString(), ex);
+        }
+    }
+
+    /**
+     * Kill a bundle job.
+     *
+     * @param jobId job Id.
+     * @throws OozieClientException thrown if the job
+     *                              could not be killed.
+     */
+    @Override
+    public void kill(String jobId) throws OozieClientException {
+        try {
+            bundleEngine.kill(jobId);
+        } catch (BundleEngineException ex) {
+            throw new OozieClientException(ex.getErrorCode().toString(), ex);
+        }
+    }
+
+    /**
+     * Get the info of a workflow job.
+     *
+     * @param jobId job Id.
+     * @return the job info.
+     * @throws OozieClientException thrown if the job
+     *                              info could not be retrieved.
+     */
+    @Override
+    @Deprecated
+    public WorkflowJob getJobInfo(String jobId) throws OozieClientException {
+        throw new OozieClientException(ErrorCode.E0301.toString(), "no-op");
+    }
+
+    /**
+     * Get the info of a bundle job.
+     *
+     * @param jobId job Id.
+     * @return the job info.
+     * @throws OozieClientException thrown if the job
+     *                              info could not be retrieved.
+     */
+    @Override
+    public BundleJob getBundleJobInfo(String jobId) throws OozieClientException {
+        try {
+            return bundleEngine.getBundleJob(jobId);
+        } catch (BundleEngineException ex) {
+            throw new OozieClientException(ex.getErrorCode().toString(), ex);
+        } catch (BaseEngineException bex) {
+            throw new OozieClientException(bex.getErrorCode().toString(), bex);
+        }
+    }
+
+    /**
+     * Return the info of the workflow jobs that match the filter.
+     *
+     * @param filter job filter. Refer to the {@link OozieClient} for the filter
+     *               syntax.
+     * @param start  jobs offset, base 1.
+     * @param len    number of jobs to return.
+     * @return a list with the workflow jobs info, without node details.
+     * @throws OozieClientException thrown if the jobs info could not be
+     *                              retrieved.
+     */
+    @Override
+    @Deprecated
+    public List<WorkflowJob> getJobsInfo(String filter, int start, int len) throws OozieClientException {
+        throw new OozieClientException(ErrorCode.E0301.toString(), "no-op");
+    }
+
+    /**
+     * Return the info of the bundle jobs that match the filter.
+     *
+     * @param filter job filter. Refer to the {@link OozieClient} for the filter
+     *               syntax.
+     * @param start  jobs offset, base 1.
+     * @param len    number of jobs to return.
+     * @return a list with the coordinator jobs info
+     * @throws OozieClientException thrown if the jobs info could not be
+     *                              retrieved.
+     */
+    @Override
+    public List<BundleJob> getBundleJobsInfo(String filter, int start, int len) throws OozieClientException {
+        try {
+            start = (start < 1) ? 1 : start; // taken from oozie API
+            len = (len < 1) ? 50 : len;
+            BundleJobInfo info = bundleEngine.getBundleJobs(filter, start, len);
+            List<BundleJob> jobs = new ArrayList<BundleJob>();
+            List<BundleJobBean> jobBeans = info.getBundleJobs();
+            for (BundleJobBean jobBean : jobBeans) {
+                jobs.add(jobBean);
+            }
+            return jobs;
+
+        } catch (BundleEngineException ex) {
+            throw new OozieClientException(ex.getErrorCode().toString(), ex);
+        }
+    }
+
+    /**
+     * Return the info of the workflow jobs that match the filter.
+     * <p/>
+     * It returns the first 100 jobs that match the filter.
+     *
+     * @param filter job filter. Refer to the {@link org.apache.oozie.LocalOozieClient} for the
+     *               filter syntax.
+     * @return a list with the workflow jobs info, without node details.
+     * @throws OozieClientException thrown if the jobs
+     *                              info could not be retrieved.
+     */
+    @Override
+    @Deprecated
+    public List<WorkflowJob> getJobsInfo(String filter) throws OozieClientException {
+        throw new OozieClientException(ErrorCode.E0301.toString(), "no-op");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
new file mode 100644
index 0000000..217cec9
--- /dev/null
+++ b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client;
+
+import org.apache.oozie.BundleEngine;
+import org.apache.oozie.LocalOozieClient;
+import org.apache.oozie.LocalOozieClientCoord;
+import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.BundleEngineService;
+import org.apache.oozie.service.Services;
+
+import java.io.PrintStream;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Oozie Client for Local Oozie.
+ */
+public class LocalProxyOozieClient extends OozieClient {
+
+    private static LocalOozieClientBundle localOozieClientBundle;
+    private static LocalOozieClientCoord localOozieClientCoord;
+    private static LocalOozieClient localOozieClient;
+    private static final BundleEngine BUNDLE_ENGINE = Services.get().
+            get(BundleEngineService.class).getBundleEngine(System.getProperty("user.name"));
+
+
+    private LocalOozieClientBundle getLocalOozieClientBundle() {
+        if (localOozieClientBundle == null) {
+            localOozieClientBundle = new LocalOozieClientBundle(BUNDLE_ENGINE);
+        }
+        return localOozieClientBundle;
+    }
+
+    private LocalOozieClient getLocalOozieClient() {
+        if (localOozieClient == null) {
+            localOozieClient = (LocalOozieClient) LocalOozie.getClient();
+        }
+        return localOozieClient;
+    }
+
+    private LocalOozieClientCoord getLocalOozieClientCoord() {
+        if (localOozieClientCoord == null) {
+            localOozieClientCoord = (LocalOozieClientCoord) LocalOozie.getCoordClient();
+        }
+        return localOozieClientCoord;
+    }
+
+    @Override
+    public BundleJob getBundleJobInfo(String jobId) throws OozieClientException {
+        return getLocalOozieClientBundle().getBundleJobInfo(jobId);
+    }
+
+    @Override
+    public List<BundleJob> getBundleJobsInfo(String filter, int start, int len) throws OozieClientException {
+        return getLocalOozieClientBundle().getBundleJobsInfo(filter, start, len);
+    }
+
+    public String run(Properties conf) throws OozieClientException {
+        return getLocalOozieClientBundle().run(conf);
+    }
+
+    @Override
+    public Void reRunBundle(final String jobId, final String coordScope, final String dateScope,
+                            final boolean refresh, final boolean noCleanup) throws OozieClientException {
+        return getLocalOozieClientBundle().reRunBundle(jobId, coordScope, dateScope, refresh, noCleanup);
+    }
+
+    @Override
+    public String dryrun(Properties conf) {
+        return null;
+    }
+
+    @Override
+    public CoordinatorAction getCoordActionInfo(String actionId) throws OozieClientException {
+        return getLocalOozieClientCoord().getCoordActionInfo(actionId);
+    }
+
+
+    @Override
+    public CoordinatorJob getCoordJobInfo(final String jobId) throws OozieClientException {
+        return getLocalOozieClientCoord().getCoordJobInfo(jobId);
+    }
+
+    @Override
+    public List<CoordinatorJob> getCoordJobsInfo(final String filter, final int start,
+                                                 final int len) throws OozieClientException {
+        return getLocalOozieClientCoord().getCoordJobsInfo(filter, start, len);
+    }
+
+    @Override
+    public CoordinatorJob getCoordJobInfo(final String jobId, final String filter,
+                                          final int start, final int len) throws OozieClientException {
+        return getLocalOozieClientCoord().getCoordJobInfo(jobId, filter, start, len);
+    }
+
+    @Override
+    public List<CoordinatorAction> reRunCoord(final String jobId, final String rerunType,
+                                              final String scope, final boolean refresh,
+                                              final boolean noCleanup) throws OozieClientException {
+        return getLocalOozieClientCoord().reRunCoord(jobId, rerunType, scope, refresh, noCleanup);
+    }
+
+    @Override
+    public List<WorkflowJob> getJobsInfo(final String filter) throws OozieClientException {
+        return getLocalOozieClientCoord().getJobsInfo(filter);
+    }
+
+    @Override
+    public List<WorkflowJob> getJobsInfo(final String filter, final int start,
+                                         final int len) throws OozieClientException {
+        return getLocalOozieClientCoord().getJobsInfo(filter, start, len);
+    }
+
+    @Override
+    public WorkflowJob getJobInfo(final String jobId) throws OozieClientException {
+        return getLocalOozieClient().getJobInfo(jobId);
+    }
+
+
+    @Override
+    public WorkflowAction getWorkflowActionInfo(final String actionId) throws OozieClientException {
+        return getLocalOozieClient().getWorkflowActionInfo(actionId);
+    }
+
+    @Override
+    public WorkflowJob getJobInfo(final String jobId, final int start, final int len) throws OozieClientException {
+        return getLocalOozieClient().getJobInfo(jobId, start, len);
+    }
+
+    @Override
+    public String getJobId(final String externalId) throws OozieClientException {
+        return getLocalOozieClient().getJobId(externalId);
+    }
+
+    @Override
+    public void reRun(String jobId, Properties conf) throws OozieClientException {
+        throw new IllegalStateException("Rerun not supported ");
+    }
+
+    @Override
+    public void suspend(String jobId) throws OozieClientException {
+        throw new IllegalStateException("Suspend not supported ");
+    }
+
+    @Override
+    public void resume(String jobId) throws OozieClientException {
+        throw new IllegalStateException("Resume not supported ");
+    }
+
+    @Override
+    public void kill(String jobId) throws OozieClientException {
+        throw new IllegalStateException("Kill not supported");
+    }
+
+    @Override
+    public void change(final String jobId, final String changeValue) throws OozieClientException {
+        throw new IllegalStateException("Change not supported");
+    }
+
+    @Override
+    public void getJobLog(final String jobId, final String logRetrievalType,
+                          final String logRetrievalScope, final PrintStream ps) throws OozieClientException {
+        throw new IllegalStateException("Job logs not supported");
+    }
+
+    @Override
+    public String getJobLog(final String jobId) throws OozieClientException {
+        throw new IllegalStateException("Job logs not supported");
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 31997e8..34a5471 100644
--- a/pom.xml
+++ b/pom.xml
@@ -390,6 +390,7 @@
         <module>archival</module>
         <module>rerun</module>
         <module>prism</module>
+        <module>unit</module>
         <module>webapp</module>
         <module>docs</module>
     </modules>

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/pom.xml
----------------------------------------------------------------------
diff --git a/unit/pom.xml b/unit/pom.xml
new file mode 100644
index 0000000..ae92687
--- /dev/null
+++ b/unit/pom.xml
@@ -0,0 +1,106 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>falcon-main</artifactId>
+        <groupId>org.apache.falcon</groupId>
+        <version>0.7-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>falcon-unit</artifactId>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-common</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.oozie</groupId>
+            <artifactId>oozie-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-client</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-hadoop-dependencies</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-oozie-el-extension</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-oozie-adaptor</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-prism</artifactId>
+            <classifier>classes</classifier>
+            <version>${project.version}</version>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.7</source>
+                    <target>1.7</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <excludes>
+                        <exclude>**/log4j.xml</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java
new file mode 100644
index 0000000..eebfa2e
--- /dev/null
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.unit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.hadoop.JailedFileSystem;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.service.ServiceInitializer;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.util.XConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * FalconUnit runs jobs in local mode and cluster mode. <p/> Falcon Unit is meant for development/debugging
+ * purposes only.
+ */
+public final class FalconUnit {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FalconUnit.class);
+    private static final String OOZIE_SITE_XML = "oozie-site.xml";
+    private static final String OOZIE_DEFAULT_XML = "oozie-default.xml";
+    private static final String STORAGE_URL = "jail://global:00";
+    private static final String OOZIE_HOME_DIR = "/tmp/oozie-" + System.getProperty("user.name");
+
+    private static JailedFileSystem jailedFileSystem = new JailedFileSystem();
+    private static final ServiceInitializer STARTUP_SERVICES = new ServiceInitializer();
+    private static Map<String, String> sysProps;
+    private static FalconUnitClient falconUnitClient;
+    private static boolean isLocalMode;
+    private static boolean isFalconUnitActive = false;
+
+    private FalconUnit() {
+    }
+
+
+    public static synchronized void start(boolean isLocal) throws FalconException, IOException {
+        if (isFalconUnitActive) {
+            throw new IllegalStateException("Falcon Unit is already initialized");
+        }
+        isLocalMode = isLocal;
+        //Initialize Startup and runtime properties
+        LOG.info("Initializing startup properties ...");
+        StartupProperties.get();
+
+        LOG.info("Initializing runtime properties ...");
+        RuntimeProperties.get();
+
+        //Initializing Services
+        STARTUP_SERVICES.initialize();
+        ConfigurationStore.get();
+
+        if (isLocalMode) {
+            setupOozieConfigs();
+            initFileSystem();
+        }
+        isFalconUnitActive = true;
+
+    }
+
+    private static void initFileSystem() throws IOException {
+        Configuration conf = new Configuration();
+        conf.set("fs.defaultFS", STORAGE_URL);
+        jailedFileSystem.initialize(LocalFileSystem.getDefaultUri(conf), conf);
+    }
+
+    private static void setupOozieConfigs() throws IOException {
+        sysProps = new HashMap<>();
+        String oozieHomeDir = OOZIE_HOME_DIR;
+        String oozieConfDir = oozieHomeDir + "/conf";
+        String oozieHadoopConfDir = oozieConfDir + "/hadoop-conf";
+        String oozieActionConfDir = oozieConfDir + "/action-conf";
+        String oozieLogsDir = oozieHomeDir + "/logs";
+        String oozieDataDir = oozieHomeDir + "/data";
+
+        LocalFileSystem fs = new LocalFileSystem();
+        fs.mkdirs(new Path(oozieHomeDir));
+        fs.mkdirs(new Path(oozieConfDir));
+        fs.mkdirs(new Path(oozieHadoopConfDir));
+        fs.mkdirs(new Path(oozieActionConfDir));
+        fs.mkdirs(new Path(oozieLogsDir));
+        fs.close();
+
+        setSystemProperty("oozie.home.dir", oozieHomeDir);
+        setSystemProperty("oozie.data.dir", oozieDataDir);
+        setSystemProperty("oozie.action.conf", oozieActionConfDir);
+        setSystemProperty("oozie.log.dir", oozieLogsDir);
+        setSystemProperty("oozie.log4j.file", "localoozie-log4j.properties");
+        setSystemProperty("oozielocal.log", "oozieLogsDir/oozielocal.log");
+
+        Configuration oozieSiteConf = new Configuration(false);
+        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        InputStream oozieSiteInputStream = classLoader.getResourceAsStream(OOZIE_SITE_XML);
+        XConfiguration configuration = new XConfiguration(oozieSiteInputStream);
+        Properties props = configuration.toProperties();
+        for (String propName : props.stringPropertyNames()) {
+            oozieSiteConf.set(propName, props.getProperty(propName));
+        }
+        oozieSiteInputStream.close();
+
+        InputStream oozieDefaultInputStream = classLoader.getResourceAsStream(OOZIE_DEFAULT_XML);
+        configuration = new XConfiguration(oozieDefaultInputStream);
+        String classes = configuration.get(Services.CONF_SERVICE_CLASSES);
+        oozieSiteConf.set(Services.CONF_SERVICE_CLASSES, classes.replaceAll(
+                "org.apache.oozie.service.ShareLibService,", ""));
+        File target = new File(oozieConfDir, OOZIE_SITE_XML);
+        FileOutputStream outStream = null;
+        try {
+            outStream = new FileOutputStream(target);
+            oozieSiteConf.writeXml(outStream);
+        } finally {
+            if (outStream != null) {
+                outStream.close();
+            }
+        }
+        oozieDefaultInputStream.close();
+
+        CurrentUser.authenticate(System.getProperty("user.name"));
+    }
+
+    public static synchronized void cleanup() throws Exception {
+        STARTUP_SERVICES.destroy();
+        if (isLocalMode) {
+            cleanUpOozie();
+            jailedFileSystem.close();
+        }
+        isFalconUnitActive = false;
+    }
+
+    private static void cleanUpOozie() throws IOException, FalconException {
+        LocalOozie.stop();
+        FileUtils.deleteDirectory(new File(OOZIE_HOME_DIR));
+        resetSystemProperties();
+        System.setSecurityManager(null);
+    }
+
+    public static synchronized FalconUnitClient getClient() throws FalconException {
+        if (!isFalconUnitActive) {
+            throw new IllegalStateException("Falcon Unit is not initialized");
+        }
+        if (falconUnitClient == null) {
+            falconUnitClient = new FalconUnitClient();
+        }
+        return falconUnitClient;
+    }
+
+    public static FileSystem getFileSystem() throws IOException {
+        if (!isFalconUnitActive) {
+            throw new IllegalStateException("Falcon Unit is not initialized");
+        }
+        return jailedFileSystem;
+    }
+
+    // Sets a system property and stores its original value so it can be restored later.
+    private static void setSystemProperty(String name, String value) {
+        if (!sysProps.containsKey(name)) {
+            String currentValue = System.getProperty(name);
+            sysProps.put(name, currentValue);
+        }
+        if (value != null) {
+            System.setProperty(name, value);
+        } else {
+            System.getProperties().remove(name);
+        }
+    }
+
+
+    /**
+     * Reset changed system properties to their original values.
+     */
+    private static void resetSystemProperties() {
+        if (sysProps != null) {
+            for (Map.Entry<String, String> entry : sysProps.entrySet()) {
+                if (entry.getValue() != null) {
+                    System.setProperty(entry.getKey(), entry.getValue());
+                } else {
+                    System.getProperties().remove(entry.getKey());
+                }
+            }
+            sysProps.clear();
+        }
+    }
+}
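
For orientation (not part of the patch): a minimal usage sketch of the lifecycle implemented above, assuming local mode. Only the public methods in this file are used; the wrapper class name is hypothetical.

    import org.apache.falcon.unit.FalconUnit;
    import org.apache.falcon.unit.FalconUnitClient;
    import org.apache.hadoop.fs.FileSystem;

    /** Hypothetical driver, for illustration only. */
    public class FalconUnitLifecycleSketch {
        public static void main(String[] args) throws Exception {
            FalconUnit.start(true);                           // local mode: local Oozie configs + jailed file system
            FalconUnitClient client = FalconUnit.getClient(); // lazily created singleton client
            FileSystem fs = FalconUnit.getFileSystem();       // file system backed by jail://global:00
            // ... stage data on fs and submit/schedule entities via client ...
            FalconUnit.cleanup();                             // stops services, local Oozie, restores system properties
        }
    }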

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
new file mode 100644
index 0000000..e898fc3
--- /dev/null
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.unit;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.client.AbstractFalconClient;
+import org.apache.falcon.client.FalconCLIException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.parser.EntityParser;
+import org.apache.falcon.entity.parser.EntityParserFactory;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Cluster;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Validity;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.util.DateUtil;
+import org.apache.falcon.workflow.WorkflowEngineFactory;
+import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
+
+/**
+ * Client for Falcon Unit.
+ */
+public class FalconUnitClient extends AbstractFalconClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FalconUnitClient.class);
+
+    protected ConfigurationStore configStore;
+    private AbstractWorkflowEngine workflowEngine;
+
+    public FalconUnitClient() throws FalconException {
+        configStore = ConfigurationStore.get();
+        workflowEngine = WorkflowEngineFactory.getWorkflowEngine();
+    }
+
+    public ConfigurationStore getConfigStore() {
+        return this.configStore;
+    }
+
+
+    /**
+     * Submit a new entity. Entities can be of type feed, process or data end
+     * points. Entity definitions are validated structurally against schema and
+     * subsequently for other rules before they are admitted into the system.
+     *
+     * @param type     entity type
+     * @param filePath path to the definition of the entity
+     * @return APIResult with the status of the submit operation
+     */
+    @Override
+    public APIResult submit(String type, String filePath) throws IOException, FalconCLIException {
+        try {
+            EntityType entityType = EntityType.getEnum(type);
+            InputStream entityStream = FalconUnitHelper.getFileInputStream(filePath);
+            EntityParser entityParser = EntityParserFactory.getParser(entityType);
+            Entity entity = entityParser.parse(entityStream);
+
+            Entity existingEntity = configStore.get(entityType, entity.getName());
+            if (existingEntity != null) {
+                if (EntityUtil.equals(existingEntity, entity)) {
+                    LOG.warn("{} already registered with the same definition", entity.toShortString());
+                    return new APIResult(APIResult.Status.SUCCEEDED, entity.getName()
+                            + " already registered with the same definition");
+                }
+                LOG.warn("{} already registered with a different definition. "
+                        + "It can't be submitted again; try removing it before submitting.", entity.toShortString());
+                return new APIResult(APIResult.Status.FAILED, entity.getName()
+                        + " already registered with a different definition. "
+                        + "It can't be submitted again; try removing it before submitting.");
+            }
+
+            entityParser.validate(entity);
+            configStore.publish(entityType, entity);
+            LOG.info("Submit successful: ({}): {}", entityType.name(), entity.getName());
+            return new APIResult(APIResult.Status.SUCCEEDED, "Submit successful (" + type + ") " + entity.getName());
+        } catch (FalconException e) {
+            throw new FalconCLIException("FAILED", e);
+        }
+    }
+
+    /**
+     * Schedules a submitted entity.
+     *
+     * @param entityType entity type
+     * @param entityName entity name
+     * @param cluster    cluster on which the entity has to be scheduled
+     * @return APIResult with the status of the schedule operation
+     * @throws FalconCLIException
+     */
+    @Override
+    public APIResult schedule(EntityType entityType, String entityName, String cluster) throws FalconCLIException {
+        return schedule(entityType, entityName, null, 0, cluster);
+    }
+
+
+    /**
+     * Schedules a submitted process entity immediately.
+     *
+     * @param entityType   entity type
+     * @param entityName   entity name
+     * @param startTime    start time for the process while scheduling
+     * @param numInstances number of instances of the process to be scheduled
+     * @param cluster      cluster on which the process is to be scheduled
+     * @return APIResult with the status of the schedule operation
+     */
+    public APIResult schedule(EntityType entityType, String entityName, String startTime, int numInstances,
+                              String cluster) throws FalconCLIException {
+        try {
+            FalconUnitHelper.checkSchedulableEntity(entityType.toString());
+            Entity entity = EntityUtil.getEntity(entityType, entityName);
+            boolean clusterPresent = checkAndUpdateCluster(entity, entityType, cluster);
+            if (!clusterPresent) {
+                LOG.warn("Cluster is not registered with this entity " + entityName);
+                return new APIResult(APIResult.Status.FAILED, entity + "Cluster is not registered with this entity "
+                        + entityName);
+            }
+            if (StringUtils.isNotEmpty(startTime) && entityType == EntityType.PROCESS) {
+                updateStartAndEndTime((Process) entity, startTime, numInstances, cluster);
+            }
+            workflowEngine.schedule(entity);
+            LOG.info("{} is scheduled successfully", entityName);
+            return new APIResult(APIResult.Status.SUCCEEDED, entityName + " (" + entityType.name()
+                    + ") scheduled successfully");
+        } catch (FalconException e) {
+            throw new FalconCLIException("FAILED", e);
+        }
+    }
+
+    /**
+     * Instance status for a given nominalTime.
+     *
+     * @param entityType  entity type
+     * @param entityName  entity name
+     * @param nominalTime nominal time of process
+     * @return InstancesResult.WorkflowStatus
+     */
+    public InstancesResult.WorkflowStatus getInstanceStatus(EntityType entityType, String entityName,
+                                                            String nominalTime) throws Exception {
+        if (entityType == EntityType.CLUSTER) {
+            throw new IllegalArgumentException("Instance management functions don't apply to Cluster entities");
+        }
+        Entity entityObject = EntityUtil.getEntity(entityType, entityName);
+        Date startTime = SchemaHelper.parseDateUTC(nominalTime);
+        Date endTime = DateUtil.getNextMinute(startTime);
+        List<LifeCycle> lifeCycles = FalconUnitHelper.checkAndUpdateLifeCycle(null, entityType.name());
+        InstancesResult instancesResult = workflowEngine.getStatus(entityObject, startTime, endTime, lifeCycles);
+        if (instancesResult.getInstances() != null && instancesResult.getInstances().length > 0
+                && instancesResult.getInstances()[0] != null) {
+            LOG.info("Instance status is " + instancesResult.getInstances()[0].getStatus());
+            return instancesResult.getInstances()[0].getStatus();
+        }
+        return null;
+    }
+
+    private boolean checkAndUpdateCluster(Entity entity, EntityType entityType, String cluster) {
+        if (entityType == EntityType.FEED) {
+            return checkAndUpdateFeedClusters(entity, cluster);
+        } else if (entityType == EntityType.PROCESS) {
+            return checkAndUpdateProcessClusters(entity, cluster);
+        } else {
+            throw new IllegalArgumentException("entity type {} is not supported " + entityType);
+        }
+    }
+
+    private boolean checkAndUpdateProcessClusters(Entity entity, String cluster) {
+        Process processEntity = (Process) entity;
+        List<Cluster> clusters = processEntity.getClusters().getClusters();
+        List<Cluster> newClusters = new ArrayList<>();
+        if (clusters != null) {
+            for (Cluster processCluster : clusters) {
+                if (processCluster.getName().equalsIgnoreCase(cluster)) {
+                    newClusters.add(processCluster);
+                }
+            }
+        }
+        if (newClusters.isEmpty()) {
+            LOG.warn("Cluster is not registered with this entity " + entity.getName());
+            return false;
+        }
+        processEntity.getClusters().getClusters().removeAll(clusters);
+        processEntity.getClusters().getClusters().addAll(newClusters);
+        return true;
+    }
+
+    private boolean checkAndUpdateFeedClusters(Entity entity, String cluster) {
+        Feed feedEntity = (Feed) entity;
+        List<org.apache.falcon.entity.v0.feed.Cluster> clusters = feedEntity.getClusters().getClusters();
+        List<org.apache.falcon.entity.v0.feed.Cluster> newClusters = new ArrayList<>();
+        if (clusters != null) {
+            for (org.apache.falcon.entity.v0.feed.Cluster feedClusters : clusters) {
+                if (feedClusters.getName().equalsIgnoreCase(cluster)) {
+                    newClusters.add(feedClusters);
+                }
+            }
+        }
+        if (newClusters.isEmpty()) {
+            LOG.warn("Cluster is not registered with this entity " + entity.getName());
+            return false;
+        }
+        feedEntity.getClusters().getClusters().removeAll(clusters);
+        feedEntity.getClusters().getClusters().addAll(newClusters);
+        return true;
+    }
+
+    private void updateStartAndEndTime(Process processEntity, String startTimeStr, int numInstances, String cluster) {
+        List<Cluster> clusters = processEntity.getClusters().getClusters();
+        if (clusters != null) {
+            for (Cluster processCluster : clusters) {
+                if (processCluster.getName().equalsIgnoreCase(cluster)) {
+                    Validity validity = new Validity();
+                    Date startTime = SchemaHelper.parseDateUTC(startTimeStr);
+                    validity.setStart(startTime);
+                    Date endTime = EntityUtil.getNextInstanceTime(startTime, processEntity.getFrequency(),
+                            TimeZone.getTimeZone("UTC"), numInstances);
+                    validity.setEnd(endTime);
+                    processCluster.setValidity(validity);
+                }
+            }
+        }
+    }
+}
+
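
For orientation (not part of the patch): a sketch of submit-and-schedule through the client above. Entity names and file paths are placeholders, not values from this commit.

    import org.apache.falcon.entity.v0.EntityType;
    import org.apache.falcon.resource.APIResult;
    import org.apache.falcon.unit.FalconUnit;
    import org.apache.falcon.unit.FalconUnitClient;

    /** Hypothetical driver, for illustration only. */
    public class SubmitAndScheduleSketch {
        public static void main(String[] args) throws Exception {
            FalconUnit.start(true);
            FalconUnitClient client = FalconUnit.getClient();

            client.submit("cluster", "/path/to/cluster.xml");   // placeholder paths
            client.submit("feed", "/path/to/infeed.xml");
            client.submit("feed", "/path/to/outfeed.xml");
            client.submit("process", "/path/to/process.xml");

            // Schedule the process immediately: one instance starting at the given UTC time
            // on the named cluster (names are placeholders).
            APIResult result = client.schedule(EntityType.PROCESS, "sample-process",
                    "2015-06-20T00:00Z", 1, "local-cluster");
            // result should report SUCCEEDED when the cluster is registered with the process.

            FalconUnit.cleanup();
        }
    }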

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/main/java/org/apache/falcon/unit/FalconUnitHelper.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitHelper.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitHelper.java
new file mode 100644
index 0000000..604a3f9
--- /dev/null
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitHelper.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.unit;
+
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.UnschedulableEntityException;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility class for Falcon Unit.
+ */
+public final class FalconUnitHelper {
+    private FalconUnitHelper() {
+    }
+
+    /**
+     * Opens an InputStream for the file at the given path.
+     *
+     * @param filePath path of the file to stream
+     * @return InputStream for the file
+     * @throws org.apache.falcon.FalconException
+     */
+    public static InputStream getFileInputStream(String filePath) throws FalconException {
+        if (filePath == null) {
+            throw new IllegalArgumentException("file path should not be null");
+        }
+        InputStream stream;
+        try {
+            stream = new FileInputStream(filePath);
+        } catch (FileNotFoundException e) {
+            throw new FalconException("File not found: " + filePath);
+        }
+        return stream;
+    }
+
+    /**
+     * Validates the given lifecycle values against the entity type, defaulting them when none are supplied.
+     *
+     * @param lifeCycleValues lifecycle values to check; may be null or empty
+     * @param type            entity type
+     * @return list of lifecycle values after check and update
+     */
+    public static List<LifeCycle> checkAndUpdateLifeCycle(List<LifeCycle> lifeCycleValues,
+                                                          String type) throws FalconException {
+        EntityType entityType = EntityType.getEnum(type);
+        if (lifeCycleValues == null || lifeCycleValues.isEmpty()) {
+            List<LifeCycle> lifeCycles = new ArrayList<LifeCycle>();
+            if (entityType == EntityType.PROCESS) {
+                lifeCycles.add(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));
+            } else if (entityType == EntityType.FEED) {
+                lifeCycles.add(LifeCycle.valueOf(LifeCycle.REPLICATION.name()));
+            }
+            return lifeCycles;
+        }
+        for (LifeCycle lifeCycle : lifeCycleValues) {
+            if (entityType != lifeCycle.getTag().getType()) {
+                throw new FalconException("Incorrect lifecycle: " + lifeCycle + "for given type: " + type);
+            }
+        }
+        return lifeCycleValues;
+    }
+
+    /**
+     * Checks whether the given entity type is schedulable.
+     *
+     * @param type entity type
+     * @throws UnschedulableEntityException if the entity type cannot be scheduled
+     */
+    public static void checkSchedulableEntity(String type) throws UnschedulableEntityException {
+        EntityType entityType = EntityType.getEnum(type);
+        if (!entityType.isSchedulable()) {
+            throw new UnschedulableEntityException(
+                    "Entity type (" + type + ") " + " cannot be Scheduled/Suspended/Resumed");
+        }
+    }
+}
+
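
For orientation (not part of the patch): a sketch of the helper defaults described above. With no lifecycle values supplied, a process defaults to EXECUTION and a feed to REPLICATION, while cluster entities fail the schedulability check.

    import org.apache.falcon.LifeCycle;
    import org.apache.falcon.entity.v0.UnschedulableEntityException;
    import org.apache.falcon.unit.FalconUnitHelper;

    import java.util.List;

    /** Hypothetical driver, for illustration only. */
    public class FalconUnitHelperSketch {
        public static void main(String[] args) throws Exception {
            // No lifecycles supplied: a PROCESS entity defaults to the EXECUTION lifecycle.
            List<LifeCycle> defaults = FalconUnitHelper.checkAndUpdateLifeCycle(null, "process");
            System.out.println(defaults);               // expected: [EXECUTION]

            try {
                FalconUnitHelper.checkSchedulableEntity("cluster");
            } catch (UnschedulableEntityException e) {
                System.out.println(e.getMessage());     // clusters cannot be scheduled
            }
        }
    }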

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/main/java/org/apache/falcon/unit/LocalFalconClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalFalconClientProtocolProvider.java b/unit/src/main/java/org/apache/falcon/unit/LocalFalconClientProtocolProvider.java
new file mode 100644
index 0000000..060b662
--- /dev/null
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalFalconClientProtocolProvider.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.unit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.LocalJobRunner;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * Local ClientProtocol provider for Hadoop.
+ */
+public class LocalFalconClientProtocolProvider extends ClientProtocolProvider {
+
+    private LocalJobRunner localJobRunner = null;
+    private static final String UNIT = "unit";
+
+    @Override
+    public ClientProtocol create(Configuration conf) throws IOException {
+        String framework = conf.get(MRConfig.FRAMEWORK_NAME, UNIT);
+        if (!UNIT.equals(framework)) {
+            return null;
+        }
+        return getLocalJobRunner(conf);
+    }
+
+    @Override
+    public ClientProtocol create(InetSocketAddress inetSocketAddress, Configuration conf) throws IOException {
+        return create(conf);
+    }
+
+    @Override
+    public void close(ClientProtocol clientProtocol) throws IOException {
+
+    }
+
+    private synchronized LocalJobRunner getLocalJobRunner(Configuration conf) throws IOException {
+        if (localJobRunner == null) {
+            localJobRunner = new LocalJobRunner(conf);
+        }
+        return localJobRunner;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
----------------------------------------------------------------------
diff --git a/unit/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider b/unit/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
new file mode 100644
index 0000000..2891352
--- /dev/null
+++ b/unit/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+ org.apache.falcon.unit.LocalFalconClientProtocolProvider
\ No newline at end of file
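
For orientation (not part of the patch): Hadoop discovers ClientProtocolProviders through the ServiceLoader file above, and the provider only answers when mapreduce.framework.name resolves to "unit", so it stays inert in normal deployments. A minimal sketch:

    import org.apache.falcon.unit.LocalFalconClientProtocolProvider;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRConfig;

    /** Hypothetical driver, for illustration only. */
    public class ProtocolProviderSketch {
        public static void main(String[] args) throws Exception {
            LocalFalconClientProtocolProvider provider = new LocalFalconClientProtocolProvider();

            Configuration conf = new Configuration();
            conf.set(MRConfig.FRAMEWORK_NAME, "unit");
            System.out.println(provider.create(conf) != null);   // true: LocalJobRunner-backed protocol

            conf.set(MRConfig.FRAMEWORK_NAME, "yarn");
            System.out.println(provider.create(conf) != null);   // false: provider declines, others get a chance
        }
    }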

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/main/resources/core-site.xml
----------------------------------------------------------------------
diff --git a/unit/src/main/resources/core-site.xml b/unit/src/main/resources/core-site.xml
new file mode 100644
index 0000000..fd8550f
--- /dev/null
+++ b/unit/src/main/resources/core-site.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+
+    <property>
+        <name>fs.fsext.impl</name>
+        <value>org.apache.falcon.hadoop.FileSystemExtension</value>
+    </property>
+
+    <property>
+        <name>fs.defaultFS</name>
+        <value>jail://global:00</value>
+    </property>
+
+    <property>
+        <name>fs.jail.impl</name>
+        <value>org.apache.falcon.hadoop.JailedFileSystem</value>
+    </property>
+
+
+</configuration>
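
For orientation (not part of the patch): with this core-site.xml on the classpath, the default FileSystem resolves to the jailed implementation registered under fs.jail.impl. A minimal sketch, assuming the falcon-hadoop-dependencies jar (which provides JailedFileSystem) is also on the classpath:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    /** Hypothetical driver, for illustration only. */
    public class JailedFileSystemSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();        // loads core-site.xml from the classpath
            FileSystem fs = FileSystem.get(conf);            // expected: org.apache.falcon.hadoop.JailedFileSystem
            System.out.println(fs.getUri().getScheme());     // expected: "jail"
            fs.close();
        }
    }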

