falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkat...@apache.org
Subject [1/8] git commit: FALCON-484 - Refactor workflow related code into context, listener and Notification. Contributed by Venkatesh Seetharam
Date Fri, 08 Aug 2014 20:31:34 GMT
Repository: incubator-falcon
Updated Branches:
  refs/heads/master 522788157 -> 9921457c3


FALCON-484 - Refactor workflow related code into context, listener and Notification. Contributed
by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/b42c53ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/b42c53ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/b42c53ba

Branch: refs/heads/master
Commit: b42c53baaf29fee4466264aac83b70dca0903af9
Parents: 5227881
Author: Venkatesh Seetharam <venkatesh@apache.org>
Authored: Fri Aug 8 12:55:48 2014 -0700
Committer: Venkatesh Seetharam <venkatesh@apache.org>
Committed: Fri Aug 8 12:55:48 2014 -0700

----------------------------------------------------------------------
 .../falcon/workflow/WorkflowExecutionArgs.java  | 122 +++++++
 .../workflow/WorkflowExecutionContext.java      | 326 +++++++++++++++++++
 .../workflow/WorkflowExecutionListener.java     |  31 ++
 .../WorkflowJobEndNotificationService.java      | 112 +++++++
 common/src/main/resources/startup.properties    |   1 +
 .../workflow/WorkflowExecutionContextTest.java  | 301 +++++++++++++++++
 src/conf/startup.properties                     |   1 +
 7 files changed, 894 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b42c53ba/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
