falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject falcon git commit: Support co-existence of Oozie scheduler (coord) and Falcon native scheduler
Date Tue, 01 Dec 2015 11:26:59 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 17a4fcbd6 -> 8b352fcfa


Support co-existence of Oozie scheduler (coord) and Falcon native scheduler


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8b352fcf
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8b352fcf
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8b352fcf

Branch: refs/heads/master
Commit: 8b352fcfa230cdd9c26f5b1db29b4d7c43dda94f
Parents: 17a4fcb
Author: Pallavi Rao <pallavi.rao@inmobi.com>
Authored: Tue Dec 1 16:56:25 2015 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Tue Dec 1 16:56:25 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../falcon/workflow/WorkflowEngineFactory.java  |  83 ++++++++++++-
 .../WorkflowJobEndNotificationService.java      |   3 +-
 .../workflow/engine/AbstractWorkflowEngine.java |   7 ++
 docs/src/site/twiki/falconcli/Schedule.twiki    |  13 +-
 .../workflow/engine/OozieWorkflowEngine.java    |   5 +
 .../falcon/resource/AbstractEntityManager.java  |  21 ++--
 .../resource/AbstractInstanceManager.java       |  18 +--
 .../AbstractSchedulableEntityManager.java       |  21 ++--
 .../workflow/engine/FalconWorkflowEngine.java   |  12 +-
 .../falcon/workflow/engine/OozieDAGEngine.java  |   2 +
 .../execution/FalconExecutionServiceTest.java   |   1 -
 .../engine/WorkflowEngineFactoryTest.java       | 123 +++++++++++++++++++
 unit/pom.xml                                    |   6 +
 14 files changed, 278 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 028842c..2f3787a 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-1233 Support co-existence of Oozie scheduler (coord) and Falcon native scheduler
(Pallavi Rao)
+
     FALCON-1596 Spring shell based CLI for Falcon
 	FALCON-1608 Base framework for Spring Shell based shell for Falcon (Rajat Khandelwal via
Ajay Yadava)
  

