Return-Path: X-Original-To: apmail-falcon-commits-archive@minotaur.apache.org Delivered-To: apmail-falcon-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4E8FB18A93 for ; Tue, 1 Dec 2015 11:27:00 +0000 (UTC) Received: (qmail 44402 invoked by uid 500); 1 Dec 2015 11:27:00 -0000 Delivered-To: apmail-falcon-commits-archive@falcon.apache.org Received: (qmail 44343 invoked by uid 500); 1 Dec 2015 11:26:59 -0000 Mailing-List: contact commits-help@falcon.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@falcon.apache.org Delivered-To: mailing list commits@falcon.apache.org Received: (qmail 44334 invoked by uid 99); 1 Dec 2015 11:26:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Dec 2015 11:26:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B4D91E03EC; Tue, 1 Dec 2015 11:26:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: pallavi@apache.org To: commits@falcon.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: falcon git commit: Support co-existence of Oozie scheduler (coord) and Falcon native scheduler Date: Tue, 1 Dec 2015 11:26:59 +0000 (UTC) Repository: falcon Updated Branches: refs/heads/master 17a4fcbd6 -> 8b352fcfa Support co-existence of Oozie scheduler (coord) and Falcon native scheduler Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8b352fcf Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8b352fcf Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8b352fcf Branch: refs/heads/master Commit: 8b352fcfa230cdd9c26f5b1db29b4d7c43dda94f Parents: 17a4fcb Author: Pallavi Rao Authored: Tue Dec 1 16:56:25 2015 +0530 Committer: Pallavi Rao Committed: Tue Dec 1 16:56:25 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../falcon/workflow/WorkflowEngineFactory.java | 83 ++++++++++++- .../WorkflowJobEndNotificationService.java | 3 +- .../workflow/engine/AbstractWorkflowEngine.java | 7 ++ docs/src/site/twiki/falconcli/Schedule.twiki | 13 +- .../workflow/engine/OozieWorkflowEngine.java | 5 + .../falcon/resource/AbstractEntityManager.java | 21 ++-- .../resource/AbstractInstanceManager.java | 18 +-- .../AbstractSchedulableEntityManager.java | 21 ++-- .../workflow/engine/FalconWorkflowEngine.java | 12 +- .../falcon/workflow/engine/OozieDAGEngine.java | 2 + .../execution/FalconExecutionServiceTest.java | 1 - .../engine/WorkflowEngineFactoryTest.java | 123 +++++++++++++++++++ unit/pom.xml | 6 + 14 files changed, 278 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 028842c..2f3787a 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,8 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + FALCON-1233 Support co-existence of Oozie scheduler (coord) and Falcon native scheduler (Pallavi Rao) + FALCON-1596 Spring shell based CLI for Falcon FALCON-1608 Base framework for Spring Shell based shell for Falcon (Rajat Khandelwal via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java index 49592ac..6555906 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java @@ -19,9 +19,14 @@ package org.apache.falcon.workflow; import org.apache.falcon.FalconException; +import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.lifecycle.AbstractPolicyBuilderFactory; import org.apache.falcon.util.ReflectionUtils; import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; /** * Factory for providing appropriate workflow engine to the falcon service. @@ -29,19 +34,89 @@ import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; @SuppressWarnings("unchecked") public final class WorkflowEngineFactory { - private static final String WORKFLOW_ENGINE = "workflow.engine.impl"; - + private static final Logger LOG = LoggerFactory.getLogger(WorkflowEngineFactory.class); + public static final String ENGINE_PROP="falcon.scheduler"; + private static AbstractWorkflowEngine nativeWorkflowEngine; + private static AbstractWorkflowEngine configuredWorkflowEngine; + private static final String CONFIGURED_WORKFLOW_ENGINE = "workflow.engine.impl"; private static final String LIFECYCLE_ENGINE = "lifecycle.engine.impl"; private WorkflowEngineFactory() { } + /** + * @param entity + * @return The workflow engine using which the entity is scheduled. + * @throws FalconException + */ + public static AbstractWorkflowEngine getWorkflowEngine(Entity entity) throws FalconException { + // The below check is only for schedulable entities. + if (entity != null + && entity.getEntityType().isSchedulable() && getNativeWorkflowEngine().isActive(entity)) { + LOG.debug("Returning native workflow engine for entity {}", entity.getName()); + return nativeWorkflowEngine; + } + LOG.debug("Returning configured workflow engine for entity {}.", entity); + return getWorkflowEngine(); + } + + /** + * @param entity + * @param props + * @return Workflow engine as specified in the props and for a given schedulable entity. + * @throws FalconException + */ + public static AbstractWorkflowEngine getWorkflowEngine(Entity entity, Map props) + throws FalconException { + // If entity is null or not schedulable and the engine property is not specified, return the configured WE. + if (entity == null || !entity.getEntityType().isSchedulable() + || props == null || props.isEmpty() || !props.containsKey(ENGINE_PROP)) { + LOG.debug("Returning configured workflow engine for entity {}.", entity); + return getWorkflowEngine(); + } + + String engineName = props.get(ENGINE_PROP); + if (engineName.equalsIgnoreCase(getWorkflowEngine().getName())) { + // If already active on native + if (getNativeWorkflowEngine().isActive(entity)) { + throw new FalconException("Entity " + entity.getName() + " is already scheduled on native engine."); + } + LOG.debug("Returning configured workflow engine for entity {}", entity.getName()); + return configuredWorkflowEngine; + } else if (engineName.equalsIgnoreCase(getNativeWorkflowEngine().getName())) { + // If already active on configured workflow engine + if (getWorkflowEngine().isActive(entity)) { + throw new FalconException("Entity " + entity.getName() + " is already scheduled on " + + "configured workflow engine."); + } + LOG.debug("Returning native workflow engine for entity {}", entity.getName()); + return nativeWorkflowEngine; + } else { + throw new IllegalArgumentException("Property " + ENGINE_PROP + " is not set to a valid value."); + } + } + + /** + * @return An instance of the configurated workflow engine. + * @throws FalconException + */ public static AbstractWorkflowEngine getWorkflowEngine() throws FalconException { - return ReflectionUtils.getInstance(WORKFLOW_ENGINE); + // Caching is only for optimization, workflow engine doesn't need to be a singleton. + if (configuredWorkflowEngine == null) { + configuredWorkflowEngine = ReflectionUtils.getInstance(CONFIGURED_WORKFLOW_ENGINE); + } + return configuredWorkflowEngine; + } + + public static AbstractWorkflowEngine getNativeWorkflowEngine() throws FalconException { + if (nativeWorkflowEngine == null) { + nativeWorkflowEngine = + ReflectionUtils.getInstanceByClassName("org.apache.falcon.workflow.engine.FalconWorkflowEngine"); + } + return nativeWorkflowEngine; } public static AbstractPolicyBuilderFactory getLifecycleEngine() throws FalconException { return ReflectionUtils.getInstance(LIFECYCLE_ENGINE); } - } http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java index 9d96fa3..630c56c 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java @@ -164,7 +164,7 @@ public class WorkflowJobEndNotificationService implements FalconService { } for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) { try { - InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine() + InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine(entity) .getJobDetails(cluster, context.getWorkflowId()).getInstances(); if (instances != null && instances.length > 0) { wfProps = getWFProps(instances[0].getWfParams()); @@ -204,7 +204,6 @@ public class WorkflowJobEndNotificationService implements FalconService { return props; } - // This method handles both success and failure notifications. private void notifyWorkflowEnd(WorkflowExecutionContext context) { // Need to distinguish notification from post processing for backward compatibility http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java index 7b36b11..b53efe6 100644 --- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java +++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java @@ -109,4 +109,11 @@ public abstract class AbstractWorkflowEngine { public abstract boolean isNotificationEnabled(String cluster, String jobID) throws FalconException; public abstract Boolean isWorkflowKilledByUser(String cluster, String jobId) throws FalconException; + + + /** + * Returns the short name of the Workflow Engine. + * @return + */ + public abstract String getName(); } http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/docs/src/site/twiki/falconcli/Schedule.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/falconcli/Schedule.twiki b/docs/src/site/twiki/falconcli/Schedule.twiki index 42192c7..63aa9c1 100644 --- a/docs/src/site/twiki/falconcli/Schedule.twiki +++ b/docs/src/site/twiki/falconcli/Schedule.twiki @@ -7,7 +7,16 @@ Once submitted, an entity can be scheduled using schedule option. Process and fe Usage: $FALCON_HOME/bin/falcon entity -type [process|feed] -name <> -schedule -Optional Arg : -skipDryRun -doAs --properties <> +Optional Args : +-skipDryRun When this argument is specified, Falcon skips oozie dryrun. +-doAs + +-properties <>. Specifying 'falcon.scheduler:native' as a property will schedule the entity on the the native scheduler of Falcon. Else, it will default to the engine specified in startup.properties. + +Examples: + + $FALCON_HOME/bin/falcon entity -type process -name sampleProcess -schedule + + $FALCON_HOME/bin/falcon entity -type process -name sampleProcess -schedule -properties falcon.scheduler:native \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 724f646..91230b2 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -1684,6 +1684,11 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } } + @Override + public String getName() { + return "oozie"; + } + private String getUserWorkflowAction(List actionsList){ for (WorkflowAction wfAction : actionsList) { if (StringUtils.equals(wfAction.getName(), "user-action")) { http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index 16ef83a..5571326 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -78,15 +78,9 @@ public abstract class AbstractEntityManager { protected static final String DO_AS_PARAM = "doAs"; protected static final int XML_DEBUG_LEN = 10 * 1024; - private AbstractWorkflowEngine workflowEngine; protected ConfigurationStore configStore = ConfigurationStore.get(); public AbstractEntityManager() { - try { - workflowEngine = WorkflowEngineFactory.getWorkflowEngine(); - } catch (FalconException e) { - throw new FalconRuntimException(e); - } } protected static Integer getDefaultResultsPerPage() { @@ -231,7 +225,7 @@ public abstract class AbstractEntityManager { Set clusters = EntityUtil.getClustersDefinedInColos(entity); for (String cluster : clusters) { try { - getWorkflowEngine().dryRun(entity, cluster, skipDryRun); + getWorkflowEngine(entity).dryRun(entity, cluster, skipDryRun); } catch (FalconException e) { throw new FalconException("dryRun failed on cluster " + cluster, e); } @@ -270,8 +264,8 @@ public abstract class AbstractEntityManager { canRemove(entityObj); obtainEntityLocks(entityObj, "delete", tokenList); if (entityType.isSchedulable() && !DeploymentUtil.isPrism()) { - getWorkflowEngine().delete(entityObj); - removedFromEngine = "(KILLED in ENGINE)"; + getWorkflowEngine(entityObj).delete(entityObj); + removedFromEngine = "(KILLED in WF_ENGINE)"; } configStore.remove(entityType, entity); @@ -327,10 +321,10 @@ public abstract class AbstractEntityManager { oldClusters.removeAll(newClusters); //deleted clusters for (String cluster : newClusters) { - result.append(getWorkflowEngine().update(oldEntity, newEntity, cluster, skipDryRun)); + result.append(getWorkflowEngine(oldEntity).update(oldEntity, newEntity, cluster, skipDryRun)); } for (String cluster : oldClusters) { - getWorkflowEngine().delete(oldEntity, cluster); + getWorkflowEngine(oldEntity).delete(oldEntity, cluster); } } @@ -568,6 +562,7 @@ public abstract class AbstractEntityManager { protected EntityStatus getStatus(Entity entity, EntityType type) throws FalconException { EntityStatus status = EntityStatus.SUBMITTED; + AbstractWorkflowEngine workflowEngine = getWorkflowEngine(entity); if (type.isSchedulable()) { if (workflowEngine.isActive(entity)) { if (workflowEngine.isSuspended(entity)) { @@ -1104,8 +1099,8 @@ public abstract class AbstractEntityManager { } - protected AbstractWorkflowEngine getWorkflowEngine() { - return this.workflowEngine; + protected AbstractWorkflowEngine getWorkflowEngine(Entity entity) throws FalconException { + return WorkflowEngineFactory.getWorkflowEngine(entity); } protected T consolidateResult(Map results, Class clazz) { http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java index fea2989..81960a0 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java @@ -123,8 +123,8 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { lifeCycles = checkAndUpdateLifeCycle(lifeCycles, type); validateNotEmpty("entityName", entity); validateInstanceFilterByClause(filterBy); - AbstractWorkflowEngine wfEngine = getWorkflowEngine(); Entity entityObject = EntityUtil.getEntity(type, entity); + AbstractWorkflowEngine wfEngine = getWorkflowEngine(entityObject); return getInstanceResultSubset(wfEngine.getRunningInstances(entityObject, lifeCycles), filterBy, orderBy, sortOrder, offset, numResults); } catch (Throwable e) { @@ -175,7 +175,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { Pair startAndEndDate = getStartAndEndDate(entityObject, startStr, endStr, numResults); // LifeCycle lifeCycleObject = EntityUtil.getLifeCycle(lifeCycle); - AbstractWorkflowEngine wfEngine = getWorkflowEngine(); + AbstractWorkflowEngine wfEngine = getWorkflowEngine(entityObject); return getInstanceResultSubset(wfEngine.getStatus(entityObject, startAndEndDate.first, startAndEndDate.second, lifeCycles), filterBy, orderBy, sortOrder, offset, numResults); @@ -255,7 +255,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { Entity entityObject = EntityUtil.getEntity(type, entity); Pair startAndEndDate = getStartAndEndDate(entityObject, startStr, endStr); - AbstractWorkflowEngine wfEngine = getWorkflowEngine(); + AbstractWorkflowEngine wfEngine = getWorkflowEngine(entityObject); return getInstanceSummaryResultSubset(wfEngine.getSummary(entityObject, startAndEndDate.first, startAndEndDate.second, lifeCycles), filterBy, orderBy, sortOrder); @@ -547,7 +547,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { Date start = startAndEndDate.first; Date end = EntityUtil.getNextInstanceTime(start, EntityUtil.getFrequency(entityObject), EntityUtil.getTimeZone(entityObject), 1); - AbstractWorkflowEngine wfEngine = getWorkflowEngine(); + AbstractWorkflowEngine wfEngine = getWorkflowEngine(entityObject); return wfEngine.getInstanceParams(entityObject, start, end, lifeCycles); } catch (Throwable e) { LOG.error("Failed to display params of an instance", e); @@ -572,7 +572,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { Pair startAndEndDate = getStartAndEndDateForLifecycleOperations( entityObject, startStr, endStr); - AbstractWorkflowEngine wfEngine = getWorkflowEngine(); + AbstractWorkflowEngine wfEngine = getWorkflowEngine(entityObject); return wfEngine.killInstances(entityObject, startAndEndDate.first, startAndEndDate.second, props, lifeCycles); } catch (Throwable e) { @@ -598,7 +598,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { Pair startAndEndDate = getStartAndEndDateForLifecycleOperations( entityObject, startStr, endStr); - AbstractWorkflowEngine wfEngine = getWorkflowEngine(); + AbstractWorkflowEngine wfEngine = getWorkflowEngine(entityObject); return wfEngine.suspendInstances(entityObject, startAndEndDate.first, startAndEndDate.second, props, lifeCycles); } catch (Throwable e) { @@ -624,7 +624,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { Pair startAndEndDate = getStartAndEndDateForLifecycleOperations( entityObject, startStr, endStr); - AbstractWorkflowEngine wfEngine = getWorkflowEngine(); + AbstractWorkflowEngine wfEngine = getWorkflowEngine(entityObject); return wfEngine.resumeInstances(entityObject, startAndEndDate.first, startAndEndDate.second, props, lifeCycles); } catch (Throwable e) { @@ -787,7 +787,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { private InstancesResult.WorkflowStatus getProcessInstanceStatus(Process process, Date instanceTime) throws FalconException { - AbstractWorkflowEngine wfEngine = getWorkflowEngine(); + AbstractWorkflowEngine wfEngine = getWorkflowEngine(process); List lifeCycles = new ArrayList(); lifeCycles.add(LifeCycle.valueOf(LifeCycle.EXECUTION.name())); Date endRange = new Date(instanceTime.getTime() + 200); @@ -817,7 +817,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { Pair startAndEndDate = getStartAndEndDateForLifecycleOperations( entityObject, startStr, endStr); - AbstractWorkflowEngine wfEngine = getWorkflowEngine(); + AbstractWorkflowEngine wfEngine = getWorkflowEngine(entityObject); return wfEngine.reRunInstances(entityObject, startAndEndDate.first, startAndEndDate.second, props, lifeCycles, isForced); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java index d317aa1..88131f3 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java @@ -33,6 +33,7 @@ import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.monitors.Dimension; import org.apache.falcon.service.FeedSLAMonitoringService; import org.apache.falcon.util.DeploymentUtil; +import org.apache.falcon.workflow.WorkflowEngineFactory; import org.apache.hadoop.security.authorize.AuthorizationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,10 +61,13 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM private static MemoryLocks memoryLocks = MemoryLocks.getInstance(); /** - * Schedules an submitted entity immediately. + * Schedules a submitted entity immediately. * * @param type entity type * @param entity entity name + * @param properties Specifying 'falcon.scheduler:native' as a property will schedule the entity on the + * native workflow engine, else it will default to the workflow engine + * as defined in startup.properties. * @return APIResult */ public APIResult schedule( @@ -95,7 +99,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM + entityObj.toShortString()); } LOG.info("Memory lock obtained for {} by {}", entityObj.toShortString(), Thread.currentThread().getName()); - getWorkflowEngine().schedule(entityObj, skipDryRun, properties); + WorkflowEngineFactory.getWorkflowEngine(entityObj, properties).schedule(entityObj, skipDryRun, properties); } catch (Exception e) { throw new FalconException("Entity schedule failed for " + type + ": " + entity, e); } finally { @@ -177,6 +181,9 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM * Submits a new entity and schedules it immediately. * * @param type entity type + * @param properties Specifying 'falcon.scheduler:native' as a property will schedule the entity on the + * native workflow engine, else it will default to the workflow engine + * as defined in startup.properties. * @return APIResult */ public APIResult submitAndSchedule( @@ -212,8 +219,8 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM try { checkSchedulableEntity(type); Entity entityObj = EntityUtil.getEntity(type, entity); - if (getWorkflowEngine().isActive(entityObj)) { - getWorkflowEngine().suspend(entityObj); + if (getWorkflowEngine(entityObj).isActive(entityObj)) { + getWorkflowEngine(entityObj).suspend(entityObj); } else { throw new FalconException(entity + "(" + type + ") is not scheduled"); } @@ -240,8 +247,8 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM try { checkSchedulableEntity(type); Entity entityObj = EntityUtil.getEntity(type, entity); - if (getWorkflowEngine().isActive(entityObj)) { - getWorkflowEngine().resume(entityObj); + if (getWorkflowEngine(entityObj).isActive(entityObj)) { + getWorkflowEngine(entityObj).resume(entityObj); } else { throw new FalconException(entity + "(" + type + ") is not scheduled"); } @@ -347,7 +354,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM decorateEntityWithACL(entity); Set clusters = EntityUtil.getClustersDefinedInColos(entity); for (String cluster : clusters) { - result.append(getWorkflowEngine().touch(entity, cluster, skipDryRun)); + result.append(getWorkflowEngine(entity).touch(entity, cluster, skipDryRun)); } } catch (Throwable e) { LOG.error("Touch failed", e); http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java index 5c7bf91..ac7cde8 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java @@ -86,7 +86,12 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { @Override public boolean isActive(Entity entity) throws FalconException { - return STATE_STORE.getEntity(new EntityID(entity)).getCurrentState() != EntityState.STATE.SUBMITTED; + EntityID id = new EntityID(entity); + // Ideally state store should have all entities, but, check anyway. + if (STATE_STORE.entityExists(id)) { + return STATE_STORE.getEntity(id).getCurrentState() != EntityState.STATE.SUBMITTED; + } + return false; } @Override @@ -367,5 +372,10 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { public Boolean isWorkflowKilledByUser(String cluster, String jobId) throws FalconException { throw new UnsupportedOperationException("Not yet Implemented"); } + + @Override + public String getName() { + return "native"; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java index ca2010b..70c8353 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java @@ -159,6 +159,7 @@ public class OozieDAGEngine implements DAGEngine { props.put("feedNames", "NONE"); props.put("feedInstancePaths", "NONE"); props.put("userJMSNotificationEnabled", "true"); + props.put("systemJMSNotificationEnabled", "false"); return props; } @@ -176,6 +177,7 @@ public class OozieDAGEngine implements DAGEngine { props.put("feedNames", "NONE"); props.put("feedInstancePaths", "NONE"); props.put("userJMSNotificationEnabled", "true"); + props.put("systemJMSNotificationEnabled", "false"); return props; } http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java index 2a9fbce..0ddf895 100644 --- a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java +++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java @@ -121,7 +121,6 @@ public class FalconExecutionServiceTest extends AbstractSchedulerTestBase { mockSchedulerService = Mockito.mock(SchedulerService.class); Mockito.when(mockSchedulerService.getName()).thenReturn("JobSchedulerService"); StartupProperties.get().setProperty("dag.engine.impl", MockDAGEngine.class.getName()); - StartupProperties.get().setProperty("execution.service.impl", FalconExecutionService.class.getName()); dagEngine = Mockito.spy(DAGEngineFactory.getDAGEngine("testCluster")); Mockito.when(mockSchedulerService.createRequestBuilder(Mockito.any(NotificationHandler.class), Mockito.any(ID.class))).thenCallRealMethod(); http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/scheduler/src/test/java/org/apache/falcon/workflow/engine/WorkflowEngineFactoryTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/workflow/engine/WorkflowEngineFactoryTest.java b/scheduler/src/test/java/org/apache/falcon/workflow/engine/WorkflowEngineFactoryTest.java new file mode 100644 index 0000000..7e502cd --- /dev/null +++ b/scheduler/src/test/java/org/apache/falcon/workflow/engine/WorkflowEngineFactoryTest.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.workflow.engine; + +import org.apache.falcon.FalconException; +import org.apache.falcon.cluster.util.EmbeddedCluster; +import org.apache.falcon.entity.AbstractTestBase; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.state.EntityID; +import org.apache.falcon.state.EntityState; +import org.apache.falcon.state.store.AbstractStateStore; +import org.apache.falcon.state.store.StateStore; +import org.apache.falcon.util.StartupProperties; +import org.apache.falcon.workflow.WorkflowEngineFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.HashMap; +import java.util.Map; + +/** + * Tests the WorkflowEngineFactory class. + */ +public class WorkflowEngineFactoryTest extends AbstractTestBase { + + private StateStore stateStore = null; + + @BeforeClass + public void init() throws Exception { + this.dfsCluster = EmbeddedCluster.newCluster("testCluster"); + this.conf = dfsCluster.getConf(); + StartupProperties.get().setProperty("falcon.state.store.impl", + "org.apache.falcon.state.store.InMemoryStateStore"); + setupConfigStore(); + } + + @AfterClass + public void tearDown() { + this.dfsCluster.shutdown(); + } + + // State store is set up to sync with Config Store. That gets tested too. + public void setupConfigStore() throws Exception { + stateStore = AbstractStateStore.get(); + getStore().registerListener(stateStore); + storeEntity(EntityType.CLUSTER, "testCluster"); + storeEntity(EntityType.FEED, "clicksFeed"); + storeEntity(EntityType.FEED, "clicksSummary"); + storeEntity(EntityType.PROCESS, "summarize"); + } + + @Test + public void testGetEngineByEntity() throws FalconException { + // When entity is not specified, return oozie + AbstractWorkflowEngine engine = WorkflowEngineFactory.getWorkflowEngine(null); + Assert.assertTrue(engine instanceof OozieWorkflowEngine); + + // When entity not active on native, return oozie + Process process = getStore().get(EntityType.PROCESS, "summarize"); + engine = WorkflowEngineFactory.getWorkflowEngine(process); + Assert.assertTrue(engine instanceof OozieWorkflowEngine); + + // When entity active on native, return native + stateStore.getEntity(new EntityID(process)).setCurrentState(EntityState.STATE.SCHEDULED); + engine = WorkflowEngineFactory.getWorkflowEngine(process); + Assert.assertTrue(engine instanceof FalconWorkflowEngine); + } + + @Test + public void testGetEngineByEntityAndProps() throws FalconException { + // When entity is not specified, return oozie + AbstractWorkflowEngine engine = WorkflowEngineFactory.getWorkflowEngine(null, null); + Assert.assertTrue(engine instanceof OozieWorkflowEngine); + + // When entity specified, but, no props, return oozie + Process process = getStore().get(EntityType.PROCESS, "summarize"); + stateStore.getEntity(new EntityID(process)).setCurrentState(EntityState.STATE.SUBMITTED); + engine = WorkflowEngineFactory.getWorkflowEngine(process, null); + Assert.assertTrue(engine instanceof OozieWorkflowEngine); + + // When entity specified, props set to oozie, return oozie + Map props = new HashMap<>(); + props.put(WorkflowEngineFactory.ENGINE_PROP, "oozie"); + engine = WorkflowEngineFactory.getWorkflowEngine(process, props); + Assert.assertTrue(engine instanceof OozieWorkflowEngine); + } + + @Test (expectedExceptions = FalconException.class, + expectedExceptionsMessageRegExp = ".* is already scheduled on native engine.") + public void testGetEngineError() throws FalconException { + Process process = getStore().get(EntityType.PROCESS, "summarize"); + // When entity specified, props set to oozie, but scheduled on native, exception + stateStore.getEntity(new EntityID(process)).setCurrentState(EntityState.STATE.SCHEDULED); + Map props = new HashMap<>(); + props.put(WorkflowEngineFactory.ENGINE_PROP, "oozie"); + WorkflowEngineFactory.getWorkflowEngine(process, props); + } + + @Test + public void testGetEngineForCluster() throws FalconException { + AbstractWorkflowEngine engine = + WorkflowEngineFactory.getWorkflowEngine(getStore().get(EntityType.CLUSTER, "testCluster")); + Assert.assertTrue(engine instanceof OozieWorkflowEngine); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/8b352fcf/unit/pom.xml ---------------------------------------------------------------------- diff --git a/unit/pom.xml b/unit/pom.xml index 8d9f443..20c848e 100644 --- a/unit/pom.xml +++ b/unit/pom.xml @@ -73,6 +73,12 @@ org.apache.falcon + falcon-scheduler + ${project.version} + + + + org.apache.falcon falcon-prism classes ${project.version}