new file mode 100644
index 0000000..92af3e1
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.workflow;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+
+/**
+ * Arguments for workflow execution.
+ */
+public enum WorkflowExecutionArgs {
+
+    // instance details
+    NOMINAL_TIME("nominalTime", "instance time"),
+    ENTITY_TYPE("entityType", "type of the entity"),
+    ENTITY_NAME("entityName", "name of the entity"),
+    TIMESTAMP("timeStamp", "current timestamp"),
+
+    // where
+    CLUSTER_NAME("cluster", "name of the current cluster"),
+    OPERATION("operation", "operation like generate, delete, replicate"),
+
+    // who
+    WORKFLOW_USER("workflowUser", "user who owns the feed instance (partition)"),
+
+    // what
+    // workflow details
+    USER_WORKFLOW_ENGINE("userWorkflowEngine", "user workflow engine type", false),
+    USER_WORKFLOW_NAME("userWorkflowName", "user workflow name", false),
+    USER_WORKFLOW_VERSION("userWorkflowVersion", "user workflow version", false),
+
+    // workflow execution details
+    WORKFLOW_ID("workflowId", "current workflow-id of the instance"),
+    RUN_ID("runId", "current run-id of the instance"),
+    STATUS("status", "status of the user workflow isnstance"),
+    WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex:oozie", false),
+    USER_SUBFLOW_ID("subflowId", "external id of user workflow", false),
+
+    WF_START_TIME("workflowStartTime", "workflow start time", false),
+    WF_END_TIME("workflowEndTime", "workflow end time", false),
+    WF_DURATION("workflowDuration", "workflow duration", false),
+
+    // what inputs
+    INPUT_FEED_NAMES("falconInputFeeds", "name of the feeds which are used as inputs", false),
+    INPUT_FEED_PATHS("falconInputPaths", "comma separated input feed instance paths", false),
+
+    // what outputs
+    FEED_NAMES("feedNames", "name of the feeds which are generated/replicated/deleted"),
+    FEED_INSTANCE_PATHS("feedInstancePaths", "comma separated feed instance paths"),
+
+    // broker related parameters
+    TOPIC_NAME("topicName", "name of the topic to be used to send JMS message", false),
+    BRKR_IMPL_CLASS("brokerImplClass", "falcon message broker Implementation class"),
+    BRKR_URL("brokerUrl", "falcon message broker url"),
+    USER_BRKR_IMPL_CLASS("userBrokerImplClass", "user broker Impl class", false),
+    USER_BRKR_URL("userBrokerUrl", "user broker url", false),
+    BRKR_TTL("brokerTTL", "time to live for broker message in sec", false),
+
+    // state maintained
+    LOG_FILE("logFile", "log file path where feeds to be deleted are recorded"),
+    // execution context data recorded
+    LOG_DIR("logDir", "log dir where lineage can be recorded"),
+
+    CONTEXT_FILE("contextFile", "wf execution context file path where wf properties are recorded",
false),
+    CONTEXT_TYPE("contextType", "wf execution context type, pre or post processing", false);
+
+
+    private final String name;
+    private final String description;
+    private final boolean isRequired;
+
+    WorkflowExecutionArgs(String name, String description) {
+        this(name, description, true);
+    }
+
+    WorkflowExecutionArgs(String name, String description, boolean isRequired) {
+        this.name = name;
+        this.description = description;
+        this.isRequired = isRequired;
+    }
+
+    public Option getOption() {
+        return new Option(this.name, true, this.description);
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public boolean isRequired() {
+        return isRequired;
+    }
+
+    public String getOptionValue(CommandLine cmd) {
+        return cmd.getOptionValue(this.name);
+    }
+
+    @Override
+    public String toString() {
+        return getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b42c53ba/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
new file mode 100644
index 0000000..60e6c58
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.workflow;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Captures the workflow execution context.
+ */
+public class WorkflowExecutionContext {
+
+    private static final Logger LOG = LoggerFactory.getLogger(WorkflowExecutionContext.class);
+
+    public static final String PROCESS_INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm"; // nominal time
+
+    public static final String OUTPUT_FEED_SEPARATOR = ",";
+    public static final String INPUT_FEED_SEPARATOR = "#";
+
+
+    /**
+     * Workflow execution status.
+     */
+    public enum Status {SUCCEEDED, FAILED}
+
+    /**
+     * Workflow execution type.
+     */
+    public enum Type {PRE_PROCESSING, POST_PROCESSING}
+
+    /**
+     * Entity operations supported.
+     */
+    public enum EntityOperations {
+        GENERATE, DELETE, ARCHIVE, REPLICATE, CHMOD
+    }
+
+    private final Map<WorkflowExecutionArgs, String> context;
+    private final long creationTime;
+
+    protected WorkflowExecutionContext(Map<WorkflowExecutionArgs, String> context)
{
+        this.context = context;
+        creationTime = System.currentTimeMillis();
+    }
+
+    public String getValue(WorkflowExecutionArgs arg) {
+        return context.get(arg);
+    }
+
+    public String getValue(WorkflowExecutionArgs arg, String defaultValue) {
+        return context.containsKey(arg) ? context.get(arg) : defaultValue;
+    }
+
+    public boolean containsKey(WorkflowExecutionArgs arg) {
+        return context.containsKey(arg);
+    }
+
+    public Set<Map.Entry<WorkflowExecutionArgs, String>> entrySet() {
+        return context.entrySet();
+    }
+
+    // helper methods
+    public boolean hasWorkflowSucceeded() {
+        return Status.SUCCEEDED.name().equals(getValue(WorkflowExecutionArgs.STATUS));
+    }
+
+    public boolean hasWorkflowFailed() {
+        return Status.FAILED.name().equals(getValue(WorkflowExecutionArgs.STATUS));
+    }
+
+    public String getContextFile() {
+        return getValue(WorkflowExecutionArgs.CONTEXT_FILE);
+    }
+
+    public String getLogDir() {
+        return getValue(WorkflowExecutionArgs.LOG_DIR);
+    }
+
+    public String getLogFile() {
+        return getValue(WorkflowExecutionArgs.LOG_FILE);
+    }
+
+    String getNominalTime() {
+        return getValue(WorkflowExecutionArgs.NOMINAL_TIME);
+    }
+
+    /**
+     * Returns nominal time as a ISO8601 formatted string.
+     * @return a ISO8601 formatted string
+     */
+    public String getNominalTimeAsISO8601() {
+        return SchemaHelper.formatDateUTCToISO8601(getNominalTime(), PROCESS_INSTANCE_FORMAT);
+    }
+
+    public String getTimestamp() {
+        return getValue(WorkflowExecutionArgs.TIMESTAMP);
+    }
+
+    /**
+     * Returns timestamp as a ISO8601 formatted string.
+     * @return a ISO8601 formatted string
+     */
+    public String getTimeStampAsISO8601() {
+        return SchemaHelper.formatDateUTCToISO8601(getTimestamp(), PROCESS_INSTANCE_FORMAT);
+    }
+
+    public String getClusterName() {
+        return getValue(WorkflowExecutionArgs.CLUSTER_NAME);
+    }
+
+    public String getEntityName() {
+        return getValue(WorkflowExecutionArgs.ENTITY_NAME);
+    }
+
+    public String getEntityType() {
+        return getValue(WorkflowExecutionArgs.ENTITY_TYPE);
+    }
+
+    public EntityOperations getOperation() {
+        return EntityOperations.valueOf(getValue(WorkflowExecutionArgs.OPERATION));
+    }
+
+    public String getOutputFeedNames() {
+        return getValue(WorkflowExecutionArgs.FEED_NAMES);
+    }
+
+    public String[] getOutputFeedNamesList() {
+        return getOutputFeedNames().split(OUTPUT_FEED_SEPARATOR);
+    }
+
+    public String getOutputFeedInstancePaths() {
+        return getValue(WorkflowExecutionArgs.FEED_INSTANCE_PATHS);
+    }
+
+    public String[] getOutputFeedInstancePathsList() {
+        return getOutputFeedInstancePaths().split(OUTPUT_FEED_SEPARATOR);
+    }
+
+    public String getInputFeedNames() {
+        return getValue(WorkflowExecutionArgs.INPUT_FEED_NAMES);
+    }
+
+    public String[] getInputFeedNamesList() {
+        return getInputFeedNames().split(INPUT_FEED_SEPARATOR);
+    }
+
+    public String getInputFeedInstancePaths() {
+        return getValue(WorkflowExecutionArgs.INPUT_FEED_PATHS);
+    }
+
+    public String[] getInputFeedInstancePathsList() {
+        return getInputFeedInstancePaths().split(INPUT_FEED_SEPARATOR);
+    }
+
+    public String getWorkflowEngineUrl() {
+        return getValue(WorkflowExecutionArgs.WF_ENGINE_URL);
+    }
+
+    public String getUserWorkflowEngine() {
+        return getValue(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE);
+    }
+
+    public String getUserWorkflowVersion() {
+        return getValue(WorkflowExecutionArgs.USER_WORKFLOW_VERSION);
+    }
+
+    public String getWorkflowId() {
+        return getValue(WorkflowExecutionArgs.WORKFLOW_ID);
+    }
+
+    public String getUserSubflowId() {
+        return getValue(WorkflowExecutionArgs.USER_SUBFLOW_ID);
+    }
+
+    public int getWorkflowRunId() {
+        return Integer.parseInt(getValue(WorkflowExecutionArgs.RUN_ID));
+    }
+
+    public String getWorkflowRunIdString() {
+        return String.valueOf(Integer.parseInt(getValue(WorkflowExecutionArgs.RUN_ID)));
+    }
+
+    public String getWorkflowUser() {
+        return getValue(WorkflowExecutionArgs.WORKFLOW_USER);
+    }
+
+    public long getExecutionCompletionTime() {
+        return creationTime;
+    }
+
+    /**
+     * this method is invoked from with in the workflow.
+     *
+     * @throws java.io.IOException
+     * @throws org.apache.falcon.FalconException
+     */
+    public void serialize() throws IOException, FalconException {
+        serialize(getContextFile());
+    }
+
+    /**
+     * this method is invoked from with in the workflow.
+     *
+     * @param contextFile file to serialize the workflow execution metadata
+     * @throws org.apache.falcon.FalconException
+     */
+    public void serialize(String contextFile) throws FalconException {
+        LOG.info("Saving context to: [{}]", contextFile);
+        OutputStream out = null;
+        Path file = new Path(contextFile);
+        try {
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(file.toUri());
+            out = fs.create(file);
+
+            // making sure falcon can read this file
+            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+            fs.setPermission(file, permission);
+
+            out.write(JSONValue.toJSONString(context).getBytes());
+        } catch (IOException e) {
+            throw new FalconException("Error serializing context to: " + contextFile,  e);
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException ignore) {
+                    // ignore
+                }
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public static WorkflowExecutionContext deSerialize(String contextFile) throws FalconException
{
+        try {
+            Path lineageDataPath = new Path(contextFile); // file has 777 permissions
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(lineageDataPath.toUri());
+
+            BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(lineageDataPath)));
+            return new WorkflowExecutionContext((Map<WorkflowExecutionArgs, String>)
JSONValue.parse(in));
+        } catch (IOException e) {
+            throw new FalconException("Error opening context file: " + contextFile, e);
+        }
+    }
+
+    public static String getFilePath(String logDir, String entityName) {
+        // LOG_DIR is sufficiently unique
+        return new Path(logDir, entityName + "-wf-post-exec-context.json").toString();
+    }
+
+    public static WorkflowExecutionContext create(String[] args, Type type) throws FalconException
{
+        try {
+            Map<WorkflowExecutionArgs, String> wfProperties = new HashMap<WorkflowExecutionArgs,
String>();
+            wfProperties.put(WorkflowExecutionArgs.CONTEXT_TYPE, type.name());
+
+            CommandLine cmd = getCommand(args);
+            for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
+                String optionValue = arg.getOptionValue(cmd);
+                if (StringUtils.isNotEmpty(optionValue)) {
+                    wfProperties.put(arg, optionValue);
+                }
+            }
+
+            wfProperties.put(WorkflowExecutionArgs.CONTEXT_FILE,
+                    getFilePath(wfProperties.get(WorkflowExecutionArgs.LOG_DIR),
+                            wfProperties.get(WorkflowExecutionArgs.ENTITY_NAME)));
+            return new WorkflowExecutionContext(wfProperties);
+        } catch (ParseException e) {
+            throw new FalconException("Error parsing wf args", e);
+        }
+    }
+
+    private static CommandLine getCommand(String[] arguments) throws ParseException {
+        Options options = new Options();
+
+        for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
+            addOption(options, arg, arg.isRequired());
+        }
+
+        return new GnuParser().parse(options, arguments, false);
+    }
+
+    private static void addOption(Options options, WorkflowExecutionArgs arg, boolean isRequired)
{
+        Option option = arg.getOption();
+        option.setRequired(isRequired);
+        options.addOption(option);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b42c53ba/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
new file mode 100644
index 0000000..2d3a477
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.workflow;
+
+import org.apache.falcon.FalconException;
+
/**
 * A listener interface for workflow execution.
 *
 * Implementations are registered with the job end notification service and
 * receive the execution context when a workflow instance finishes.
 */
public interface WorkflowExecutionListener {

    /**
     * Invoked when a workflow instance completes successfully.
     *
     * @param context execution context of the completed workflow instance
     * @throws FalconException if the listener cannot process the notification
     */
    void onSuccess(WorkflowExecutionContext context) throws FalconException;

    /**
     * Invoked when a workflow instance fails.
     *
     * @param context execution context of the failed workflow instance
     * @throws FalconException if the listener cannot process the notification
     */
    void onFailure(WorkflowExecutionContext context) throws FalconException;
}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b42c53ba/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
new file mode 100644
index 0000000..0602214
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.workflow;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.service.FalconService;
+import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
+
+import java.util.Date;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * A workflow job end notification service.
+ */
+public class WorkflowJobEndNotificationService implements FalconService {
+
+    public static final String NAME = WorkflowJobEndNotificationService.class.getSimpleName();
+
+    private Set<WorkflowExecutionListener> listeners = new LinkedHashSet<WorkflowExecutionListener>();
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    @Override
+    public void init() throws FalconException {
+        // do nothing
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+        listeners.clear();
+    }
+
+    public void registerListener(WorkflowExecutionListener listener) {
+        listeners.add(listener);
+    }
+
+    public void unregisterListener(WorkflowExecutionListener listener) {
+        listeners.remove(listener);
+    }
+
+    public void notifyFailure(WorkflowExecutionContext context) throws FalconException {
+        for (WorkflowExecutionListener listener : listeners) {
+            listener.onFailure(context);
+        }
+
+        instrumentAlert(context);
+    }
+
+    public void notifySuccess(WorkflowExecutionContext context) throws FalconException {
+        for (WorkflowExecutionListener listener : listeners) {
+            listener.onSuccess(context);
+        }
+
+        instrumentAlert(context);
+    }
+
+    private void instrumentAlert(WorkflowExecutionContext context) throws FalconException
{
+        String clusterName = context.getClusterName();
+        String entityName = context.getEntityName();
+        String entityType = context.getEntityType();
+        String operation = context.getOperation().name();
+        String workflowId = context.getWorkflowId();
+        String workflowUser = context.getWorkflowUser();
+        String nominalTime = context.getNominalTimeAsISO8601();
+        String runId = String.valueOf(context.getWorkflowRunId());
+
+        CurrentUser.authenticate(context.getWorkflowUser());
+        AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
+        InstancesResult result = wfEngine.getJobDetails(clusterName, workflowId);
+        Date startTime = result.getInstances()[0].startTime;
+        Date endTime = result.getInstances()[0].endTime;
+        Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
+
+        try {
+            if (context.hasWorkflowFailed()) {
+                GenericAlert.instrumentFailedInstance(clusterName, entityType,
+                        entityName, nominalTime, workflowId, workflowUser, runId, operation,
+                        SchemaHelper.formatDateUTC(startTime), "", "", duration);
+            } else {
+                GenericAlert.instrumentSucceededInstance(clusterName, entityType,
+                        entityName, nominalTime, workflowId, workflowUser, runId, operation,
+                        SchemaHelper.formatDateUTC(startTime), duration);
+            }
+        } catch (Exception e) {
+            throw new FalconException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b42c53ba/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 0d49b4b..d215a62 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -29,6 +29,7 @@
 *.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService
 
 *.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
+                        org.apache.falcon.workflow.WorkflowJobEndNotificationService,\
                         org.apache.falcon.service.ProcessSubscriberService,\
                         org.apache.falcon.entity.store.ConfigurationStore,\
                         org.apache.falcon.rerun.service.RetryService,\

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b42c53ba/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
new file mode 100644
index 0000000..93d8831
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.workflow;
+
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.process.EngineType;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * A test for WorkflowExecutionContext.
+ */
+public class WorkflowExecutionContextTest {
+
+    private static final String FALCON_USER = "falcon-user";
+    private static final String LOGS_DIR = "target/log";
+    private static final String NOMINAL_TIME = "2014-01-01-01-00";
+    private static final String OPERATION = "GENERATE";
+
+    private static final String CLUSTER_NAME = "primary-cluster";
+    private static final String ENTITY_NAME = "sample-process";
+    private static final String WORKFLOW_NAME = "imp-click-join-workflow";
+    private static final String WORKFLOW_VERSION = "1.0.9";
+
+    private static final String INPUT_FEED_NAMES = "impression-feed#clicks-feed";
+    private static final String INPUT_INSTANCE_PATHS =
+            "jail://global:00/falcon/impression-feed/2014/01/01,jail://global:00/falcon/impression-feed/2014/01/02"
+                    + "#jail://global:00/falcon/clicks-feed/2014-01-01";
+
+    private static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
+    private static final String OUTPUT_INSTANCE_PATHS =
+            "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
+
+    private static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory";
+
+    private static final String ISO8601_TIME = SchemaHelper.formatDateUTCToISO8601(
+            NOMINAL_TIME, WorkflowExecutionContext.PROCESS_INSTANCE_FORMAT);
+
+    private WorkflowExecutionContext context;
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        context = WorkflowExecutionContext.create(getTestMessageArgs(),
+                WorkflowExecutionContext.Type.POST_PROCESSING);
+    }
+
+    @Test
+    public void testGetValue() throws Exception {
+        Assert.assertEquals(context.getValue(WorkflowExecutionArgs.ENTITY_NAME), ENTITY_NAME);
+    }
+
+    @Test
+    public void testGetValueWithDefault() throws Exception {
+        Assert.assertEquals(context.getValue(WorkflowExecutionArgs.TOPIC_NAME, "ABSENT"),
"ABSENT");
+    }
+
+    @Test
+    public void testContainsKey() throws Exception {
+        Assert.assertTrue(context.containsKey(WorkflowExecutionArgs.ENTITY_NAME));
+        Assert.assertFalse(context.containsKey(WorkflowExecutionArgs.TOPIC_NAME));
+    }
+
+    @Test
+    public void testEntrySet() throws Exception {
+        Assert.assertNotNull(context.entrySet());
+    }
+
+    @Test
+    public void testHasWorkflowSucceeded() throws Exception {
+        Assert.assertTrue(context.hasWorkflowSucceeded());
+    }
+
+    @Test
+    public void testHasWorkflowFailed() throws Exception {
+        Assert.assertFalse(context.hasWorkflowFailed());
+    }
+
+    @Test
+    public void testGetContextFile() throws Exception {
+        Assert.assertEquals(context.getContextFile(),
+                WorkflowExecutionContext.getFilePath(context.getLogDir(), context.getEntityName()));
+    }
+
+    @Test
+    public void testGetLogDir() throws Exception {
+        Assert.assertEquals(context.getLogDir(), LOGS_DIR);
+    }
+
+    @Test
+    public void testGetLogFile() throws Exception {
+        Assert.assertEquals(context.getLogFile(), LOGS_DIR + "/log.txt");
+    }
+
+    @Test
+    public void testGetNominalTime() throws Exception {
+        Assert.assertEquals(context.getNominalTime(), NOMINAL_TIME);
+    }
+
+    @Test
+    public void testGetNominalTimeAsISO8601() throws Exception {
+        Assert.assertEquals(context.getNominalTimeAsISO8601(), ISO8601_TIME);
+    }
+
+    @Test
+    public void testGetTimestamp() throws Exception {
+        Assert.assertEquals(context.getTimestamp(), NOMINAL_TIME);
+    }
+
+    @Test
+    public void testGetTimeStampAsISO8601() throws Exception {
+        Assert.assertEquals(context.getTimeStampAsISO8601(), ISO8601_TIME);
+    }
+
+    @Test
+    public void testGetClusterName() throws Exception {
+        Assert.assertEquals(context.getClusterName(), CLUSTER_NAME);
+    }
+
+    @Test
+    public void testGetEntityName() throws Exception {
+        Assert.assertEquals(context.getEntityName(), ENTITY_NAME);
+    }
+
+    @Test
+    public void testGetEntityType() throws Exception {
+        Assert.assertEquals(context.getEntityType(), "process");
+    }
+
+    @Test
+    public void testGetOperation() throws Exception {
+        Assert.assertEquals(context.getOperation().name(), OPERATION);
+    }
+
+    @Test
+    public void testGetOutputFeedNames() throws Exception {
+        Assert.assertEquals(context.getOutputFeedNames(), OUTPUT_FEED_NAMES);
+    }
+
+    @Test
+    public void testGetOutputFeedNamesList() throws Exception {
+        Assert.assertEquals(context.getOutputFeedNamesList(),
+                OUTPUT_FEED_NAMES.split(WorkflowExecutionContext.OUTPUT_FEED_SEPARATOR));
+    }
+
+    @Test
+    public void testGetOutputFeedInstancePaths() throws Exception {
+        Assert.assertEquals(context.getOutputFeedInstancePaths(), OUTPUT_INSTANCE_PATHS);
+    }
+
+    @Test
+    public void testGetOutputFeedInstancePathsList() throws Exception {
+        Assert.assertEquals(context.getOutputFeedInstancePathsList(),
+                OUTPUT_INSTANCE_PATHS.split(","));
+    }
+
+    @Test
+    public void testGetInputFeedNames() throws Exception {
+        Assert.assertEquals(context.getOutputFeedNames(), OUTPUT_FEED_NAMES);
+    }
+
+    @Test
+    public void testGetInputFeedNamesList() throws Exception {
+        Assert.assertEquals(context.getInputFeedNamesList(),
+                INPUT_FEED_NAMES.split(WorkflowExecutionContext.INPUT_FEED_SEPARATOR));
+    }
+
+    @Test
+    public void testGetInputFeedInstancePaths() throws Exception {
+        Assert.assertEquals(context.getInputFeedInstancePaths(), INPUT_INSTANCE_PATHS);
+    }
+
+    @Test
+    public void testGetInputFeedInstancePathsList() throws Exception {
+        Assert.assertEquals(context.getInputFeedInstancePathsList(),
+                INPUT_INSTANCE_PATHS.split("#"));
+    }
+
+    @Test
+    public void testGetWorkflowEngineUrl() throws Exception {
+        Assert.assertEquals(context.getWorkflowEngineUrl(), "http://localhost:11000/oozie");
+    }
+
+    @Test
+    public void testGetUserWorkflowEngine() throws Exception {
+        Assert.assertEquals(context.getUserWorkflowEngine(), EngineType.PIG.name());
+    }
+
+    @Test
+    public void testGetUserWorkflowVersion() throws Exception {
+        Assert.assertEquals(context.getUserWorkflowVersion(), WORKFLOW_VERSION);
+    }
+
+    @Test
+    public void testGetWorkflowId() throws Exception {
+        Assert.assertEquals(context.getWorkflowId(), "workflow-01-00");
+    }
+
+    @Test
+    public void testGetUserSubflowId() throws Exception {
+        Assert.assertEquals(context.getUserSubflowId(), "userflow@wf-id");
+    }
+
+    @Test
+    public void testGetWorkflowRunId() throws Exception {
+        Assert.assertEquals(context.getWorkflowRunId(), 1);
+    }
+
+    @Test
+    public void testGetWorkflowRunIdString() throws Exception {
+        Assert.assertEquals(context.getWorkflowRunIdString(), "1");
+    }
+
+    @Test
+    public void testGetWorkflowUser() throws Exception {
+        Assert.assertEquals(context.getWorkflowUser(), FALCON_USER);
+    }
+
+    @Test
+    public void testGetExecutionCompletionTime() throws Exception {
+        Assert.assertNotNull(context.getExecutionCompletionTime());
+    }
+
+    @Test
+    public void testSerializeDeserialize() throws Exception {
+        String contextFile = context.getContextFile();
+        context.serialize();
+        WorkflowExecutionContext newContext = WorkflowExecutionContext.deSerialize(contextFile);
+        Assert.assertNotNull(newContext);
+        Assert.assertEquals(newContext.entrySet().size(), context.entrySet().size());
+    }
+
+    @Test
+    public void testSerializeDeserializeWithFile() throws Exception {
+        String contextFile = "/tmp/blah.json";
+        context.serialize(contextFile);
+        WorkflowExecutionContext newContext = WorkflowExecutionContext.deSerialize(contextFile);
+        Assert.assertNotNull(newContext);
+        Assert.assertEquals(newContext.entrySet().size(), context.entrySet().size());
+    }
+
+    @Test
+    public void testGetFilePath() throws Exception {
+        Assert.assertEquals(WorkflowExecutionContext.getFilePath(LOGS_DIR, ENTITY_NAME),
+                LOGS_DIR + "/" + ENTITY_NAME + "-wf-post-exec-context.json");
+    }
+
+    private static String[] getTestMessageArgs() {
+        return new String[]{
+            "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), CLUSTER_NAME,
+            "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), "process",
+            "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), ENTITY_NAME,
+            "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME,
+            "-" + WorkflowExecutionArgs.OPERATION.getName(), OPERATION,
+
+            "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES,
+            "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS,
+
+            "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
+            "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
+
+            "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
+            "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,
+            "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
+            "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
+            "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), NOMINAL_TIME,
+
+            "-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie",
+            "-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id",
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), WORKFLOW_NAME,
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), WORKFLOW_VERSION,
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), EngineType.PIG.name(),
+
+            "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER,
+            "-" + WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
+            "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER,
+            "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
+            "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000",
+
+            "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
+            "-" + WorkflowExecutionArgs.LOG_FILE.getName(), LOGS_DIR + "/log.txt",
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b42c53ba/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 2c0bbbc..563c4b9 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -33,6 +33,7 @@
 *.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService
 
 *.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
+                        org.apache.falcon.workflow.WorkflowJobEndNotificationService,\
                         org.apache.falcon.service.ProcessSubscriberService,\
                         org.apache.falcon.entity.store.ConfigurationStore,\
                         org.apache.falcon.rerun.service.RetryService,\


Mime
View raw message