X-Original-To: apmail-falcon-commits-archive@minotaur.apache.org
Delivered-To: apmail-falcon-commits-archive@minotaur.apache.org
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
    by minotaur.apache.org (Postfix) with SMTP id A782018E7E
    for ; Thu, 8 Oct 2015 11:36:34 +0000 (UTC)
Received: (qmail 93995 invoked by uid 500); 8 Oct 2015 11:36:25 -0000
Delivered-To: apmail-falcon-commits-archive@falcon.apache.org
Received: (qmail 93955 invoked by uid 500); 8 Oct 2015 11:36:25 -0000
Mailing-List: contact commits-help@falcon.apache.org; run by ezmlm
Precedence: bulk
Reply-To: dev@falcon.apache.org
Delivered-To: mailing list commits@falcon.apache.org
Received: (qmail 93946 invoked by uid 99); 8 Oct 2015 11:36:25 -0000
Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23)
    by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Oct 2015 11:36:25 +0000
Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33)
    id 038FCE0A48; Thu, 8 Oct 2015 11:36:25 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: pallavi@apache.org
To: commits@falcon.apache.org
Message-Id: <4308dd68a9db4587a95217b17a213b9a@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: falcon git commit: FALCON-1231 Improve JobCompletionNotification Service
Date: Thu, 8 Oct 2015 11:36:25 +0000 (UTC)

Repository: falcon
Updated Branches:
  refs/heads/master d08f8bd74 -> f4bd1e7e5

FALCON-1231 Improve JobCompletionNotification Service

Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f4bd1e7e
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f4bd1e7e
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f4bd1e7e

Branch: refs/heads/master
Commit: f4bd1e7e53ca4f113b78d3a5cf4e55aba99fbe93
Parents: d08f8bd
Author: Pallavi Rao
Authored: Thu Oct 8 16:41:59 2015 +0530
Committer: Pallavi Rao
Committed: Thu Oct 8 16:41:59 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/falcon/resource/InstancesResult.java |   7 +
 .../falcon/catalog/CatalogPartitionHandler.java |  15 ++
 .../falcon/entity/WorkflowNameBuilder.java      |  69 ++++--
 .../falcon/metadata/MetadataMappingService.java |  14 ++
 .../falcon/workflow/WorkflowExecutionArgs.java  |   3 +
 .../workflow/WorkflowExecutionContext.java      |  44 +++-
 .../workflow/WorkflowExecutionListener.java     |  31 +++
 .../WorkflowJobEndNotificationService.java      | 217 ++++++++++++++++---
 .../workflow/engine/AbstractWorkflowEngine.java |   2 +
 .../falcon/entity/TestWorkflowNameBuilder.java  |  18 ++
 .../workflow/WorkflowExecutionContextTest.java  |  17 ++
 .../WorkflowJobEndNotificationServiceTest.java  |  59 +++--
 messaging/pom.xml                               |   7 +
 .../falcon/messaging/JMSMessageConsumer.java    | 155 +++++++++++--
 .../messaging/JMSMessageConsumerTest.java       | 187 +++++++++++++---
 .../src/main/conf/oozie-site.xml                |  49 ++++-
 .../falcon/workflow/FalconPostProcessing.java   |  10 -
 .../workflow/engine/OozieWorkflowEngine.java    |  24 ++
 .../workflow/FalconPostProcessingTest.java      |   3 -
 .../falcon/rerun/handler/LateRerunHandler.java  |  15 ++
 .../falcon/rerun/handler/RetryHandler.java      |  19 ++
 webapp/src/conf/oozie/conf/oozie-site.xml       |  48 ++++
 23 files changed, 877 insertions(+), 138 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 02a65a1..c845062 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,8 @@ Trunk (Unreleased)
     FALCON-1027 Falcon proxy user support(Sowmya Ramesh)
 
   IMPROVEMENTS
+    FALCON-1231 Improve JobCompletionNotification Service(Pallavi Rao)
+
     FALCON-1157 Build error when using maven 3.3.x(Venkat Ramachandran via Pallavi Rao)
 
     FALCON-1477 Adding "-debug" option to Falcon CLI for debug statements to stdout(Narayan Periwal via Pallavi Rao)

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
index 76bb4b0..e05eeeb 100644
--- a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
+++ b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
@@ -109,6 +109,9 @@ public class InstancesResult extends APIResult {
         public Date endTime;
 
         @XmlElement
+        public int runId;
+
+        @XmlElement
         public String details;
 
         @XmlElement
@@ -154,6 +157,10 @@ public class InstancesResult extends APIResult {
             return endTime;
         }
 
+        public int getRunId() {
+            return runId;
+        }
+
         public InstanceAction[] getActions() {
             return actions;
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java b/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
index d0b09df..cccb4f8 100644
--- a/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
+++ b/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
@@ -295,4 +295,19 @@ public class CatalogPartitionHandler implements WorkflowExecutionListener{
     public void onFailure(WorkflowExecutionContext context) throws FalconException {
         //no-op
     }
+
+    @Override
+    public void onStart(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
+
+    @Override
+    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
+
+    @Override
+    public void onWait(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
index 6890594..c58be64 100644
--- a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
+++ b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
@@ -20,6 +20,7 @@ package org.apache.falcon.entity;
 import org.apache.falcon.Pair;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -56,13 +57,13 @@ public class WorkflowNameBuilder {
     }
 
     public Tag getWorkflowTag(String workflowName) {
-        return WorkflowName.getTagAndSuffixes(entity, workflowName) == null ? null
-                : WorkflowName.getTagAndSuffixes(entity, workflowName).first;
+        return WorkflowName.getTagAndSuffixes(workflowName) == null ? null
+                : WorkflowName.getTagAndSuffixes(workflowName).first;
     }
 
     public String getWorkflowSuffixes(String workflowName) {
-        return WorkflowName.getTagAndSuffixes(entity, workflowName) == null ? ""
-                : WorkflowName.getTagAndSuffixes(entity, workflowName).second;
+        return WorkflowName.getTagAndSuffixes(workflowName) == null ? ""
+                : WorkflowName.getTagAndSuffixes(workflowName).second;
     }
 
     /**
@@ -70,6 +71,7 @@ public class WorkflowNameBuilder {
      */
     public static class WorkflowName {
         private static final String SEPARATOR = "_";
+        private static final Pattern WF_NAME_PATTERN;
 
         private String prefix;
         private String entityType;
@@ -77,6 +79,32 @@ public class WorkflowNameBuilder {
         private String entityName;
         private List suffixes;
 
+        static {
+            StringBuilder typePattern = new StringBuilder("(");
+            for (EntityType type : EntityType.values()) {
+                typePattern.append(type.name());
+                typePattern.append("|");
+            }
+            typePattern = typePattern.deleteCharAt(typePattern.length() - 1);
+            typePattern.append(")");
+            StringBuilder tagsPattern = new StringBuilder("(");
+            for (Tag tag : Tag.values()) {
+                tagsPattern.append(tag.name());
+                tagsPattern.append("|");
+            }
+            tagsPattern = tagsPattern.deleteCharAt(tagsPattern.length() - 1);
+            tagsPattern.append(")");
+
+            String name = "([a-zA-Z][\\-a-zA-Z0-9]*)";
+
+            String suffix = "([_A-Za-z0-9-.]*)";
+
+            String namePattern = PREFIX + SEPARATOR + typePattern + SEPARATOR + tagsPattern
+                    + SEPARATOR + name + suffix;
+
+            WF_NAME_PATTERN = Pattern.compile(namePattern);
+        }
+
         public WorkflowName(String prefix, String entityType, String tag,
                             String entityName, List suffixes) {
             this.prefix = prefix;
@@ -100,28 +128,27 @@ public class WorkflowNameBuilder {
             return builder.toString();
         }
 
-        public static Pair getTagAndSuffixes(Entity entity,
-                                             String workflowName) {
-
-            StringBuilder namePattern = new StringBuilder(PREFIX + SEPARATOR
-                    + entity.getEntityType().name() + SEPARATOR + "(");
-            for (Tag tag : Tag.values()) {
-                namePattern.append(tag.name());
-                namePattern.append("|");
+        public static Pair getTagAndSuffixes(String workflowName) {
+            Matcher matcher = WF_NAME_PATTERN.matcher(workflowName);
+            if (matcher.matches()) {
+                matcher.reset();
+                if (matcher.find()) {
+                    String tag = matcher.group(2);
+                    String suffixes = matcher.group(4);
+                    return new Pair<>(Tag.valueOf(tag), suffixes);
+                }
             }
-            namePattern = namePattern.deleteCharAt(namePattern.length() - 1);
-            namePattern.append(")" + SEPARATOR + entity.getName()
-                    + "([_A-Za-z0-9-.]*)");
-
-            Pattern pattern = Pattern.compile(namePattern.toString());
+            return null;
+        }
 
-            Matcher matcher = pattern.matcher(workflowName);
+        public static Pair getEntityNameAndType(String workflowName) {
+            Matcher matcher = WF_NAME_PATTERN.matcher(workflowName);
             if (matcher.matches()) {
                 matcher.reset();
                 if (matcher.find()) {
-                    String tag = matcher.group(1);
-                    String suffixes = matcher.group(2);
-                    return new Pair(Tag.valueOf(tag), suffixes);
+                    String type = matcher.group(1);
+                    String name = matcher.group(3);
+                    return new Pair<>(name, EntityType.valueOf(type));
                 }
             }
             return null;

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index ef9da45..56fbde0 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -297,6 +297,20 @@ public class MetadataMappingService
         // do nothing since lineage is only recorded for successful workflow
     }
+    @Override
+    public void onStart(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
+
+    @Override
+    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
+
+    @Override
+    public void onWait(WorkflowExecutionContext context) throws FalconException {
+        // TBD
+    }
     private void onProcessInstanceExecuted(WorkflowExecutionContext context)
         throws FalconException {
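
Note on the WorkflowNameBuilder change above: the new static WF_NAME_PATTERN splits a workflow name into entity type, tag, entity name and suffix without needing the Entity object. The standalone Java sketch below reproduces that matching outside Falcon so the group numbering is easier to follow. It is only an illustration under stated assumptions: the class name WorkflowNameSketch, the parse helper, the literal "FALCON" prefix (inferred from the names used in TestWorkflowNameBuilder) and the short enum value lists are not taken from the commit, and the suffix character class is written with the hyphen last.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Standalone illustration; hypothetical class, not part of Falcon. */
final class WorkflowNameSketch {
    // Assumed stand-ins for Falcon's EntityType and Tag enums.
    enum EntityType { FEED, PROCESS, CLUSTER }
    enum Tag { DEFAULT, RETENTION, REPLICATION }

    // Assumed prefix, inferred from names such as FALCON_PROCESS_DEFAULT_agg-logs.
    private static final String PREFIX = "FALCON";
    private static final String SEPARATOR = "_";

    // Group 1: entity type, group 2: tag, group 3: entity name, group 4: suffix.
    private static final Pattern WF_NAME_PATTERN = Pattern.compile(
            PREFIX + SEPARATOR
            + "(" + alternation(EntityType.values()) + ")" + SEPARATOR
            + "(" + alternation(Tag.values()) + ")" + SEPARATOR
            + "([a-zA-Z][\\-a-zA-Z0-9]*)"   // name: no underscore allowed
            + "([_A-Za-z0-9.-]*)");         // suffix: may start with an underscore

    // Joins enum constant names with '|' to form a regex alternation.
    private static String alternation(Enum<?>[] values) {
        StringBuilder sb = new StringBuilder();
        for (Enum<?> value : values) {
            if (sb.length() > 0) {
                sb.append('|');
            }
            sb.append(value.name());
        }
        return sb.toString();
    }

    /** Returns {type, tag, name, suffix}, or null when the name does not match. */
    static String[] parse(String workflowName) {
        Matcher matcher = WF_NAME_PATTERN.matcher(workflowName);
        if (!matcher.matches()) {
            return null;
        }
        return new String[] {
            matcher.group(1), matcher.group(2), matcher.group(3), matcher.group(4),
        };
    }

    public static void main(String[] args) {
        String[] parts = parse("FALCON_FEED_REPLICATION_logs_colo1");
        // prints: FEED | REPLICATION | logs | _colo1
        System.out.println(String.join(" | ", parts));
    }
}
```

Under these assumptions, FALCON_FEED_REPLICATION_logs_colo1 yields the name "logs" and the suffix "_colo1", which agrees with the expectation in the TestWorkflowNameBuilder data provider added by this commit: the name group cannot contain an underscore, so everything from the first underscore after the name falls into the suffix group. The sketch reads the groups straight after matches(), which already populates them.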
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
index 9456fb9..d2430a2 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -35,6 +35,9 @@ public enum WorkflowExecutionArgs {
     // where
     CLUSTER_NAME("cluster", "name of the current cluster"),
     OPERATION("operation", "operation like generate, delete, replicate"),
+    // Exactly same as the above. Introduced to ensure compatibility between messages produced by POST-PROCESSING and
+    // the values in conf.
+    DATA_OPERATION("falconDataOperation", "operation like generate, delete, replicate", false),
 
     // who
     WORKFLOW_USER("workflowUser", "user who owns the feed instance (partition)"),

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 4454239..45b6d23 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -62,12 +62,12 @@ public class WorkflowExecutionContext {
     /**
      * Workflow execution status.
      */
-    public enum Status {SUCCEEDED, FAILED}
+    public enum Status {WAITING, RUNNING, SUSPENDED, SUCCEEDED, FAILED, TIMEDOUT, KILLED}
 
     /**
      * Workflow execution type.
      */
-    public enum Type {PRE_PROCESSING, POST_PROCESSING}
+    public enum Type {PRE_PROCESSING, POST_PROCESSING, WORKFLOW_JOB, COORDINATOR_ACTION}
 
     /**
      * Entity operations supported.
@@ -107,6 +107,10 @@ public class WorkflowExecutionContext {
         return context.get(arg);
     }
 
+    public void setValue(WorkflowExecutionArgs arg, String value) {
+        context.put(arg, value);
+    }
+
     public String getValue(WorkflowExecutionArgs arg, String defaultValue) {
         return context.containsKey(arg) ? context.get(arg) : defaultValue;
     }
@@ -128,10 +132,22 @@ public class WorkflowExecutionContext {
         return Status.FAILED.name().equals(getValue(WorkflowExecutionArgs.STATUS));
     }
 
+    public boolean hasWorkflowTimedOut() {
+        return Status.TIMEDOUT.name().equals(getValue(WorkflowExecutionArgs.STATUS));
+    }
+
+    public boolean hasWorkflowBeenKilled() {
+        return Status.KILLED.name().equals(getValue(WorkflowExecutionArgs.STATUS));
+    }
+
     public String getContextFile() {
         return getValue(WorkflowExecutionArgs.CONTEXT_FILE);
     }
 
+    public Status getWorkflowStatus() {
+        return Status.valueOf(getValue(WorkflowExecutionArgs.STATUS));
+    }
+
     public String getLogDir() {
         return getValue(WorkflowExecutionArgs.LOG_DIR);
     }
@@ -211,7 +227,10 @@ public class WorkflowExecutionContext {
     }
 
     public EntityOperations getOperation() {
-        return EntityOperations.valueOf(getValue(WorkflowExecutionArgs.OPERATION));
+        if (getValue(WorkflowExecutionArgs.OPERATION) != null) {
+            return EntityOperations.valueOf(getValue(WorkflowExecutionArgs.OPERATION));
+        }
+        return EntityOperations.valueOf(getValue(WorkflowExecutionArgs.DATA_OPERATION));
     }
 
     public String getOutputFeedNames() {
@@ -282,6 +301,19 @@ public class WorkflowExecutionContext {
         return creationTime;
     }
 
+    public long getWorkflowStartTime() {
+        return Long.parseLong(getValue(WorkflowExecutionArgs.WF_START_TIME));
+    }
+
+    public long getWorkflowEndTime() {
+        return Long.parseLong(getValue(WorkflowExecutionArgs.WF_END_TIME));
+    }
+
+
+    public Type getContextType() {
+        return Type.valueOf(getValue(WorkflowExecutionArgs.CONTEXT_TYPE));
+    }
+
     /**
      * this method is invoked from with in the workflow.
      *
@@ -397,7 +429,11 @@ public class WorkflowExecutionContext {
     }
 
     public static WorkflowExecutionContext create(Map wfProperties) {
-        wfProperties.put(WorkflowExecutionArgs.CONTEXT_TYPE, Type.POST_PROCESSING.name());
+        return WorkflowExecutionContext.create(wfProperties, Type.POST_PROCESSING);
+    }
+
+    public static WorkflowExecutionContext create(Map wfProperties, Type type) {
+        wfProperties.put(WorkflowExecutionArgs.CONTEXT_TYPE, type.name());
         return new WorkflowExecutionContext(wfProperties);
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
index 2d3a477..7bf14f2 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
@@ -25,7 +25,38 @@ import org.apache.falcon.FalconException;
  */
 public interface WorkflowExecutionListener {
 
+    /**
+     * Invoked when a workflow is succeeds.
+     * @param context
+     * @throws FalconException
+     */
     void onSuccess(WorkflowExecutionContext context) throws FalconException;
 
+    /**
+     * Invoked when a workflow fails.
+     * @param context
+     * @throws FalconException
+     */
     void onFailure(WorkflowExecutionContext context) throws FalconException;
+
+    /**
+     * Invoked on start of a workflow. Basically, when the workflow is RUNNING.
+     * @param context
+     * @throws FalconException
+     */
+    void onStart(WorkflowExecutionContext context) throws FalconException;
+
+    /**
+     * Invoked when a workflow is suspended.
+     * @param context
+     * @throws FalconException
+     */
+    void onSuspend(WorkflowExecutionContext context) throws FalconException;
+
+    /**
+     * Invoked when a workflow is in waiting state.
+     * @param context
+     * @throws FalconException
+     */
+    void onWait(WorkflowExecutionContext context) throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
index c4f8843..5c75f5c 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -21,19 +21,24 @@ package org.apache.falcon.workflow;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.resource.InstancesResult;
-import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.service.FalconService;
 import org.apache.falcon.util.ReflectionUtils;
 import org.apache.falcon.util.StartupProperties;
-import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Date;
 import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * A workflow job end notification service.
@@ -46,11 +51,21 @@ public class WorkflowJobEndNotificationService implements FalconService {
     private Set listeners = new LinkedHashSet();
 
+    // Maintain a cache of context built, so we don't have to query Oozie for every state change.
+    private Map contextMap = new ConcurrentHashMap<>();
+
+    private static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get();
+
     @Override
     public String getName() {
         return SERVICE_NAME;
     }
 
+    // Mainly for test
+    Map getContextMap() {
+        return contextMap;
+    }
+
     @Override
     public void init() throws FalconException {
         String listenerClassNames = StartupProperties.get().getProperty(
@@ -83,9 +98,34 @@ public class WorkflowJobEndNotificationService implements FalconService {
     }
 
     public void notifyFailure(WorkflowExecutionContext context) {
+        notifyWorkflowEnd(context);
+    }
+
+    public void notifySuccess(WorkflowExecutionContext context) {
+        notifyWorkflowEnd(context);
+    }
+
+    public void notifyStart(WorkflowExecutionContext context) {
+        // Start notifications can only be from Oozie JMS notifications
+        updateContextFromWFConf(context);
+        LOG.debug("Sending workflow start notification to listeners with context : {} ", context);
+        for (WorkflowExecutionListener listener : listeners) {
+            try {
+                listener.onStart(context);
+            } catch (Throwable t) {
+                // do not rethrow as other listeners do not get a chance
+                LOG.error("Error in listener {}", listener.getClass().getName(), t);
+            }
+        }
+    }
+
+    public void notifySuspend(WorkflowExecutionContext context) {
+        // Suspend notifications can only be from Oozie JMS notifications
+        updateContextFromWFConf(context);
+        LOG.debug("Sending workflow suspend notification to listeners with context : {} ", context);
         for (WorkflowExecutionListener listener : listeners) {
             try {
-                listener.onFailure(context);
+                listener.onSuspend(context);
             } catch (Throwable t) {
                 // do not rethrow as other listeners do not get a chance
                 LOG.error("Error in listener {}", listener.getClass().getName(), t);
@@ -93,19 +133,142 @@ public class WorkflowJobEndNotificationService implements FalconService {
         }
 
         instrumentAlert(context);
+        contextMap.remove(context.getWorkflowId());
     }
 
-    public void notifySuccess(WorkflowExecutionContext context) {
+    public void notifyWait(WorkflowExecutionContext context) {
+        // Wait notifications can only be from Oozie JMS notifications
+        updateContextFromWFConf(context);
+        LOG.debug("Sending workflow wait notification to listeners with context : {} ", context);
         for (WorkflowExecutionListener listener : listeners) {
             try {
-                listener.onSuccess(context);
+                listener.onWait(context);
             } catch (Throwable t) {
                 // do not rethrow as other listeners do not get a chance
                 LOG.error("Error in listener {}", listener.getClass().getName(), t);
             }
         }
+    }
 
-        instrumentAlert(context);
+    // The method retrieves the conf from the cache if it is in cache.
+    // Else, queries WF Engine to retrieve the conf of the workflow
+    private void updateContextFromWFConf(WorkflowExecutionContext context) {
+        try {
+            Properties wfProps = contextMap.get(context.getWorkflowId());
+            if (wfProps == null) {
+                Entity entity = CONFIG_STORE.get(EntityType.valueOf(context.getEntityType()), context.getEntityName());
+                // Entity can be null in case of delete. Engine will generate notifications for instance kills.
+                // But, the entity would no longer be in the config store.
+                if (entity == null) {
+                    return;
+                }
+                for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+                    try {
+                        InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine()
+                                .getJobDetails(cluster, context.getWorkflowId()).getInstances();
+                        if (instances != null && instances.length > 0) {
+                            wfProps = getWFProps(instances[0].getWfParams());
+                            // Required by RetryService. But, is not part of conf.
+                            wfProps.setProperty(WorkflowExecutionArgs.RUN_ID.getName(),
+                                    Integer.toString(instances[0].getRunId()));
+                        }
+                    } catch (FalconException e) {
+                        // Do Nothing. The workflow may not have been deployed on this cluster.
+                        continue;
+                    }
+                    contextMap.put(context.getWorkflowId(), wfProps);
+                }
+            }
+
+            // No extra props to enhance the context with.
+            if (wfProps == null || wfProps.isEmpty()) {
+                return;
+            }
+
+            for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
+                if (wfProps.containsKey(arg.getName())) {
+                    context.setValue(arg, wfProps.getProperty(arg.getName()));
+                }
+            }
+
+        } catch (FalconException e) {
+            LOG.error("Unable to retrieve entity {} of type {} from config store.", e);
+        }
+    }
+
+    private Properties getWFProps(InstancesResult.KeyValuePair[] wfParams) {
+        Properties props = new Properties();
+        for (InstancesResult.KeyValuePair kv : wfParams) {
+            props.put(kv.getKey(), kv.getValue());
+        }
+        return props;
+    }
+
+
+    // This method handles both success and failure notifications.
+    private void notifyWorkflowEnd(WorkflowExecutionContext context) {
+        // Need to distinguish notification from post processing for backward compatibility
+        if (context.getContextType() == WorkflowExecutionContext.Type.POST_PROCESSING) {
+            boolean engineNotifEnabled = false;
+            try {
+                engineNotifEnabled = WorkflowEngineFactory.getWorkflowEngine()
+                        .isNotificationEnabled(context.getClusterName(), context.getWorkflowId());
+            } catch (FalconException e) {
+                LOG.debug("Unable to determine if the notification is enabled on the wf engine. Assuming not.", e);
+            }
+            // Ignore the message from post processing as there will be one more from Oozie.
+            if (engineNotifEnabled) {
+                LOG.info("Ignoring message from post processing as engine notification is enabled.");
+                return;
+            } else {
+                updateContextWithTime(context);
+            }
+        } else {
+            updateContextFromWFConf(context);
+        }
+
+        LOG.debug("Sending workflow end notification to listeners with context : {} ", context);
+
+        for (WorkflowExecutionListener listener : listeners) {
+            try {
+                if (context.hasWorkflowSucceeded()) {
+                    listener.onSuccess(context);
+                    instrumentAlert(context);
+                } else {
+                    listener.onFailure(context);
+                    if (context.hasWorkflowBeenKilled() || context.hasWorkflowFailed()) {
+                        instrumentAlert(context);
+                    }
+                }
+            } catch (Throwable t) {
+                // do not rethrow as other listeners do not get a chance
+                LOG.error("Error in listener {}", listener.getClass().getName(), t);
+            }
+        }
+
+        contextMap.remove(context.getWorkflowId());
+    }
+
+    // In case of notifications coming from post notifications, start and end time need to be populated.
+    private void updateContextWithTime(WorkflowExecutionContext context) {
+        try {
+            InstancesResult result = WorkflowEngineFactory.getWorkflowEngine()
+                    .getJobDetails(context.getClusterName(), context.getWorkflowId());
+            Date startTime = result.getInstances()[0].startTime;
+            Date endTime = result.getInstances()[0].endTime;
+            Date now = new Date();
+            if (startTime == null) {
+                startTime = now;
+            }
+            if (endTime == null) {
+                endTime = now;
+            }
+            context.setValue(WorkflowExecutionArgs.WF_START_TIME, Long.toString(startTime.getTime()));
+            context.setValue(WorkflowExecutionArgs.WF_END_TIME, Long.toString(endTime.getTime()));
+        } catch(FalconException e) {
+            LOG.error("Unable to retrieve job details for " + context.getWorkflowId() + " on cluster "
+                    + context.getClusterName(), e);
+        }
     }
 
     private void instrumentAlert(WorkflowExecutionContext context) {
@@ -117,27 +280,31 @@ public class WorkflowJobEndNotificationService implements FalconService {
         String workflowUser = context.getWorkflowUser();
         String nominalTime = context.getNominalTimeAsISO8601();
         String runId = String.valueOf(context.getWorkflowRunId());
+        Date now = new Date();
+        // Start and/or End time may not be set in case of workflow suspend
+        Date endTime;
+        if (context.getWorkflowEndTime() == 0) {
+            endTime = now;
+        } else {
+            endTime = new Date(context.getWorkflowEndTime());
+        }
 
-        try {
-            CurrentUser.authenticate(context.getWorkflowUser());
-            AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
-            InstancesResult result = wfEngine.getJobDetails(clusterName, workflowId);
-            Date startTime = result.getInstances()[0].startTime;
-            Date endTime = result.getInstances()[0].endTime;
-            Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
+        Date startTime;
+        if (context.getWorkflowStartTime() == 0) {
+            startTime = now;
+        } else {
+            startTime = new Date(context.getWorkflowStartTime());
+        }
+        Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
 
-            if (context.hasWorkflowFailed()) {
-                GenericAlert.instrumentFailedInstance(clusterName, entityType,
-                        entityName, nominalTime, workflowId, workflowUser, runId, operation,
-                        SchemaHelper.formatDateUTC(startTime), "", "", duration);
-            } else {
-                GenericAlert.instrumentSucceededInstance(clusterName, entityType,
-                        entityName, nominalTime, workflowId, workflowUser, runId, operation,
-                        SchemaHelper.formatDateUTC(startTime), duration);
-            }
-        } catch (FalconException e) {
-            // Logging an error and ignoring since there are listeners for extensions
-            LOG.error("Instrumenting alert failed for: " + context, e);
+        if (context.hasWorkflowFailed()) {
+            GenericAlert.instrumentFailedInstance(clusterName, entityType,
+                    entityName, nominalTime, workflowId, workflowUser, runId, operation,
+                    SchemaHelper.formatDateUTC(startTime), "", "", duration);
+        } else {
+            GenericAlert.instrumentSucceededInstance(clusterName, entityType,
+                    entityName, nominalTime, workflowId, workflowUser, runId, operation,
+                    SchemaHelper.formatDateUTC(startTime), duration);
         }
     }
} http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java index 0b560bb..8b3460a 100644 --- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java +++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java @@ -105,4 +105,6 @@ public abstract class AbstractWorkflowEngine { public abstract InstancesResult getInstanceParams(Entity entity, Date start, Date end, List lifeCycles) throws FalconException; + + public abstract boolean isNotificationEnabled(String cluster, String jobID) throws FalconException; } http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/test/java/org/apache/falcon/entity/TestWorkflowNameBuilder.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/TestWorkflowNameBuilder.java b/common/src/test/java/org/apache/falcon/entity/TestWorkflowNameBuilder.java index 6060731..5b1af78 100644 --- a/common/src/test/java/org/apache/falcon/entity/TestWorkflowNameBuilder.java +++ b/common/src/test/java/org/apache/falcon/entity/TestWorkflowNameBuilder.java @@ -17,10 +17,13 @@ */ package org.apache.falcon.entity; +import org.apache.falcon.Pair; import org.apache.falcon.Tag; +import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Process; import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.util.Arrays; @@ -88,4 +91,19 @@ public class TestWorkflowNameBuilder { "FALCON_PROCESS_DEFAULT_agg-logs"); } + + @Test(dataProvider = 
"workflowNames") + public void workflowNameTypeTest(String wfName, Pair nameType) { + Assert.assertEquals(WorkflowNameBuilder.WorkflowName.getEntityNameAndType(wfName), nameType); + } + + @DataProvider(name = "workflowNames") + public Object[][] getWorkflowNames() { + return new Object[][] { + {"FALCON_PROCESS_DEFAULT_agg-logs", new Pair<>("agg-logs", EntityType.PROCESS)}, + {"FALCON_FEED_REPLICATION_raw-logs", new Pair<>("raw-logs", EntityType.FEED)}, + {"FALCON_FEED_RETENTION_logs2", new Pair<>("logs2", EntityType.FEED)}, + {"FALCON_FEED_REPLICATION_logs_colo1", new Pair<>("logs", EntityType.FEED)}, + }; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java index 65a057d..4fdc1e9 100644 --- a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java +++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java @@ -24,6 +24,8 @@ import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.Date; + /** * A test for WorkflowExecutionContext. 
@@ -86,6 +88,7 @@ public class WorkflowExecutionContextTest { @Test public void testHasWorkflowSucceeded() throws Exception { Assert.assertTrue(context.hasWorkflowSucceeded()); + Assert.assertEquals(context.getWorkflowStatus(), WorkflowExecutionContext.Status.SUCCEEDED); } @Test @@ -240,6 +243,18 @@ public class WorkflowExecutionContextTest { } @Test + public void testWorkflowStartEnd() throws Exception { + Assert.assertEquals(context.getWorkflowEndTime() - context.getWorkflowStartTime(), 1000000); + } + + @Test + public void testSetAndGetValue() throws Exception { + context.setValue(WorkflowExecutionArgs.RUN_ID, "10"); + Assert.assertEquals(context.getValue(WorkflowExecutionArgs.RUN_ID), "10"); + context.setValue(WorkflowExecutionArgs.RUN_ID, "1"); + } + + @Test public void testSerializeDeserialize() throws Exception { String contextFile = context.getContextFile(); context.serialize(); @@ -318,6 +333,8 @@ public class WorkflowExecutionContextTest { "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR, "-" + WorkflowExecutionArgs.LOG_FILE.getName(), LOGS_DIR + "/log.txt", + "-" + WorkflowExecutionArgs.WF_START_TIME.getName(), Long.toString(new Date().getTime()), + "-" + WorkflowExecutionArgs.WF_END_TIME.getName(), Long.toString(new Date().getTime() + 1000000), }; } } http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java index b7df443..1a9597b 100644 --- a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java +++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java @@ -22,10 +22,13 @@ import org.apache.falcon.FalconException; 
 import org.apache.falcon.entity.v0.process.EngineType;
 import org.apache.falcon.util.StartupProperties;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;

+import java.util.Date;
+import java.util.Properties;
+
 /**
  * A test for WorkflowJobEndNotificationService.
  */
@@ -55,15 +58,17 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL
     private WorkflowJobEndNotificationService service;
     private WorkflowExecutionContext savedContext;

-    @BeforeMethod
+    @BeforeClass
     public void setUp() throws Exception {
         service = new WorkflowJobEndNotificationService();
         savedContext = WorkflowExecutionContext.create(getTestMessageArgs(),
                 WorkflowExecutionContext.Type.POST_PROCESSING);
         Assert.assertNotNull(savedContext);
+        service.init();
+        service.registerListener(this);
     }

-    @AfterMethod
+    @AfterClass
     public void tearDown() throws Exception {
         service.destroy();
     }
@@ -73,29 +78,30 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL
         Assert.assertEquals(service.getName(), WorkflowJobEndNotificationService.SERVICE_NAME);
     }

-    @Test
-    public void testInit() throws Exception {
-        String listenerClassNames = StartupProperties.get().getProperty(
-                "workflow.execution.listeners");
-        Assert.assertEquals(listenerClassNames, "");
-
+    @Test(priority = -1)
+    public void testBasic() throws Exception {
         try {
-            StartupProperties.get().setProperty("workflow.execution.listeners",
-                    "org.apache.falcon.workflow.WorkflowJobEndNotificationServiceTest");
-            listenerClassNames = StartupProperties.get().getProperty(
-                    "workflow.execution.listeners");
-            Assert.assertEquals(listenerClassNames,
-                    "org.apache.falcon.workflow.WorkflowJobEndNotificationServiceTest");
-
-            service.init();
             notifyFailure(savedContext);
             notifySuccess(savedContext);
         } finally {
-
service.unregisterListener(this);
             StartupProperties.get().setProperty("workflow.execution.listeners", "");
         }
     }

+    @Test
+    public void testNotificationsFromEngine() throws FalconException {
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(),
+                WorkflowExecutionContext.Type.WORKFLOW_JOB);
+
+        // Pretend the start was already notified
+        Properties wfProps = new Properties();
+        wfProps.put(WorkflowExecutionArgs.CLUSTER_NAME.name(), CLUSTER_NAME);
+        service.getContextMap().put("workflow-01-00", wfProps);
+
+        // Should retrieve from cache.
+        service.notifySuspend(context);
+    }
+
     @Override
     public void onSuccess(WorkflowExecutionContext context) throws FalconException {
         Assert.assertNotNull(context);
@@ -108,6 +114,19 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL
         Assert.assertEquals(context.entrySet().size(), 28);
     }

+    @Override
+    public void onStart(WorkflowExecutionContext context) throws FalconException {
+    }
+
+    @Override
+    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+    }
+
+    @Override
+    public void onWait(WorkflowExecutionContext context) throws FalconException {
+
+    }
+
     private void notifyFailure(WorkflowExecutionContext context) {
         service.notifyFailure(context);
     }
@@ -150,6 +169,8 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL
             "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
             "-" + WorkflowExecutionArgs.LOG_FILE.getName(), LOGS_DIR + "/log.txt",
+            "-" + WorkflowExecutionArgs.WF_START_TIME.getName(), Long.toString(new Date().getTime()),
+            "-" + WorkflowExecutionArgs.WF_END_TIME.getName(), Long.toString(new Date().getTime() + 1000000),
         };
     }
 }
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/messaging/pom.xml
----------------------------------------------------------------------
diff --git a/messaging/pom.xml b/messaging/pom.xml
index 918de63..e64a3f8 100644
--- a/messaging/pom.xml
+++ b/messaging/pom.xml
@@ -176,5 +176,12 @@
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.9.5</version>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 </project>
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index d3178fb..bbb5d9b 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -20,12 +20,17 @@ package org.apache.falcon.messaging;

 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.Pair;
 import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.entity.WorkflowNameBuilder;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.messaging.util.MessagingUtil;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
+import org.json.JSONException;
+import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -37,12 +42,17 @@ import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
 import java.lang.reflect.InvocationTargetException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.TimeZone;

 /**
  * Subscribes to the falcon topic for handling retries and alerts.
@@ -94,22 +104,22 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
     @Override
     public void onMessage(Message message) {
-        MapMessage mapMessage = (MapMessage) message;
         LOG.info("Received JMS message {}", message.toString());
-
         try {
-            WorkflowExecutionContext context = createContext(mapMessage);
-            LOG.info("Created context from JMS message {}", context);
-
-            // Login the user so listeners can access FS and WfEngine as this user
-            CurrentUser.authenticate(context.getWorkflowUser());
-
-            if (context.hasWorkflowFailed()) {
-                onFailure(context);
-            } else if (context.hasWorkflowSucceeded()) {
-                onSuccess(context);
+            if (message instanceof MapMessage) {
+                MapMessage mapMessage = (MapMessage) message;
+                WorkflowExecutionContext context = createContext(mapMessage);
+                LOG.info("Created context from Falcon JMS message {}", context);
+                invokeListener(context);
+                // Due to backward compatibility, need to handle messages from post processing too.
+                // Hence cannot use JMS selectors.
+            } else if (shouldHandle(message)) {
+                TextMessage textMessage = (TextMessage) message;
+                WorkflowExecutionContext context = createContext(textMessage);
+                LOG.info("Created context from Oozie JMS message {}", context);
+                invokeListener(context);
             }
-        } catch (JMSException e) {
+        } catch (Exception e) {
             String errorMessage = "Error in onMessage for topicSubscriber of topic: "
                     + topicName + ", Message: " + message.toString();
             LOG.info(errorMessage, e);
@@ -117,6 +127,117 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
         }
     }

+    // Creates context from the JMS notification of the workflow engine
+    private WorkflowExecutionContext createContext(TextMessage message) throws JMSException, FalconException {
+        try {
+            // Example Workflow Job in FAILED state:
+            // {"status":"FAILED","errorCode":"EL_ERROR","errorMessage":"variable [dummyvalue] cannot be resolved",
+            // "id":"0000042-130618221729631-oozie-oozi-W","startTime":1342915200000,"endTime":1366672183543}
+            JSONObject json = new JSONObject(message.getText());
+            long currentTime = System.currentTimeMillis();
+            Map wfProperties = new HashMap<>();
+            wfProperties.put(WorkflowExecutionArgs.STATUS, json.getString("status"));
+            wfProperties.put(WorkflowExecutionArgs.WORKFLOW_ID, json.getString("id"));
+            wfProperties.put(WorkflowExecutionArgs.WF_START_TIME, json.isNull("startTime")? Long.toString(currentTime)
+                    : json.getString("startTime"));
+            wfProperties.put(WorkflowExecutionArgs.WF_END_TIME, json.isNull("endTime")?
Long.toString(currentTime)
+                    : json.getString("endTime"));
+            if (!json.isNull("nominalTime")) {
+                wfProperties.put(WorkflowExecutionArgs.NOMINAL_TIME,
+                        getNominalTimeString(Long.parseLong(json.getString("nominalTime"))));
+            }
+            Pair entityTypePair = WorkflowNameBuilder.WorkflowName.getEntityNameAndType(
+                    message.getStringProperty("appName"));
+            wfProperties.put(WorkflowExecutionArgs.ENTITY_NAME, entityTypePair.first);
+            wfProperties.put(WorkflowExecutionArgs.ENTITY_TYPE, entityTypePair.second.name());
+            wfProperties.put(WorkflowExecutionArgs.WORKFLOW_USER, message.getStringProperty("user"));
+
+            String appType = message.getStringProperty("appType");
+            return WorkflowExecutionContext.create(wfProperties, WorkflowExecutionContext.Type.valueOf(appType));
+
+        } catch (JSONException e) {
+            throw new FalconException("Unable to build a context from the JMS message.", e);
+        }
+    }
+
+    private String getNominalTimeString(long timeInMillis) {
+        Date time = new Date(timeInMillis);
+        final String format = "yyyy-MM-dd-HH-mm";
+        DateFormat dateFormat = new SimpleDateFormat(format);
+        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+        return dateFormat.format(time);
+    }
+
+    private void invokeListener(WorkflowExecutionContext context) {
+        // Login the user so listeners can access FS and WfEngine as this user
+        CurrentUser.authenticate(context.getWorkflowUser());
+
+        WorkflowExecutionContext.Status status = WorkflowExecutionContext.Status.valueOf(
+                context.getValue(WorkflowExecutionArgs.STATUS));
+
+        // Handle only timeout and wait notifications of coord
+        if (context.getContextType() == WorkflowExecutionContext.Type.COORDINATOR_ACTION) {
+            switch(status) {
+            case TIMEDOUT:
+                jobEndNotificationService.notifyFailure(context);
+                break;
+            case WAITING:
+                jobEndNotificationService.notifyWait(context);
+                break;
+            default:
+                break;
+            }
+        } else {
+            switch(status) {
+            case KILLED:
+            case FAILED:
+                jobEndNotificationService.notifyFailure(context);
+                break;
+            case SUCCEEDED:
+
jobEndNotificationService.notifySuccess(context);
+                break;
+            case SUSPENDED:
+                jobEndNotificationService.notifySuspend(context);
+                break;
+            case RUNNING:
+                jobEndNotificationService.notifyStart(context);
+                break;
+            default :
+                throw new IllegalArgumentException("Not valid Status of workflow");
+            }
+        }
+    }
+
+    // Since Oozie has a system level JMS connection info, Falcon should ensure it is handling notifications
+    // of Falcon entities only.
+    private boolean shouldHandle(Message message) {
+        try {
+            String appType = message.getStringProperty("appType");
+            // Handle all workflow job notifications for falcon workflows
+            if (appType != null
+                    && WorkflowExecutionContext.Type.WORKFLOW_JOB == WorkflowExecutionContext.Type.valueOf(appType)
+                    && WorkflowNameBuilder.WorkflowName.getEntityNameAndType(
+                            message.getStringProperty("appName")) != null) {
+                return true;
+            }
+
+            // Handle coord notification for falcon workflows only for WAITING and TIMED_OUT.
+            if (appType != null
+                    && WorkflowExecutionContext.Type.COORDINATOR_ACTION
+                    == WorkflowExecutionContext.Type.valueOf(appType)
+                    && WorkflowNameBuilder.WorkflowName.getEntityNameAndType(
+                            message.getStringProperty("appName")) != null) {
+                String status = message.getStringProperty("eventStatus");
+                if (status != null && ("WAITING".equals(status) || "FAILURE".equals(status))) {
+                    return true;
+                }
+            }
+        } catch (JMSException e) {
+            LOG.error("Error while parsing the message header", e);
+        }
+        return false;
+    }
+
     private WorkflowExecutionContext createContext(MapMessage mapMessage) throws JMSException {
         // for backwards compatibility, read all args from message
         Map wfProperties = new HashMap();
@@ -130,14 +251,6 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
         return WorkflowExecutionContext.create(wfProperties);
     }

-    public void onFailure(WorkflowExecutionContext context) {
-        jobEndNotificationService.notifyFailure(context);
-    }
-
-    public void onSuccess(WorkflowExecutionContext context) {
-
jobEndNotificationService.notifySuccess(context);
-    }
-
     @Override
     public void onException(JMSException ignore) {
         String errorMessage = "Error in onException for topicSubscriber of topic: " + topicName;
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
index 7356ee3..dee7c47 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -26,10 +26,11 @@ import org.apache.falcon.util.FalconTestUtil;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
+import org.mockito.Mockito;
 import org.mortbay.log.Log;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;

 import javax.jms.Connection;
@@ -37,7 +38,9 @@ import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.MapMessage;
+import javax.jms.Message;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.jms.Topic;
 import java.io.IOException;
 import java.util.HashMap;
@@ -54,17 +57,26 @@ public class JMSMessageConsumerTest {
     private static final String SECONDARY_TOPIC_NAME = "FALCON.ENTITY.SEC.TOPIC";
     private BrokerService broker;

-    @BeforeClass
+    private JMSMessageConsumer subscriber;
+    private WorkflowJobEndNotificationService jobEndService;
+
+    @BeforeMethod
     public void setup() throws
Exception {
         broker = new BrokerService();
         broker.addConnector(BROKER_URL);
         broker.setDataDirectory("target/activemq");
         broker.setBrokerName("localhost");
+        jobEndService = Mockito.mock(WorkflowJobEndNotificationService.class);
         broker.start();
-        broker.deleteAllMessages();
+        //Comma separated topics are supported in startup properties
+        subscriber = new JMSMessageConsumer(BROKER_IMPL_CLASS, "", "",
+                BROKER_URL, TOPIC_NAME + "," + SECONDARY_TOPIC_NAME, jobEndService);
+
+        subscriber.startSubscriber();
     }

-    public void sendMessages(String topic) throws JMSException, FalconException, IOException {
+    public void sendMessages(String topic, WorkflowExecutionContext.Type type)
+        throws JMSException, FalconException, IOException {
         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
         Connection connection = connectionFactory.createConnection();
         connection.start();
@@ -74,32 +86,100 @@ public class JMSMessageConsumerTest {
         javax.jms.MessageProducer producer = session.createProducer(destination);
         producer.setDeliveryMode(DeliveryMode.PERSISTENT);

-        for (int i = 0; i < 3; i++) {
-            WorkflowExecutionContext context = WorkflowExecutionContext.create(
-                    getMockFalconMessage(i), WorkflowExecutionContext.Type.POST_PROCESSING);
+        for (int i = 0; i < 5; i++) {
+            Message message = null;

-            MapMessage message = session.createMapMessage();
-            for (Map.Entry entry : context.entrySet()) {
-                message.setString(entry.getKey().getName(), entry.getValue());
+            switch(type) {
+            case POST_PROCESSING:
+                message = getMockFalconMessage(i, session);
+                break;
+            case WORKFLOW_JOB:
+                message = getMockOozieMessage(i, session);
+                break;
+            case COORDINATOR_ACTION:
+                message = getMockOozieCoordMessage(i, session);
+            default:
+                break;
             }
-
             Log.debug("Sending:" + message);
             producer.send(message);
         }
+    }

-        WorkflowExecutionContext context = WorkflowExecutionContext.create(
-                getMockFalconMessage(5), WorkflowExecutionContext.Type.POST_PROCESSING);
-
-        MapMessage mapMessage =
session.createMapMessage();
-        for (Map.Entry entry : context.entrySet()) {
-            mapMessage.setString(entry.getKey().getName(), entry.getValue());
+    private Message getMockOozieMessage(int i, Session session) throws FalconException, JMSException {
+        TextMessage message = session.createTextMessage();
+        message.setStringProperty("appType", "WORKFLOW_JOB");
+        message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
+        message.setStringProperty("user", "falcon");
+        switch(i % 4) {
+        case 0:
+            message.setText("{\"status\":\"RUNNING\",\"id\":\"0000042-130618221729631-oozie-oozi-W\""
+                    + ",\"startTime\":1342915200000}");
+            break;
+        case 1:
+            message.setText("{\"status\":\"FAILED\",\"errorCode\":\"EL_ERROR\","
+                    + "\"errorMessage\":\"variable [dummyvalue] cannot be resolved\","
+                    + "\"id\":\"0000042-130618221729631-oozie-oozi-W\",\"startTime\":1342915200000,"
+                    + "\"endTime\":1366672183543}");
+            break;
+        case 2:
+            message.setText("{\"status\":\"SUCCEEDED\",\"id\":\"0000039-130618221729631-oozie-oozi-W\""
+                    + ",\"startTime\":1342915200000,"
+                    + "\"parentId\":\"0000025-130618221729631-oozie-oozi-C@1\",\"endTime\":1366676224154}");
+            break;
+        case 3:
+            message.setText("{\"status\":\"SUSPENDED\",\"id\":\"0000039-130618221729631-oozie-oozi-W\","
+                    + "\"startTime\":1342915200000,\"parentId\":\"0000025-130618221729631-oozie-oozi-C@1\"}");
+            break;
+        default:
         }
+        return message;
+    }

-        Log.debug("Sending:" + mapMessage);
-        producer.send(mapMessage);
+    private Message getMockOozieCoordMessage(int i, Session session) throws FalconException, JMSException {
+        TextMessage message = session.createTextMessage();
+        message.setStringProperty("appType", "COORDINATOR_ACTION");
+        message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
+        message.setStringProperty("user", "falcon");
+        switch(i % 5) {
+        case 0:
+            message.setText("{\"status\":\"WAITING\",\"nominalTime\":1310342400000,\"missingDependency\""
+                    + ":\"hdfs://gsbl90107.blue.com:8020/user/john/dir1/file1\","
+                    + "\"id\":\"0000025-130618221729631-oozie-oozi-C@1\",\"startTime\":1342915200000,"
+                    + "\"parentId\":\"0000025-130618221729631-oozie-oozi-C\"}");
+            message.setStringProperty("eventStatus", "WAITING");
+            break;
+        case 1:
+            message.setText("{\"status\":\"RUNNING\",\"nominalTime\":1310342400000,"
+                    + "\"id\":\"0000025-130618221729631-oozie-oozi-C@1\","
+                    + "\"startTime\":1342915200000,\"parentId\":\"0000025-130618221729631-oozie-oozi-C\"}");
+            message.setStringProperty("eventStatus", "STARTED");
+            break;
+        case 2:
+            message.setText("{\"status\":\"SUCCEEDED\",\"nominalTime\":1310342400000,"
+                    + "\"id\":\"0000025-130618221729631-oozie-oozi-C@1\","
+                    + "\"startTime\":1342915200000,\"parentId\":\"0000025-130618221729631-oozie-oozi-C\","
+                    + "\"endTime\":1366677082799}");
+            message.setStringProperty("eventStatus", "SUCCESS");
+            break;
+        case 3:
+            message.setText("{\"status\":\"FAILED\",\"errorCode\":\"E0101\",\"errorMessage\":"
+                    + "\"dummyError\",\"nominalTime\":1310342400000,"
+                    + "\"id\":\"0000025-130618221729631-oozie-oozi-C@1\",\"startTime\":1342915200000,"
+                    + "\"parentId\":\"0000025-130618221729631-oozie-oozi-C\",\"endTime\":1366677140818}");
+            message.setStringProperty("eventStatus", "FAILURE");
+            break;
+        case 4:
+            message.setText("{\"status\":\"TIMEDOUT\",\"nominalTime\":1310342400000,"
+                    + "\"id\":\"0000025-130618221729631-oozie-oozi-C@1\",\"startTime\":1342915200000,"
+                    + "\"parentId\":\"0000025-130618221729631-oozie-oozi-C\",\"endTime\":1366677140818}");
+            message.setStringProperty("eventStatus", "FAILURE");
+        default:
+        }
+        return message;
     }

-    private String[] getMockFalconMessage(int i) {
+    private Message getMockFalconMessage(int i, Session session) throws FalconException, JMSException {
         Map message = new HashMap();
         message.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS);
         message.put(WorkflowExecutionArgs.BRKR_URL.getName(), BROKER_URL);
@@ -127,39 +207,78 @@ public class JMSMessageConsumerTest {
             args[index++] = entry.getValue();
         }
-        return args;
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(
+                args, WorkflowExecutionContext.Type.POST_PROCESSING);
+
+        MapMessage jmsMessage = session.createMapMessage();
+        for (Map.Entry entry : context.entrySet()) {
+            jmsMessage.setString(entry.getKey().getName(), entry.getValue());
+        }
+
+        return jmsMessage;
     }

     @Test
     public void testSubscriber() {
         try {
-            //Comma separated topics are supported in startup properties
-            JMSMessageConsumer subscriber = new JMSMessageConsumer(BROKER_IMPL_CLASS, "", "",
-                    BROKER_URL, TOPIC_NAME+","+SECONDARY_TOPIC_NAME, new WorkflowJobEndNotificationService());
-            subscriber.startSubscriber();
-            sendMessages(TOPIC_NAME);
+            sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.POST_PROCESSING);

             final BrokerView adminView = broker.getAdminView();

             Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
-            Assert.assertEquals(adminView.getTotalEnqueueCount(), 11);
+            Assert.assertEquals(adminView.getTotalEnqueueCount(), 10);
             Assert.assertEquals(adminView.getTotalConsumerCount(), 2);

-            sendMessages(SECONDARY_TOPIC_NAME);
+            sendMessages(SECONDARY_TOPIC_NAME, WorkflowExecutionContext.Type.POST_PROCESSING);

             Assert.assertEquals(adminView.getTotalEnqueueCount(), 18);
             Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
             Assert.assertEquals(adminView.getTotalConsumerCount(), 3);
-
-            subscriber.closeSubscriber();
         } catch (Exception e) {
             Assert.fail("This should not have thrown an exception.", e);
         }
     }

-    @AfterClass
-    public void tearDown() throws Exception {
-        broker.deleteAllMessages();
+    @Test
+    public void testJMSMessagesFromOozie() throws Exception {
+        sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.WORKFLOW_JOB);
+
+        final BrokerView adminView = broker.getAdminView();
+
+        Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
+        Assert.assertEquals(adminView.getTotalEnqueueCount(), 10);
+        Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
+
+        // Async operations.
Give some time for messages to be processed.
+        Thread.sleep(100);
+        Mockito.verify(jobEndService, Mockito.times(2)).notifyStart(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService).notifyFailure(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService).notifySuccess(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService).notifySuspend(Mockito.any(WorkflowExecutionContext.class));
+    }
+
+    @Test
+    public void testJMSMessagesForOozieCoord() throws Exception {
+        sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.COORDINATOR_ACTION);
+
+        final BrokerView adminView = broker.getAdminView();
+
+        Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
+        Assert.assertEquals(adminView.getTotalEnqueueCount(), 12);
+        Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
+
+        // Async operations. Give some time for messages to be processed.
+        Thread.sleep(100);
+        Mockito.verify(jobEndService, Mockito.never()).notifyStart(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService, Mockito.never()).notifySuccess(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService, Mockito.never()).notifySuspend(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService, Mockito.times(1)).notifyWait(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService, Mockito.times(1)).notifyFailure(Mockito.any(WorkflowExecutionContext.class));
+    }
+
+    @AfterMethod
+    public void tearDown() throws Exception{
         broker.stop();
+        subscriber.closeSubscriber();
     }
 }
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/oozie-el-extensions/src/main/conf/oozie-site.xml
----------------------------------------------------------------------
diff --git a/oozie-el-extensions/src/main/conf/oozie-site.xml b/oozie-el-extensions/src/main/conf/oozie-site.xml
index 0925b41..5ef7f2a 100644
--- a/oozie-el-extensions/src/main/conf/oozie-site.xml
+++
b/oozie-el-extensions/src/main/conf/oozie-site.xml
@@ -1,4 +1,4 @@
-
+G
+    <property>
+        <name>oozie.services.ext</name>
+        <value>
+            org.apache.oozie.service.JMSAccessorService,
+            org.apache.oozie.service.JMSTopicService,
+            org.apache.oozie.service.EventHandlerService
+        </value>
+    </property>
+    <property>
+        <name>oozie.service.EventHandlerService.event.listeners</name>
+        <value>
+            org.apache.oozie.jms.JMSJobEventListener
+        </value>
+    </property>
+    <property>
+        <name>oozie.jms.producer.connection.properties</name>
+        <value>
+            java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://localhost:61616
+        </value>
+    </property>
+    <property>
+        <name>oozie.service.JMSTopicService.topic.name</name>
+        <value>
+            WORKFLOW=ENTITY.TOPIC, COORDINATOR=ENTITY.TOPIC
+        </value>
+        <description>
+            Topic options are ${username} or a fixed string which can be specified as default or for a
+            particular job type.
+            For e.g To have a fixed string topic for workflows, coordinators and bundles,
+            specify in the following comma-separated format: {jobtype1}={some_string1}, {jobtype2}={some_string2}
+            where job type can be WORKFLOW, COORDINATOR or BUNDLE.
+            Following example defines topic for workflow job, workflow action, coordinator job, coordinator action,
+            bundle job and bundle action
+            WORKFLOW=workflow,
+            COORDINATOR=coordinator,
+            BUNDLE=bundle
+            For jobs with no defined topic, default topic will be ${username}
+        </description>
+    </property>
+    <property>
+        <name>oozie.service.JMSTopicService.topic.prefix</name>
+        <value>FALCON.</value>
+        <description>
+            This can be used to append a prefix to the topic in oozie.service.JMSTopicService.topic.name. For eg: oozie.
+        </description>
+    </property>
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index cff1187..4961896 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -62,9 +62,6 @@ public class FalconPostProcessing extends Configured implements Tool {
         LOG.info("Moving logs {}", context);
         invokeLogProducer(context);

-        LOG.info("Sending falcon message {}", context);
-        invokeFalconMessageProducer(context);
-
         return 0;
     }

@@ -75,13 +72,6 @@ public class FalconPostProcessing extends Configured implements Tool {
         jmsMessageProducer.sendMessage(WorkflowExecutionContext.USER_MESSAGE_ARGS);
     }

-    private void invokeFalconMessageProducer(WorkflowExecutionContext context) throws Exception {
-        JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context)
-                .type(JMSMessageProducer.MessageType.FALCON)
-                .build();
-        jmsMessageProducer.sendMessage();
-    }
-
     private void invokeLogProducer(WorkflowExecutionContext context) {
         // todo: need to move this out to Falcon in-process
         if (UserGroupInformation.isSecurityEnabled()) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 6660af1..09c29ab 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++
b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -48,6 +48,7 @@ import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.update.UpdateHelper;
 import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -55,6 +56,7 @@ import org.apache.oozie.client.BundleJob;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
+import org.apache.oozie.client.JMSConnectionInfo;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.client.Job.Status;
 import org.apache.oozie.client.OozieClient;
@@ -589,6 +591,25 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return doJobAction(JobAction.PARAMS, entity, start, end, null, lifeCycles);
     }

+    @Override
+    public boolean isNotificationEnabled(String cluster, String jobID) throws FalconException {
+        OozieClient client = OozieClientFactory.get(cluster);
+        try {
+            JMSConnectionInfo jmsConnection = client.getJMSConnectionInfo();
+            if (jmsConnection != null && !jmsConnection.getJNDIProperties().isEmpty()){
+                String falconTopic = StartupProperties.get().getProperty("entity.topic", "FALCON.ENTITY.TOPIC");
+                String oozieTopic = client.getJMSTopicName(jobID);
+                if (falconTopic.equals(oozieTopic)) {
+                    return true;
+                }
+            }
+        } catch (OozieClientException e) {
+            LOG.error("Error while retrieving JMS connection info", e);
+        }
+
+        return false;
+    }
+
     private static enum JobAction {
         KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY, PARAMS
     }
@@ -1618,6 +1639,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             instance.endTime = jobInfo.getEndTime();
         }
         instance.cluster = cluster;
+        instance.runId = jobInfo.getRun();
+        instance.status =
WorkflowStatus.valueOf(jobInfo.getStatus().name());
+        instance.wfParams = getWFParams(jobInfo);
         instances[0] = instance;
         InstancesResult result = new InstancesResult(APIResult.Status.SUCCEEDED,
                 "Instance for workflow id:" + jobId);
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index 9d31d17..4b74368 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -179,9 +179,6 @@ public class FalconPostProcessingTest {
             verifyMesssage(consumer);
         }

-        // Verify falcon message
-        verifyMesssage(consumer);
-
         connection.close();
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 785dce8..64177a4 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -239,4 +239,19 @@ public class LateRerunHandler> extends
     public void onFailure(WorkflowExecutionContext context) throws FalconException {
         // do nothing since late data does not apply for failed workflows
     }
+
+    @Override
+    public void onStart(WorkflowExecutionContext context) throws FalconException {
+        // do nothing
+    }
+
+    @Override
+    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+        // do nothing
+
}
+
+    @Override
+    public void onWait(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing.
+    }
 }
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
index b952bbe..7aa094a 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
@@ -106,9 +106,28 @@ public class RetryHandler> extends

     @Override
     public void onFailure(WorkflowExecutionContext context) throws FalconException {
+        // Re-run does not make sense on timeouts.
+        if (context.hasWorkflowTimedOut()) {
+            return;
+        }
         handleRerun(context.getClusterName(), context.getEntityType(), context.getEntityName(),
                 context.getNominalTimeAsISO8601(), context.getWorkflowRunIdString(),
                 context.getWorkflowId(), context.getWorkflowUser(), context.getExecutionCompletionTime());
     }
+
+    @Override
+    public void onStart(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
+
+    @Override
+    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
+
+    @Override
+    public void onWait(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
 }
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bd1e7e/webapp/src/conf/oozie/conf/oozie-site.xml
----------------------------------------------------------------------
diff --git a/webapp/src/conf/oozie/conf/oozie-site.xml b/webapp/src/conf/oozie/conf/oozie-site.xml
index 8545ef9..466f79c 100644
--- a/webapp/src/conf/oozie/conf/oozie-site.xml
+++ b/webapp/src/conf/oozie/conf/oozie-site.xml
@@ -523,6 +523,54 @@
+
+    <property>
+        <name>oozie.services.ext</name>
+        <value>
+            org.apache.oozie.service.JMSAccessorService,
+            org.apache.oozie.service.JMSTopicService,
+            org.apache.oozie.service.EventHandlerService
+        </value>
+    </property>
+    <property>
+        <name>oozie.service.EventHandlerService.event.listeners</name>
+        <value>
+            org.apache.oozie.jms.JMSJobEventListener
+        </value>
+    </property>
+    <property>
+        <name>oozie.jms.producer.connection.properties</name>
+        <value>
+            java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://localhost:61616
+        </value>
+    </property>
+    <property>
+        <name>oozie.service.JMSTopicService.topic.name</name>
+        <value>
+            WORKFLOW=ENTITY.TOPIC, COORDINATOR=ENTITY.TOPIC
+        </value>
+        <description>
+            Topic options are ${username} or a fixed string which can be specified as default or for a
+            particular job type.
+            For e.g To have a fixed string topic for workflows, coordinators and bundles,
+            specify in the following comma-separated format: {jobtype1}={some_string1}, {jobtype2}={some_string2}
+            where job type can be WORKFLOW, COORDINATOR or BUNDLE.
+            Following example defines topic for workflow job, workflow action, coordinator job, coordinator action,
+            bundle job and bundle action
+            WORKFLOW=workflow,
+            COORDINATOR=coordinator,
+            BUNDLE=bundle
+            For jobs with no defined topic, default topic will be ${username}
+        </description>
+    </property>
+    <property>
+        <name>oozie.service.JMSTopicService.topic.prefix</name>
+        <value>FALCON.</value>
+        <description>
+            This can be used to append a prefix to the topic in oozie.service.JMSTopicService.topic.name. For eg: oozie.
+        </description>
+    </property>
+
     <property>
         <name>oozie.service.HadoopAccessorService.supported.filesystems</name>
         <value>*</value>
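
Note on the topic settings in the oozie-site.xml hunks: the Oozie topic is built from oozie.service.JMSTopicService.topic.prefix plus the per-job-type entry in oozie.service.JMSTopicService.topic.name, and isNotificationEnabled compares the result with Falcon's entity.topic (default FALCON.ENTITY.TOPIC). The following is only a minimal sketch of that composition, not Oozie's JMSTopicService code. The class TopicNameSketch and its two methods are hypothetical names, and applying the prefix to the user-name fallback is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; this is NOT the Oozie JMSTopicService implementation.
class TopicNameSketch {

    // Parses a value such as "WORKFLOW=ENTITY.TOPIC, COORDINATOR=ENTITY.TOPIC"
    // into a job-type -> topic-name map. Malformed entries are skipped.
    static Map<String, String> parseTopicMapping(String value) {
        Map<String, String> mapping = new HashMap<>();
        for (String pair : value.split(",")) {
            String[] kv = pair.trim().split("=", 2);
            if (kv.length == 2) {
                mapping.put(kv[0].trim(), kv[1].trim());
            }
        }
        return mapping;
    }

    // Assumption: the prefix is prepended to whichever name is chosen, and job
    // types without an entry fall back to the submitting user's name.
    static String resolveTopic(String prefix, Map<String, String> mapping, String jobType, String user) {
        String topic = mapping.get(jobType);
        return prefix + (topic != null ? topic : user);
    }

    public static void main(String[] args) {
        Map<String, String> mapping = parseTopicMapping("WORKFLOW=ENTITY.TOPIC, COORDINATOR=ENTITY.TOPIC");
        String oozieTopic = resolveTopic("FALCON.", mapping, "WORKFLOW", "falcon");
        String falconTopic = "FALCON.ENTITY.TOPIC"; // default of entity.topic in the diff
        // Same equality check that isNotificationEnabled performs on the two names.
        System.out.println(oozieTopic + " matches Falcon topic: " + falconTopic.equals(oozieTopic));
    }
}
```

With the values from the diff, both WORKFLOW and COORDINATOR resolve to FALCON.ENTITY.TOPIC, which is why a single Falcon subscriber can receive workflow and coordinator events on the same topic.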