http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java
index 49592ac..6555906 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java
@@ -19,9 +19,14 @@
 package org.apache.falcon.workflow;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.lifecycle.AbstractPolicyBuilderFactory;
 import org.apache.falcon.util.ReflectionUtils;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
 
 /**
  * Factory for providing appropriate workflow engine to the falcon service.
@@ -29,19 +34,89 @@ import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 @SuppressWarnings("unchecked")
 public final class WorkflowEngineFactory {
 
-    private static final String WORKFLOW_ENGINE = "workflow.engine.impl";
-
+    private static final Logger LOG = LoggerFactory.getLogger(WorkflowEngineFactory.class);
+    public static final String ENGINE_PROP="falcon.scheduler";
+    private static AbstractWorkflowEngine nativeWorkflowEngine;
+    private static AbstractWorkflowEngine configuredWorkflowEngine;
+    private static final String CONFIGURED_WORKFLOW_ENGINE = "workflow.engine.impl";
     private static final String LIFECYCLE_ENGINE = "lifecycle.engine.impl";
 
     private WorkflowEngineFactory() {
     }
 
+    /**
+     * @param entity
+     * @return The workflow engine using which the entity is scheduled.
+     * @throws FalconException
+     */
+    public static AbstractWorkflowEngine getWorkflowEngine(Entity entity) throws FalconException
{
+        // The below check is only for schedulable entities.
+        if (entity != null
+                && entity.getEntityType().isSchedulable() && getNativeWorkflowEngine().isActive(entity))
{
+            LOG.debug("Returning native workflow engine for entity {}", entity.getName());
+            return nativeWorkflowEngine;
+        }
+        LOG.debug("Returning configured workflow engine for entity {}.", entity);
+        return getWorkflowEngine();
+    }
+
+    /**
+     * @param entity
+     * @param props
+     * @return Workflow engine as specified in the props and for a given schedulable entity.
+     * @throws FalconException
+     */
+    public static AbstractWorkflowEngine getWorkflowEngine(Entity entity, Map<String,
String> props)
+        throws FalconException {
+        // If entity is null or not schedulable, or the engine property is not specified,
return the configured WE.
+        if (entity == null || !entity.getEntityType().isSchedulable()
+                || props == null || props.isEmpty() || !props.containsKey(ENGINE_PROP)) {
+            LOG.debug("Returning configured workflow engine for entity {}.", entity);
+            return getWorkflowEngine();
+        }
+
+        String engineName = props.get(ENGINE_PROP);
+        if (engineName.equalsIgnoreCase(getWorkflowEngine().getName())) {
+            // If already active on native
+            if (getNativeWorkflowEngine().isActive(entity)) {
+                throw new FalconException("Entity " + entity.getName() + " is already scheduled
on native engine.");
+            }
+            LOG.debug("Returning configured workflow engine for entity {}", entity.getName());
+            return configuredWorkflowEngine;
+        } else if (engineName.equalsIgnoreCase(getNativeWorkflowEngine().getName())) {
+            // If already active on configured workflow engine
+            if (getWorkflowEngine().isActive(entity)) {
+                throw new FalconException("Entity " + entity.getName() + " is already scheduled
on "
+                        + "configured workflow engine.");
+            }
+            LOG.debug("Returning native workflow engine for entity {}", entity.getName());
+            return nativeWorkflowEngine;
+        } else {
+            throw new IllegalArgumentException("Property " + ENGINE_PROP + " is not set to
a valid value.");
+        }
+    }
+
+    /**
+     * @return An instance of the configured workflow engine.
+     * @throws FalconException
+     */
     public static AbstractWorkflowEngine getWorkflowEngine() throws FalconException {
-        return ReflectionUtils.getInstance(WORKFLOW_ENGINE);
+        // Caching is only for optimization, workflow engine doesn't need to be a singleton.
+        if (configuredWorkflowEngine == null) {
+            configuredWorkflowEngine = ReflectionUtils.getInstance(CONFIGURED_WORKFLOW_ENGINE);
+        }
+        return configuredWorkflowEngine;
+    }
+
+    public static AbstractWorkflowEngine getNativeWorkflowEngine() throws FalconException
{
+        if (nativeWorkflowEngine  ==  null) {
+            nativeWorkflowEngine =
+                    ReflectionUtils.getInstanceByClassName("org.apache.falcon.workflow.engine.FalconWorkflowEngine");
+        }
+        return nativeWorkflowEngine;
     }
 
     public static AbstractPolicyBuilderFactory getLifecycleEngine() throws FalconException
{
         return ReflectionUtils.getInstance(LIFECYCLE_ENGINE);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
index 9d96fa3..630c56c 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -164,7 +164,7 @@ public class WorkflowJobEndNotificationService implements FalconService
{
                 }
                 for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
                     try {
-                        InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine()
+                        InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine(entity)
                                 .getJobDetails(cluster, context.getWorkflowId()).getInstances();
                         if (instances != null && instances.length > 0) {
                             wfProps = getWFProps(instances[0].getWfParams());
@@ -204,7 +204,6 @@ public class WorkflowJobEndNotificationService implements FalconService
{
         return props;
     }
 
-
     // This method handles both success and failure notifications.
     private void notifyWorkflowEnd(WorkflowExecutionContext context) {
         // Need to distinguish notification from post processing for backward compatibility

http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index 7b36b11..b53efe6 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -109,4 +109,11 @@ public abstract class AbstractWorkflowEngine {
     public abstract boolean isNotificationEnabled(String cluster, String jobID) throws FalconException;
 
     public abstract Boolean isWorkflowKilledByUser(String cluster, String jobId) throws FalconException;
+
+
+    /**
+     * Returns the short name of the Workflow Engine.
+     * @return
+     */
+    public abstract String getName();
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/docs/src/site/twiki/falconcli/Schedule.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/falconcli/Schedule.twiki b/docs/src/site/twiki/falconcli/Schedule.twiki
index 42192c7..63aa9c1 100644
--- a/docs/src/site/twiki/falconcli/Schedule.twiki
+++ b/docs/src/site/twiki/falconcli/Schedule.twiki
@@ -7,7 +7,16 @@ Once submitted, an entity can be scheduled using schedule option. Process
and fe
 Usage:
 $FALCON_HOME/bin/falcon entity  -type [process|feed] -name <<name>> -schedule
 
-Optional Arg : -skipDryRun -doAs <username>
--properties <<key1:val1,...,keyN:valN>>
+Optional Args :
 
+-skipDryRun When this argument is specified, Falcon skips oozie dryrun.
 
+-doAs <username>
+
+-properties <<key1:val1,...,keyN:valN>>. Specifying 'falcon.scheduler:native'
as a property will schedule the entity on the native scheduler of Falcon. Else, it will
default to the engine specified in startup.properties.
+
+Examples:
+
+ $FALCON_HOME/bin/falcon entity  -type process -name sampleProcess -schedule
+
+ $FALCON_HOME/bin/falcon entity  -type process -name sampleProcess -schedule -properties
falcon.scheduler:native
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 724f646..91230b2 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -1684,6 +1684,11 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
+    @Override
+    public String getName() {
+        return "oozie";
+    }
+
     private String getUserWorkflowAction(List<WorkflowAction> actionsList){
         for (WorkflowAction wfAction : actionsList) {
             if (StringUtils.equals(wfAction.getName(), "user-action")) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 16ef83a..5571326 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -78,15 +78,9 @@ public abstract class AbstractEntityManager {
     protected static final String DO_AS_PARAM = "doAs";
 
     protected static final int XML_DEBUG_LEN = 10 * 1024;
-    private AbstractWorkflowEngine workflowEngine;
     protected ConfigurationStore configStore = ConfigurationStore.get();
 
     public AbstractEntityManager() {
-        try {
-            workflowEngine = WorkflowEngineFactory.getWorkflowEngine();
-        } catch (FalconException e) {
-            throw new FalconRuntimException(e);
-        }
     }
 
     protected static Integer getDefaultResultsPerPage() {
@@ -231,7 +225,7 @@ public abstract class AbstractEntityManager {
                 Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
                 for (String cluster : clusters) {
                     try {
-                        getWorkflowEngine().dryRun(entity, cluster, skipDryRun);
+                        getWorkflowEngine(entity).dryRun(entity, cluster, skipDryRun);
                     } catch (FalconException e) {
                         throw new FalconException("dryRun failed on cluster " + cluster,
e);
                     }
@@ -270,8 +264,8 @@ public abstract class AbstractEntityManager {
                 canRemove(entityObj);
                 obtainEntityLocks(entityObj, "delete", tokenList);
                 if (entityType.isSchedulable() && !DeploymentUtil.isPrism()) {
-                    getWorkflowEngine().delete(entityObj);
-                    removedFromEngine = "(KILLED in ENGINE)";
+                    getWorkflowEngine(entityObj).delete(entityObj);
+                    removedFromEngine = "(KILLED in WF_ENGINE)";
                 }
 
                 configStore.remove(entityType, entity);
@@ -327,10 +321,10 @@ public abstract class AbstractEntityManager {
                 oldClusters.removeAll(newClusters); //deleted clusters
 
                 for (String cluster : newClusters) {
-                    result.append(getWorkflowEngine().update(oldEntity, newEntity, cluster,
skipDryRun));
+                    result.append(getWorkflowEngine(oldEntity).update(oldEntity, newEntity,
cluster, skipDryRun));
                 }
                 for (String cluster : oldClusters) {
-                    getWorkflowEngine().delete(oldEntity, cluster);
+                    getWorkflowEngine(oldEntity).delete(oldEntity, cluster);
                 }
             }
 
@@ -568,6 +562,7 @@ public abstract class AbstractEntityManager {
     protected EntityStatus getStatus(Entity entity, EntityType type) throws FalconException
{
         EntityStatus status = EntityStatus.SUBMITTED;
 
+        AbstractWorkflowEngine workflowEngine = getWorkflowEngine(entity);
         if (type.isSchedulable()) {
             if (workflowEngine.isActive(entity)) {
                 if (workflowEngine.isSuspended(entity)) {
@@ -1104,8 +1099,8 @@ public abstract class AbstractEntityManager {
     }
 
 
-    protected AbstractWorkflowEngine getWorkflowEngine() {
-        return this.workflowEngine;
+    protected AbstractWorkflowEngine getWorkflowEngine(Entity entity) throws FalconException
{
+        return WorkflowEngineFactory.getWorkflowEngine(entity);
     }
 
     protected <T extends APIResult> T consolidateResult(Map<String, T> results,
Class<T> clazz) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index fea2989..81960a0 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -123,8 +123,8 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager
{
             lifeCycles = checkAndUpdateLifeCycle(lifeCycles, type);
             validateNotEmpty("entityName", entity);
             validateInstanceFilterByClause(filterBy);
-            AbstractWorkflowEngine wfEngine = getWorkflowEngine();
             Entity entityObject = EntityUtil.getEntity(type, entity);
+            AbstractWorkflowEngine wfEngine = getWorkflowEngine(entityObject);
             return getInstanceResultSubset(wfEngine.getRunningInstances(entityObject, lifeCycles),
                     filterBy, orderBy, sortOrder, offset, numResults);
         } catch (Throwable e) {
@@ -175,7 +175,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager
{
             Pair<Date, Date> startAndEndDate = getStartAndEndDate(entityObject, startStr,
endStr, numResults);
 
             // LifeCycle lifeCycleObject = EntityUtil.getLifeCycle(lifeCycle);
-            AbstractWorkflowEngine wfEngine = getWorkflowEngine();
+            AbstractWorkflowEngine wfEngine = getWorkflowEngine(entityObject);
             return getInstanceResultSubset(wfEngine.getStatus(entityObject,
                             startAndEndDate.first, startAndEndDate.second, lifeCycles),
                     filterBy, orderBy, sortOrder, offset, numResults);
@@ -255,7 +255,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager
{
             Entity entityObject = EntityUtil.getEntity(type, entity);
             Pair<Date, Date> startAndEndDate = getStartAndEndDate(entityObject, startStr,
endStr);
 
-            AbstractWorkflowEngine wfEngine = getWorkflowEngine();
+            AbstractWorkflowEngine wfEngine = getWorkflowEngine(entityObject);
             return getInstanceSummaryResultSubset(wfEngine.getSummary(entityObject,
                     startAndEndDate.first, startAndEndDate.second, lifeCycles),
                     filterBy, orderBy, sortOrder);
@@ -547,7 +547,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager
{
             Date start = startAndEndDate.first;
             Date end = EntityUtil.getNextInstanceTime(start, EntityUtil.getFrequency(entityObject),
                     EntityUtil.getTimeZone(entityObject), 1);
-            AbstractWorkflowEngine wfEngine = getWorkflowEngine();
+            AbstractWorkflowEngine wfEngine = getWorkflowEngine(entityObject);
             return wfEngine.getInstanceParams(entityObject, start, end, lifeCycles);
         } catch (Throwable e) {
             LOG.error("Failed to display params of an instance", e);
@@ -572,7 +572,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager
{
             Pair<Date, Date> startAndEndDate = getStartAndEndDateForLifecycleOperations(
                     entityObject, startStr, endStr);
 
-            AbstractWorkflowEngine wfEngine = getWorkflowEngine();
+            AbstractWorkflowEngine wfEngine = getWorkflowEngine(entityObject);
             return wfEngine.killInstances(entityObject,
                     startAndEndDate.first, startAndEndDate.second, props, lifeCycles);
         } catch (Throwable e) {
@@ -598,7 +598,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager
{
             Pair<Date, Date> startAndEndDate = getStartAndEndDateForLifecycleOperations(
                     entityObject, startStr, endStr);
 
-            AbstractWorkflowEngine wfEngine = getWorkflowEngine();
+            AbstractWorkflowEngine wfEngine = getWorkflowEngine(entityObject);
             return wfEngine.suspendInstances(entityObject,
                     startAndEndDate.first, startAndEndDate.second, props, lifeCycles);
         } catch (Throwable e) {
@@ -624,7 +624,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager
{
             Pair<Date, Date> startAndEndDate = getStartAndEndDateForLifecycleOperations(
                     entityObject, startStr, endStr);
 
-            AbstractWorkflowEngine wfEngine = getWorkflowEngine();
+            AbstractWorkflowEngine wfEngine = getWorkflowEngine(entityObject);
             return wfEngine.resumeInstances(entityObject,
                     startAndEndDate.first, startAndEndDate.second, props, lifeCycles);
         } catch (Throwable e) {
@@ -787,7 +787,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager
{
 
     private InstancesResult.WorkflowStatus getProcessInstanceStatus(Process process, Date
instanceTime)
         throws FalconException {
-        AbstractWorkflowEngine wfEngine = getWorkflowEngine();
+        AbstractWorkflowEngine wfEngine = getWorkflowEngine(process);
         List<LifeCycle> lifeCycles = new ArrayList<LifeCycle>();
         lifeCycles.add(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));
         Date endRange = new Date(instanceTime.getTime() + 200);
@@ -817,7 +817,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager
{
             Pair<Date, Date> startAndEndDate = getStartAndEndDateForLifecycleOperations(
                     entityObject, startStr, endStr);
 
-            AbstractWorkflowEngine wfEngine = getWorkflowEngine();
+            AbstractWorkflowEngine wfEngine = getWorkflowEngine(entityObject);
             return wfEngine.reRunInstances(entityObject,
                     startAndEndDate.first, startAndEndDate.second, props, lifeCycles, isForced);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index d317aa1..88131f3 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -33,6 +33,7 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.monitors.Dimension;
 import org.apache.falcon.service.FeedSLAMonitoringService;
 import org.apache.falcon.util.DeploymentUtil;
+import org.apache.falcon.workflow.WorkflowEngineFactory;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,10 +61,13 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
     private static MemoryLocks memoryLocks = MemoryLocks.getInstance();
 
     /**
-     * Schedules an submitted entity immediately.
+     * Schedules a submitted entity immediately.
      *
      * @param type   entity type
      * @param entity entity name
+     * @param properties Specifying 'falcon.scheduler:native' as a property will schedule
the entity on the
+     *                   native workflow engine, else it will default to the workflow engine
+     *                   as defined in startup.properties.
      * @return APIResult
      */
     public APIResult schedule(
@@ -95,7 +99,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
                         + entityObj.toShortString());
             }
             LOG.info("Memory lock obtained for {} by {}", entityObj.toShortString(), Thread.currentThread().getName());
-            getWorkflowEngine().schedule(entityObj, skipDryRun, properties);
+            WorkflowEngineFactory.getWorkflowEngine(entityObj, properties).schedule(entityObj,
skipDryRun, properties);
         } catch (Exception e) {
             throw new FalconException("Entity schedule failed for " + type + ": " + entity,
e);
         } finally {
@@ -177,6 +181,9 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
      * Submits a new entity and schedules it immediately.
      *
      * @param type   entity type
+     * @param properties Specifying 'falcon.scheduler:native' as a property will schedule
the entity on the
+     *                   native workflow engine, else it will default to the workflow engine
+     *                   as defined in startup.properties.
      * @return APIResult
      */
     public APIResult submitAndSchedule(
@@ -212,8 +219,8 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
         try {
             checkSchedulableEntity(type);
             Entity entityObj = EntityUtil.getEntity(type, entity);
-            if (getWorkflowEngine().isActive(entityObj)) {
-                getWorkflowEngine().suspend(entityObj);
+            if (getWorkflowEngine(entityObj).isActive(entityObj)) {
+                getWorkflowEngine(entityObj).suspend(entityObj);
             } else {
                 throw new FalconException(entity + "(" + type + ") is not scheduled");
             }
@@ -240,8 +247,8 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
         try {
             checkSchedulableEntity(type);
             Entity entityObj = EntityUtil.getEntity(type, entity);
-            if (getWorkflowEngine().isActive(entityObj)) {
-                getWorkflowEngine().resume(entityObj);
+            if (getWorkflowEngine(entityObj).isActive(entityObj)) {
+                getWorkflowEngine(entityObj).resume(entityObj);
             } else {
                 throw new FalconException(entity + "(" + type + ") is not scheduled");
             }
@@ -347,7 +354,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
             decorateEntityWithACL(entity);
             Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
             for (String cluster : clusters) {
-                result.append(getWorkflowEngine().touch(entity, cluster, skipDryRun));
+                result.append(getWorkflowEngine(entity).touch(entity, cluster, skipDryRun));
             }
         } catch (Throwable e) {
             LOG.error("Touch failed", e);

http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index 5c7bf91..ac7cde8 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -86,7 +86,12 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
 
     @Override
     public boolean isActive(Entity entity) throws FalconException {
-        return STATE_STORE.getEntity(new EntityID(entity)).getCurrentState() != EntityState.STATE.SUBMITTED;
+        EntityID id = new EntityID(entity);
+        // Ideally state store should have all entities, but, check anyway.
+        if (STATE_STORE.entityExists(id)) {
+            return STATE_STORE.getEntity(id).getCurrentState() != EntityState.STATE.SUBMITTED;
+        }
+        return false;
     }
 
     @Override
@@ -367,5 +372,10 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
     public Boolean isWorkflowKilledByUser(String cluster, String jobId) throws FalconException
{
         throw new UnsupportedOperationException("Not yet Implemented");
     }
+
+    @Override
+    public String getName() {
+        return "native";
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
index ca2010b..70c8353 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
@@ -159,6 +159,7 @@ public class OozieDAGEngine implements DAGEngine {
         props.put("feedNames", "NONE");
         props.put("feedInstancePaths", "NONE");
         props.put("userJMSNotificationEnabled", "true");
+        props.put("systemJMSNotificationEnabled", "false");
         return props;
     }
 
@@ -176,6 +177,7 @@ public class OozieDAGEngine implements DAGEngine {
         props.put("feedNames", "NONE");
         props.put("feedInstancePaths", "NONE");
         props.put("userJMSNotificationEnabled", "true");
+        props.put("systemJMSNotificationEnabled", "false");
         return props;
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
index 2a9fbce..0ddf895 100644
--- a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
@@ -121,7 +121,6 @@ public class FalconExecutionServiceTest extends AbstractSchedulerTestBase
{
         mockSchedulerService = Mockito.mock(SchedulerService.class);
         Mockito.when(mockSchedulerService.getName()).thenReturn("JobSchedulerService");
         StartupProperties.get().setProperty("dag.engine.impl", MockDAGEngine.class.getName());
-        StartupProperties.get().setProperty("execution.service.impl", FalconExecutionService.class.getName());
         dagEngine = Mockito.spy(DAGEngineFactory.getDAGEngine("testCluster"));
         Mockito.when(mockSchedulerService.createRequestBuilder(Mockito.any(NotificationHandler.class),
                 Mockito.any(ID.class))).thenCallRealMethod();

http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/scheduler/src/test/java/org/apache/falcon/workflow/engine/WorkflowEngineFactoryTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/workflow/engine/WorkflowEngineFactoryTest.java
b/scheduler/src/test/java/org/apache/falcon/workflow/engine/WorkflowEngineFactoryTest.java
new file mode 100644
index 0000000..7e502cd
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/workflow/engine/WorkflowEngineFactoryTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.workflow.engine;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.state.EntityID;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.StateStore;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowEngineFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests the WorkflowEngineFactory class.
+ */
+public class WorkflowEngineFactoryTest extends AbstractTestBase {
+
+    private StateStore stateStore = null;
+
+    @BeforeClass
+    public void init() throws Exception {
+        this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+        this.conf = dfsCluster.getConf();
+        StartupProperties.get().setProperty("falcon.state.store.impl",
+                "org.apache.falcon.state.store.InMemoryStateStore");
+        setupConfigStore();
+    }
+
+    @AfterClass
+    public void tearDown() {
+        this.dfsCluster.shutdown();
+    }
+
+    // State store is set up to sync with Config Store. That gets tested too.
+    public void setupConfigStore() throws Exception {
+        stateStore = AbstractStateStore.get();
+        getStore().registerListener(stateStore);
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        storeEntity(EntityType.FEED, "clicksFeed");
+        storeEntity(EntityType.FEED, "clicksSummary");
+        storeEntity(EntityType.PROCESS, "summarize");
+    }
+
+    @Test
+    public void testGetEngineByEntity() throws FalconException {
+        // When entity is not specified, return oozie
+        AbstractWorkflowEngine engine = WorkflowEngineFactory.getWorkflowEngine(null);
+        Assert.assertTrue(engine instanceof OozieWorkflowEngine);
+
+        // When entity not active on native, return oozie
+        Process process = getStore().get(EntityType.PROCESS, "summarize");
+        engine = WorkflowEngineFactory.getWorkflowEngine(process);
+        Assert.assertTrue(engine instanceof OozieWorkflowEngine);
+
+        // When entity active on native, return native
+        stateStore.getEntity(new EntityID(process)).setCurrentState(EntityState.STATE.SCHEDULED);
+        engine = WorkflowEngineFactory.getWorkflowEngine(process);
+        Assert.assertTrue(engine instanceof FalconWorkflowEngine);
+    }
+
+    @Test
+    public void testGetEngineByEntityAndProps() throws FalconException {
+        // When entity is not specified, return oozie
+        AbstractWorkflowEngine engine = WorkflowEngineFactory.getWorkflowEngine(null, null);
+        Assert.assertTrue(engine instanceof OozieWorkflowEngine);
+
+        // When entity specified, but, no props, return oozie
+        Process process = getStore().get(EntityType.PROCESS, "summarize");
+        stateStore.getEntity(new EntityID(process)).setCurrentState(EntityState.STATE.SUBMITTED);
+        engine = WorkflowEngineFactory.getWorkflowEngine(process, null);
+        Assert.assertTrue(engine instanceof OozieWorkflowEngine);
+
+        // When entity specified, props set to oozie, return oozie
+        Map<String, String> props = new HashMap<>();
+        props.put(WorkflowEngineFactory.ENGINE_PROP, "oozie");
+        engine = WorkflowEngineFactory.getWorkflowEngine(process, props);
+        Assert.assertTrue(engine instanceof OozieWorkflowEngine);
+    }
+
+    @Test (expectedExceptions = FalconException.class,
+            expectedExceptionsMessageRegExp = ".* is already scheduled on native engine.")
+    public void testGetEngineError() throws FalconException {
+        Process process = getStore().get(EntityType.PROCESS, "summarize");
+        // When entity specified, props set to oozie, but scheduled on native, exception
+        stateStore.getEntity(new EntityID(process)).setCurrentState(EntityState.STATE.SCHEDULED);
+        Map<String, String> props = new HashMap<>();
+        props.put(WorkflowEngineFactory.ENGINE_PROP, "oozie");
+        WorkflowEngineFactory.getWorkflowEngine(process, props);
+    }
+
+    @Test
+    public void testGetEngineForCluster() throws FalconException {
+        AbstractWorkflowEngine engine =
+                WorkflowEngineFactory.getWorkflowEngine(getStore().get(EntityType.CLUSTER, "testCluster"));
+        Assert.assertTrue(engine instanceof OozieWorkflowEngine);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/unit/pom.xml
----------------------------------------------------------------------
diff --git a/unit/pom.xml b/unit/pom.xml
index 8d9f443..20c848e 100644
--- a/unit/pom.xml
+++ b/unit/pom.xml
@@ -73,6 +73,12 @@
 
         <dependency>
             <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-scheduler</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
             <artifactId>falcon-prism</artifactId>
             <classifier>classes</classifier>
             <version>${project.version}</version>


Mime
View raw message