falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkat...@apache.org
Subject [3/8] git commit: FALCON-485 - Simplify JMS Message Sender/Consumer and use Workflow Context. Contributed by Venkatesh Seetharam
Date Fri, 08 Aug 2014 20:31:36 GMT
FALCON-485 - Simplify JMS Message Sender/Consumer and use Workflow Context. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/23762e55
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/23762e55
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/23762e55

Branch: refs/heads/master
Commit: 23762e55b860fa1ecf2433de0cda654e2a3f0fc1
Parents: b42c53b
Author: Venkatesh Seetharam <venkatesh@apache.org>
Authored: Fri Aug 8 12:57:03 2014 -0700
Committer: Venkatesh Seetharam <venkatesh@apache.org>
Committed: Fri Aug 8 12:57:03 2014 -0700

----------------------------------------------------------------------
 .../org/apache/falcon/entity/FeedHelper.java    |   2 +-
 .../workflow/WorkflowExecutionContext.java      |  40 +-
 .../falcon/messaging/EntityInstanceMessage.java | 242 ------------
 .../messaging/EntityInstanceMessageCreator.java |  56 ---
 .../falcon/messaging/JMSMessageConsumer.java    | 181 +++++++++
 .../falcon/messaging/JMSMessageProducer.java    | 376 +++++++++++++++++++
 .../falcon/messaging/MessageProducer.java       | 180 ---------
 .../messaging/FalconTopicProducerTest.java      | 215 -----------
 .../falcon/messaging/FeedProducerTest.java      |  88 +++--
 .../messaging/JMSMessageConsumerTest.java       | 156 ++++++++
 .../messaging/JMSMessageProducerTest.java       | 218 +++++++++++
 .../falcon/messaging/ProcessProducerTest.java   | 100 +++--
 .../falcon/util/ResourcesReflectionUtil.java    |   2 +-
 .../falcon/oozie/OozieCoordinatorBuilder.java   |  34 +-
 .../feed/FeedReplicationCoordinatorBuilder.java |  12 +-
 .../feed/FeedRetentionCoordinatorBuilder.java   |  11 +-
 .../ProcessExecutionCoordinatorBuilder.java     |  10 +-
 .../feed/OozieFeedWorkflowBuilderTest.java      |   2 +-
 .../OozieProcessWorkflowBuilderTest.java        |  14 +-
 .../src/test/resources/oozie/xmls/workflow.xml  |   2 +-
 .../falcon/service/FalconTopicSubscriber.java   | 200 ----------
 .../service/ProcessSubscriberService.java       |  13 +-
 .../service/FalconTopicSubscriberTest.java      | 124 ------
 23 files changed, 1131 insertions(+), 1147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 8c61ac2..d09a12f 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -284,7 +284,7 @@ public final class FeedHelper {
         try {
             version = BuildProperties.get().getProperty("build.version");
         } catch (Exception e) {  // unfortunate that this is only available in prism/webapp
-            version = "0.5";
+            version = "0.6";
         }
         props.put("userWorkflowVersion", version);
         return props;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 60e6c58..637cc3e 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -73,6 +73,24 @@ public class WorkflowExecutionContext {
         GENERATE, DELETE, ARCHIVE, REPLICATE, CHMOD
     }
 
+    public static final WorkflowExecutionArgs[] USER_MESSAGE_ARGS = {
+        WorkflowExecutionArgs.CLUSTER_NAME,
+        WorkflowExecutionArgs.ENTITY_NAME,
+        WorkflowExecutionArgs.ENTITY_TYPE,
+        WorkflowExecutionArgs.NOMINAL_TIME,
+        WorkflowExecutionArgs.OPERATION,
+
+        WorkflowExecutionArgs.FEED_NAMES,
+        WorkflowExecutionArgs.FEED_INSTANCE_PATHS,
+
+        WorkflowExecutionArgs.WORKFLOW_ID,
+        WorkflowExecutionArgs.WORKFLOW_USER,
+        WorkflowExecutionArgs.RUN_ID,
+        WorkflowExecutionArgs.STATUS,
+        WorkflowExecutionArgs.TIMESTAMP,
+        WorkflowExecutionArgs.LOG_FILE,
+    };
+
     private final Map<WorkflowExecutionArgs, String> context;
     private final long creationTime;
 
@@ -287,10 +305,9 @@ public class WorkflowExecutionContext {
     }
 
     public static WorkflowExecutionContext create(String[] args, Type type) throws FalconException {
-        try {
-            Map<WorkflowExecutionArgs, String> wfProperties = new HashMap<WorkflowExecutionArgs, String>();
-            wfProperties.put(WorkflowExecutionArgs.CONTEXT_TYPE, type.name());
+        Map<WorkflowExecutionArgs, String> wfProperties = new HashMap<WorkflowExecutionArgs, String>();
 
+        try {
             CommandLine cmd = getCommand(args);
             for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
                 String optionValue = arg.getOptionValue(cmd);
@@ -298,14 +315,16 @@ public class WorkflowExecutionContext {
                     wfProperties.put(arg, optionValue);
                 }
             }
-
-            wfProperties.put(WorkflowExecutionArgs.CONTEXT_FILE,
-                    getFilePath(wfProperties.get(WorkflowExecutionArgs.LOG_DIR),
-                            wfProperties.get(WorkflowExecutionArgs.ENTITY_NAME)));
-            return new WorkflowExecutionContext(wfProperties);
         } catch (ParseException e) {
             throw new FalconException("Error parsing wf args", e);
         }
+
+        wfProperties.put(WorkflowExecutionArgs.CONTEXT_TYPE, type.name());
+        wfProperties.put(WorkflowExecutionArgs.CONTEXT_FILE,
+                getFilePath(wfProperties.get(WorkflowExecutionArgs.LOG_DIR),
+                        wfProperties.get(WorkflowExecutionArgs.ENTITY_NAME)));
+
+        return new WorkflowExecutionContext(wfProperties);
     }
 
     private static CommandLine getCommand(String[] arguments) throws ParseException {
@@ -323,4 +342,9 @@ public class WorkflowExecutionContext {
         option.setRequired(isRequired);
         options.addOption(option);
     }
+
+    public static WorkflowExecutionContext create(Map<WorkflowExecutionArgs, String> wfProperties) {
+        wfProperties.put(WorkflowExecutionArgs.CONTEXT_TYPE, Type.POST_PROCESSING.name());
+        return new WorkflowExecutionContext(wfProperties);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
deleted file mode 100644
index 679e9ea..0000000
--- a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.messaging;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-/**
- * Value Object which is stored in JMS Topic as MapMessage.
- */
-public class EntityInstanceMessage {
-
-    private final Map<ARG, String> keyValueMap = new LinkedHashMap<ARG, String>();
-    private static final Logger LOG = LoggerFactory.getLogger(EntityInstanceMessage.class);
-    private static final String FALCON_ENTITY_TOPIC_NAME = "FALCON.ENTITY.TOPIC";
-
-    /**
-     * Feed Entity operations supported.
-     */
-    public enum EntityOps {
-        GENERATE, DELETE, ARCHIVE, REPLICATE, CHMOD
-    }
-
-    /**
-     * properties available in feed entity operation workflow.
-     */
-    public enum ARG {
-        entityName("entityName"),
-        feedNames("feedNames"),
-        feedInstancePaths("feedInstancePaths"),
-        workflowId("workflowId"),
-        runId("runId"),
-        nominalTime("nominalTime"),
-        timeStamp("timeStamp"),
-        brokerUrl("broker.url"),
-        brokerImplClass("broker.impl.class"),
-        entityType("entityType"),
-        operation("operation"),
-        logFile("logFile"),
-        topicName("topicName"),
-        status("status"),
-        brokerTTL("broker.ttlInMins"),
-        cluster("cluster"),
-        workflowUser("workflowUser"),
-        logDir("logDir");
-
-        private String propName;
-
-        private ARG(String propName) {
-            this.propName = propName;
-        }
-
-        /**
-         * @return Name of the Argument used in the parent workflow to pass
-         *         arguments to MessageProducer Main class.
-         */
-        public String getArgName() {
-            return this.name();
-        }
-
-        /**
-         * @return Name of the property used in the startup.properties,
-         *         coordinator and parent workflow.
-         */
-        public String getPropName() {
-            return this.propName;
-        }
-    }
-
-    public Map<ARG, String> getKeyValueMap() {
-        return this.keyValueMap;
-    }
-
-    public String getTopicName() {
-        return this.keyValueMap.get(ARG.topicName);
-    }
-
-    public String getFeedName() {
-        return this.keyValueMap.get(ARG.feedNames);
-    }
-
-    public void setFeedName(String feedName) {
-        this.keyValueMap.remove(ARG.feedNames);
-        this.keyValueMap.put(ARG.feedNames, feedName);
-    }
-
-    public String getFeedInstancePath() {
-        return this.keyValueMap.get(ARG.feedInstancePaths);
-    }
-
-    public void setFeedInstancePath(String feedInstancePath) {
-        this.keyValueMap.remove(ARG.feedInstancePaths);
-        this.keyValueMap.put(ARG.feedInstancePaths, feedInstancePath);
-    }
-
-    public String getEntityType() {
-        return this.keyValueMap.get(ARG.entityType);
-    }
-
-    public String getBrokerTTL() {
-        return this.keyValueMap.get(ARG.brokerTTL);
-    }
-
-    public void convertDateFormat() throws ParseException {
-        String date = this.keyValueMap.remove(ARG.nominalTime);
-        this.keyValueMap.put(ARG.nominalTime, getFalconDate(date));
-        date = this.keyValueMap.remove(ARG.timeStamp);
-        this.keyValueMap.put(ARG.timeStamp, getFalconDate(date));
-    }
-
-    public static EntityInstanceMessage[] getMessages(CommandLine cmd)
-        throws ParseException {
-
-        String[] feedNames = getFeedNames(cmd);
-        if (feedNames == null) {
-            return null;
-        }
-
-        String[] feedPaths;
-        try {
-            feedPaths = getFeedPaths(cmd);
-        } catch (IOException e) {
-            LOG.error("Error getting instance paths", e);
-            throw new RuntimeException(e);
-        }
-
-        EntityInstanceMessage[] messages = new EntityInstanceMessage[feedPaths.length];
-        for (int i = 0; i < feedPaths.length; i++) {
-            EntityInstanceMessage message = new EntityInstanceMessage();
-            setDefaultValues(cmd, message);
-            // override default values
-            if (message.getEntityType().equalsIgnoreCase("PROCESS")) {
-                message.setFeedName(feedNames[i]);
-            } else {
-                message.setFeedName(message.getFeedName());
-            }
-            message.setFeedInstancePath(feedPaths[i]);
-            message.convertDateFormat();
-            messages[i] = message;
-        }
-
-        return messages;
-    }
-
-    private static void setDefaultValues(CommandLine cmd,
-                                         EntityInstanceMessage message) {
-        for (ARG arg : ARG.values()) {
-            message.keyValueMap.put(arg, cmd.getOptionValue(arg.name()));
-        }
-    }
-
-    private static String[] getFeedNames(CommandLine cmd) {
-        String feedNameStr = cmd.getOptionValue(ARG.feedNames.getArgName());
-        String topicName = cmd.getOptionValue(ARG.topicName.getArgName());
-        if (topicName.equals(FALCON_ENTITY_TOPIC_NAME)) {
-            return new String[]{feedNameStr};
-        }
-        if (feedNameStr.equals("null")) {
-            return null;
-        }
-
-        return feedNameStr.split(",");
-    }
-
-    private static String[] getFeedPaths(CommandLine cmd) throws IOException {
-        String topicName = cmd.getOptionValue(ARG.topicName.getArgName());
-        String operation = cmd.getOptionValue(ARG.operation.getArgName());
-
-        if (topicName.equals(FALCON_ENTITY_TOPIC_NAME)) {
-            LOG.debug("Returning instance paths for Falcon Topic: {}",
-                    cmd.getOptionValue(ARG.feedInstancePaths.getArgName()));
-            return new String[]{cmd.getOptionValue(ARG.feedInstancePaths.getArgName()), };
-        }
-
-        if (operation.equals(EntityOps.GENERATE.name()) || operation.equals(EntityOps.REPLICATE.name())) {
-            LOG.debug("Returning instance paths: {}", cmd.getOptionValue(ARG.feedInstancePaths.getArgName()));
-            return cmd.getOptionValue(ARG.feedInstancePaths.getArgName()).split(",");
-        }
-        //else case of feed retention
-        Path logFile = new Path(cmd.getOptionValue(ARG.logFile.getArgName()));
-        FileSystem fs = FileSystem.get(logFile.toUri(), new Configuration());
-
-        if (!fs.exists(logFile)) {
-            //Evictor Failed without deleting a single path
-            return new String[0];
-        }
-
-        ByteArrayOutputStream writer = new ByteArrayOutputStream();
-        InputStream instance = fs.open(logFile);
-        IOUtils.copyBytes(instance, writer, 4096, true);
-        String[] instancePaths = writer.toString().split("=");
-        fs.delete(logFile, true);
-        LOG.info("Deleted feed instance paths file: {}", logFile);
-        if (instancePaths.length == 1) {
-            LOG.debug("Returning 0 instance paths for feed ");
-            return new String[0];
-        } else {
-            LOG.debug("Returning instance paths for feed {}", instancePaths[1]);
-            return instancePaths[1].split(",");
-        }
-    }
-
-    public String getFalconDate(String nominalTime) throws ParseException {
-        DateFormat nominalFormat = new SimpleDateFormat(
-                "yyyy'-'MM'-'dd'-'HH'-'mm");
-        Date nominalDate = nominalFormat.parse(nominalTime);
-        DateFormat falconFormat = new SimpleDateFormat(
-                "yyyy'-'MM'-'dd'T'HH':'mm'Z'");
-        return falconFormat.format(nominalDate);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessageCreator.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessageCreator.java b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessageCreator.java
deleted file mode 100644
index c8ea12d..0000000
--- a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessageCreator.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.messaging;
-
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.Session;
-import java.util.Map.Entry;
-
-/**
- * Falcon JMS message creator- creates JMS TextMessage.
- */
-public class EntityInstanceMessageCreator {
-
-    private MapMessage mapMessage;
-
-    private final EntityInstanceMessage instanceMessage;
-
-    public EntityInstanceMessageCreator(EntityInstanceMessage instanceMessage) {
-        this.instanceMessage = instanceMessage;
-    }
-
-    public Message createMessage(Session session) throws JMSException {
-        mapMessage = session.createMapMessage();
-        for (Entry<ARG, String> entry : instanceMessage.getKeyValueMap().entrySet()) {
-            mapMessage.setString(entry.getKey().getArgName(),
-                    instanceMessage.getKeyValueMap().get(entry.getKey()));
-        }
-
-        return mapMessage;
-    }
-
-    @Override
-    public String toString() {
-        return this.mapMessage.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
new file mode 100644
index 0000000..573f5bd
--- /dev/null
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.messaging;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
+import org.apache.log4j.Logger;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Subscribes to the falcon topic for handling retries and alerts.
+ */
+public class JMSMessageConsumer implements MessageListener, ExceptionListener {
+    private static final Logger LOG = Logger.getLogger(JMSMessageConsumer.class);
+
+    private final String implementation;
+    private final String userName;
+    private final String password;
+    private final String url;
+    private final String topicName;
+
+    private Connection connection;
+    private TopicSubscriber subscriber;
+
+    private final WorkflowJobEndNotificationService jobEndNotificationService;
+
+    public JMSMessageConsumer(String implementation, String userName,
+                              String password, String url, String topicName,
+                              WorkflowJobEndNotificationService jobEndNotificationService) {
+        this.implementation = implementation;
+        this.userName = userName;
+        this.password = password;
+        this.url = url;
+        this.topicName = topicName;
+        this.jobEndNotificationService = jobEndNotificationService;
+    }
+
+    public void startSubscriber() throws FalconException {
+        try {
+            connection = createAndGetConnection(implementation, userName, password, url);
+            TopicSession session = (TopicSession) connection.createSession(
+                    false, Session.AUTO_ACKNOWLEDGE);
+            Topic destination = session.createTopic(topicName);
+            subscriber = session.createSubscriber(destination);
+            subscriber.setMessageListener(this);
+            connection.setExceptionListener(this);
+            connection.start();
+        } catch (Exception e) {
+            LOG.error("Error starting subscriber of topic: " + this.toString(), e);
+            throw new FalconException(e);
+        }
+    }
+
+    @Override
+    public void onMessage(Message message) {
+        MapMessage mapMessage = (MapMessage) message;
+
+        try {
+            if (LOG.isDebugEnabled()) {debug(mapMessage); }
+
+            WorkflowExecutionContext context = createContext(mapMessage);
+            if (context.hasWorkflowFailed()) {
+                onFailure(context);
+            } else if (context.hasWorkflowSucceeded()) {
+                onSuccess(context);
+            }
+        } catch (JMSException e) {
+            LOG.info("Error in onMessage for subscriber of topic: " + this.toString(), e);
+        } catch (FalconException e) {
+            LOG.info("Error in onMessage for subscriber of topic: " + this.toString(), e);
+        } catch (Exception e) {
+            LOG.info("Error in onMessage for subscriber of topic: " + this.toString(), e);
+        }
+    }
+
+    private WorkflowExecutionContext createContext(MapMessage mapMessage) throws JMSException {
+        // for backwards compatibility, read all args from message
+        Map<WorkflowExecutionArgs, String> wfProperties = new HashMap<WorkflowExecutionArgs, String>();
+        for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
+            String optionValue = mapMessage.getString(arg.getName());
+            if (StringUtils.isNotEmpty(optionValue)) {
+                wfProperties.put(arg, optionValue);
+            }
+        }
+
+        return WorkflowExecutionContext.create(wfProperties);
+    }
+
+    public void onFailure(WorkflowExecutionContext context) throws FalconException {
+        jobEndNotificationService.notifyFailure(context);
+    }
+
+    public void onSuccess(WorkflowExecutionContext context) throws FalconException {
+        jobEndNotificationService.notifySuccess(context);
+    }
+
+    private void debug(MapMessage mapMessage) throws JMSException {
+        StringBuilder buff = new StringBuilder();
+        buff.append("Received:{");
+        for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
+            buff.append(arg.getName()).append('=')
+                .append(mapMessage.getString(arg.getName())).append(", ");
+        }
+        buff.append("}");
+        LOG.debug(buff);
+    }
+
+    @Override
+    public void onException(JMSException ignore) {
+        LOG.info("Error in onException for subscriber of topic: " + this.toString(), ignore);
+    }
+
+    public void closeSubscriber() throws FalconException {
+        try {
+            LOG.info("Closing subscriber on topic : " + this.topicName);
+            if (subscriber != null) {
+                subscriber.close();
+            }
+            if (connection != null) {
+                connection.close();
+            }
+        } catch (JMSException e) {
+            LOG.error("Error closing subscriber of topic: " + this.toString(), e);
+            throw new FalconException(e);
+        }
+    }
+
+    private static Connection createAndGetConnection(String implementation,
+                                                     String userName, String password, String url)
+        throws JMSException, ClassNotFoundException, InstantiationException,
+            IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+
+        @SuppressWarnings("unchecked")
+        Class<ConnectionFactory> clazz = (Class<ConnectionFactory>)
+                JMSMessageConsumer.class.getClassLoader().loadClass(implementation);
+
+        ConnectionFactory connectionFactory = clazz.getConstructor(
+                String.class, String.class, String.class).newInstance(userName,
+                password, url);
+
+        return connectionFactory.createConnection();
+    }
+
+    @Override
+    public String toString() {
+        return topicName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
new file mode 100644
index 0000000..6c9859c
--- /dev/null
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.messaging;
+
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.Topic;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Message producer used in the workflow to send a message to the queue/topic.
+ */
+public class JMSMessageProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JMSMessageProducer.class);
+
+    /**
+     * Type of message to send. FALCON selects the falcon entity topic and the
+     * falcon broker settings; USER selects the per-entity topic and the user
+     * broker settings (see getTopicName(), getBrokerImplClass(), getBrokerUrl()).
+     */
+    public enum MessageType {FALCON, USER}
+
+    // Prefix of the default topic name built in getTopicName().
+    public static final String FALCON_TOPIC_PREFIX = "FALCON.";
+    // Appended to the prefix for FALCON type messages, giving FALCON.ENTITY.TOPIC.
+    public static final String ENTITY_TOPIC_NAME = "ENTITY.TOPIC";
+
+    // Time-to-live used when the broker TTL cannot be parsed: 3 days, in milliseconds.
+    private static final long DEFAULT_TTL = 3 * 24 * 60 * 60 * 1000;
+
+    // Workflow arguments copied from the context into a FALCON type message.
+    // buildMessageList() additionally sets the feed names and feed instance paths.
+    private static final WorkflowExecutionArgs[] FALCON_FILTER = {
+        WorkflowExecutionArgs.NOMINAL_TIME,
+        WorkflowExecutionArgs.ENTITY_NAME,
+        WorkflowExecutionArgs.OPERATION,
+        WorkflowExecutionArgs.LOG_DIR,
+        WorkflowExecutionArgs.STATUS,
+        WorkflowExecutionArgs.CONTEXT_FILE,
+        WorkflowExecutionArgs.TIMESTAMP,
+    };
+
+    // Workflow arguments copied from the context into a USER type message.
+    // The feed name and feed instance path entries are overridden per message
+    // in buildMessageList().
+    private static final WorkflowExecutionArgs[] USER_FILTER = {
+        WorkflowExecutionArgs.CLUSTER_NAME,
+        WorkflowExecutionArgs.ENTITY_NAME,
+        WorkflowExecutionArgs.ENTITY_TYPE,
+        WorkflowExecutionArgs.NOMINAL_TIME,
+        WorkflowExecutionArgs.OPERATION,
+
+        WorkflowExecutionArgs.FEED_NAMES,
+        WorkflowExecutionArgs.FEED_INSTANCE_PATHS,
+
+        WorkflowExecutionArgs.WORKFLOW_ID,
+        WorkflowExecutionArgs.WORKFLOW_USER,
+        WorkflowExecutionArgs.RUN_ID,
+        WorkflowExecutionArgs.STATUS,
+        WorkflowExecutionArgs.TIMESTAMP,
+        WorkflowExecutionArgs.LOG_FILE,
+    };
+
+
+    // workflow arguments every message value is read from
+    private final WorkflowExecutionContext context;
+    // decides the topic, the broker settings and the set of arguments sent
+    private final MessageType messageType;
+
+    /**
+     * Creates a producer for the given workflow context and message type.
+     * Instances are normally obtained through builder(context).
+     */
+    protected JMSMessageProducer(WorkflowExecutionContext context, MessageType messageType) {
+        this.messageType = messageType;
+        this.context = context;
+    }
+
+    /**
+     * Tells whether this producer sends FALCON type messages.
+     */
+    public boolean isFalconEntityTopic() {
+        return MessageType.FALCON == messageType;
+    }
+
+    /**
+     * Resolves the topic to publish to, convention over configuration: a topic
+     * name set in the context wins; otherwise the name is the FALCON. prefix
+     * followed by ENTITY.TOPIC for FALCON messages or by the entity name for
+     * USER messages.
+     */
+    public String getTopicName() {
+        String configuredTopic = context.getValue(WorkflowExecutionArgs.TOPIC_NAME);
+        if (configuredTopic != null) {
+            return configuredTopic;  // user has set a topic
+        }
+
+        // else falcon entity topic or user = FALCON.$entity_name
+        if (messageType == MessageType.FALCON) {
+            return FALCON_TOPIC_PREFIX + ENTITY_TOPIC_NAME;
+        }
+        return FALCON_TOPIC_PREFIX + context.getEntityName();
+    }
+
+    /**
+     * Returns the connection factory class name: the falcon broker
+     * implementation for FALCON messages, the user broker implementation otherwise.
+     */
+    public String getBrokerImplClass() {
+        if (messageType == MessageType.FALCON) {
+            return context.getValue(WorkflowExecutionArgs.BRKR_IMPL_CLASS);
+        }
+        return context.getValue(WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS);
+    }
+
+    /**
+     * Returns the broker url: the falcon broker url for FALCON messages,
+     * the user broker url otherwise.
+     */
+    public String getBrokerUrl() {
+        if (messageType == MessageType.FALCON) {
+            return context.getValue(WorkflowExecutionArgs.BRKR_URL);
+        }
+        return context.getValue(WorkflowExecutionArgs.USER_BRKR_URL);
+    }
+
+    /**
+     * Returns the message time-to-live in milliseconds. The context value is
+     * read as a number of minutes; when it cannot be parsed as a long the
+     * default TTL is used and the problem is logged.
+     */
+    private long getBrokerTTL() {
+        try {
+            long ttlInMinutes = Long.parseLong(context.getValue(WorkflowExecutionArgs.BRKR_TTL));
+            return ttlInMinutes * 60 * 1000;
+        } catch (NumberFormatException e) {
+            LOG.error("Error in parsing broker.ttl, setting TTL to: {} milli-seconds", DEFAULT_TTL);
+            return DEFAULT_TTL;
+        }
+    }
+
+    /**
+     * Starts building a producer for the given workflow context; the message
+     * type must be set on the returned builder before calling build().
+     */
+    public static MessageBuilder builder(WorkflowExecutionContext context) {
+        MessageBuilder messageBuilder = new MessageBuilder(context);
+        return messageBuilder;
+    }
+
+    /**
+     * Builder for JMSMessageProducer: holds the workflow context and collects
+     * the message type, which is mandatory.
+     */
+    public static final class MessageBuilder {
+        private final WorkflowExecutionContext context;
+        private MessageType type;
+
+        private MessageBuilder(WorkflowExecutionContext context) {
+            this.context = context;
+        }
+
+        /**
+         * Sets the message type and returns this builder for chaining.
+         */
+        public MessageBuilder type(MessageType aMessageType) {
+            type = aMessageType;
+            return this;
+        }
+
+        /**
+         * Creates the producer.
+         *
+         * @throws IllegalArgumentException if no message type has been set
+         */
+        public JMSMessageProducer build() {
+            if (type != null) {
+                return new JMSMessageProducer(context, type);
+            }
+            throw new IllegalArgumentException("Message messageType needs to be set.");
+        }
+    }
+
+    /**
+     * Builds one message per feed instance path from the workflow context and
+     * publishes each of them to the topic returned by getTopicName(). The
+     * broker creates the topic if it does not exist yet, otherwise the
+     * existing topic with the same name is used.
+     *
+     * Sending is best effort: a failure to connect or to send is logged and
+     * not propagated. Failures while building the messages are not caught here.
+     *
+     * @return 0 always, including when there was nothing to send or sending failed
+     * @throws JMSException declared for callers; JMS failures raised while
+     *                      connecting or sending are logged rather than thrown
+     */
+    public int sendMessage() throws JMSException {
+        List<Map<String, String>> messageList = buildMessageList();
+
+        if (messageList.isEmpty()) {
+            LOG.warn("No operation on output feed");
+            return 0;
+        }
+
+        Connection connection = null;
+        try {
+            connection = createAndStartConnection(getBrokerImplClass(), "", "", getBrokerUrl());
+
+            for (Map<String, String> message : messageList) {
+                LOG.info("Sending message: {}", message);
+                sendMessage(connection, message);
+            }
+        } catch (Exception e) {
+            // a single handler covers JMSException and the reflective failures of
+            // the connection factory lookup; the message says which step failed
+            if (connection == null) {
+                LOG.error("Error in getConnection:", e);
+            } else {
+                LOG.error("Error in sending message:", e);
+            }
+        } finally {
+            closeQuietly(connection);
+        }
+
+        return 0;
+    }
+
+    /**
+     * Builds the messages to send, one per feed instance path.
+     *
+     * Every message starts from the context values selected by the filter of
+     * the message type; the feed name, feed instance path and the two dates
+     * are then overridden per message.
+     *
+     * @return the messages; empty when there are no feed names or no instance paths
+     * @throws RuntimeException wrapping the IOException raised while reading instance paths
+     * @throws IllegalStateException if a process has fewer feed names than instance paths
+     */
+    private List<Map<String, String>> buildMessageList() {
+        WorkflowExecutionArgs[] filteredArgs = messageType == MessageType.FALCON
+                ? FALCON_FILTER : USER_FILTER;
+
+        String[] feedNames = getFeedNames();
+        if (feedNames == null) {
+            return Collections.emptyList();
+        }
+
+        String[] feedPaths;
+        try {
+            feedPaths = getFeedPaths();
+        } catch (IOException e) {
+            LOG.error("Error getting instance paths: ", e);
+            throw new RuntimeException(e);
+        }
+
+        // literal on the left: a missing entity type means "not a process" instead of an NPE
+        boolean isProcess = "PROCESS".equalsIgnoreCase(context.getEntityType());
+        if (isProcess && feedNames.length < feedPaths.length) {
+            // names and paths are paired by index below; fail with a clear
+            // message instead of an ArrayIndexOutOfBoundsException mid-loop
+            throw new IllegalStateException("Found " + feedNames.length + " feed names for "
+                    + feedPaths.length + " feed instance paths");
+        }
+
+        List<Map<String, String>> messages = new ArrayList<Map<String, String>>(feedPaths.length);
+        for (int i = 0; i < feedPaths.length; i++) {
+            Map<String, String> message = buildMessage(filteredArgs);
+
+            // override default values
+            if (isProcess) {
+                change(message, WorkflowExecutionArgs.FEED_NAMES, feedNames[i]);
+            } else {
+                // keeps the value taken from the context; when the filter has no
+                // feed names entry this adds the key with a null value
+                change(message, WorkflowExecutionArgs.FEED_NAMES,
+                        message.get(WorkflowExecutionArgs.FEED_NAMES.getName()));
+            }
+
+            change(message, WorkflowExecutionArgs.FEED_INSTANCE_PATHS, feedPaths[i]);
+            convertDateFormat(message);
+            messages.add(message);
+        }
+
+        return messages;
+    }
+
+    /**
+     * Sends a single message over the given connection using a session and a
+     * producer created for this call. The message is sent persistent, with the
+     * time-to-live from getBrokerTTL(), to the topic from getTopicName().
+     *
+     * @param connection started connection to the broker
+     * @param message    key/value pairs to send as a map message
+     * @throws JMSException if creating the session or producer, sending, or closing fails
+     */
+    private void sendMessage(Connection connection,
+                             Map<String, String> message) throws JMSException {
+        Session session = null;
+        javax.jms.MessageProducer producer = null;
+        try {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Topic entityTopic = session.createTopic(getTopicName());
+
+            producer = session.createProducer(entityTopic);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            producer.setTimeToLive(getBrokerTTL());
+
+            producer.send(createMessage(session, message));
+        } finally {
+            try {
+                if (producer != null) {
+                    producer.close();
+                }
+            } finally {
+                // nested so the session is closed even when closing the producer fails
+                if (session != null) {
+                    session.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates a JMS map message on the given session and copies every
+     * key/value pair of the given map into it as a string entry.
+     */
+    public Message createMessage(Session session,
+                                 Map<String, String> message) throws JMSException {
+        MapMessage jmsMessage = session.createMapMessage();
+
+        for (Map.Entry<String, String> keyValue : message.entrySet()) {
+            String name = keyValue.getKey();
+            String value = keyValue.getValue();
+            jmsMessage.setString(name, value);
+        }
+
+        return jmsMessage;
+    }
+
+    /**
+     * Replaces the entry for the given argument in the message: the entry
+     * stored under the argument name is removed, then the new value is put
+     * under that name.
+     */
+    public void change(Map<String, String> message, WorkflowExecutionArgs key, String value) {
+        String argName = key.getName();
+        message.remove(argName);
+
+        argName = key.getName();
+        message.put(argName, value);
+    }
+
+    /**
+     * Returns the output feed names the messages are about.
+     *
+     * For the falcon entity topic the feed names value of the context is
+     * returned unsplit as a single element. Otherwise the names are returned
+     * as a list, or null when the context has no output feed (the value is
+     * missing or is the literal "null").
+     *
+     * @return the feed names, or null when there is no output feed to report
+     */
+    private String[] getFeedNames() {
+        String feedNameStr = context.getOutputFeedNames();
+        if (isFalconEntityTopic()) {
+            return new String[]{feedNameStr};
+        }
+
+        // a missing value is handled like the "null" marker instead of throwing an NPE
+        if (feedNameStr == null || "null".equals(feedNameStr)) {
+            return null;
+        }
+
+        return context.getOutputFeedNamesList();
+    }
+
+    /**
+     * Returns the feed instance paths the messages are about.
+     *
+     * For the falcon entity topic the instance paths value of the context is
+     * returned unsplit as a single element. For GENERATE and REPLICATE the
+     * paths are returned as a list. For any other operation (feed retention)
+     * the paths are read from the log file named in the context, whose content
+     * has the form key=path1,path2,...; the file is deleted once read.
+     *
+     * @return the instance paths; empty when the log file does not exist or lists no paths
+     * @throws IOException if the log file cannot be accessed, read or deleted
+     */
+    private String[] getFeedPaths() throws IOException {
+
+        if (isFalconEntityTopic()) {
+            LOG.debug("Returning instance paths for Falcon Topic: {}", context.getOutputFeedInstancePaths());
+            return new String[]{context.getOutputFeedInstancePaths(), };
+        }
+
+        WorkflowExecutionContext.EntityOperations operation = context.getOperation();
+        if (operation == WorkflowExecutionContext.EntityOperations.GENERATE
+                || operation == WorkflowExecutionContext.EntityOperations.REPLICATE) {
+            LOG.debug("Returning instance paths: {}", context.getOutputFeedInstancePaths());
+            return context.getOutputFeedInstancePathsList();
+        }
+
+        // else case of feed retention
+        Path logFile = new Path(context.getLogFile());
+        FileSystem fs = FileSystem.get(logFile.toUri(), new Configuration());
+
+        if (!fs.exists(logFile)) {
+            // Evictor Failed without deleting a single path
+            return new String[0];
+        }
+
+        ByteArrayOutputStream writer = new ByteArrayOutputStream();
+        InputStream instance = fs.open(logFile);
+        IOUtils.copyBytes(instance, writer, 4096, true);
+        String content = writer.toString();
+        fs.delete(logFile, true);
+        LOG.info("Deleted feed instance paths file:" + logFile);
+
+        // Everything after the FIRST '=' is the path list. Cutting there, rather
+        // than splitting on every '=', keeps paths that contain '=' intact and
+        // cannot index past the end of an array when the content is just "=".
+        int separator = content.indexOf('=');
+        String instancePaths = separator < 0 ? "" : content.substring(separator + 1);
+        if (instancePaths.isEmpty()) {
+            LOG.debug("Returning 0 instance paths for feed ");
+            return new String[0];
+        }
+
+        LOG.debug("Returning instance paths for feed {}", instancePaths);
+        return instancePaths.split(",");
+    }
+
+    /**
+     * Creates a message holding, for each argument of the filter, the argument
+     * name mapped to the value the context has for it.
+     */
+    private Map<String, String> buildMessage(final WorkflowExecutionArgs[] filter) {
+        final int argCount = filter.length;
+        Map<String, String> keyValues = new HashMap<String, String>(argCount);
+
+        for (int i = 0; i < argCount; i++) {
+            WorkflowExecutionArgs selected = filter[i];
+            keyValues.put(selected.getName(), context.getValue(selected));
+        }
+
+        return keyValues;
+    }
+
+    /**
+     * Rewrites the nominal time and the timestamp of the message, in that
+     * order, from the yyyy-MM-dd-HH-mm form to the value returned by
+     * SchemaHelper.formatDateUTCToISO8601.
+     */
+    public void convertDateFormat(Map<String, String> message) {
+        WorkflowExecutionArgs[] dateArgs = {
+            WorkflowExecutionArgs.NOMINAL_TIME,
+            WorkflowExecutionArgs.TIMESTAMP,
+        };
+
+        for (WorkflowExecutionArgs dateArg : dateArgs) {
+            String rawDate = message.get(dateArg.getName());
+            String isoDate = SchemaHelper.formatDateUTCToISO8601(rawDate, "yyyy-MM-dd-HH-mm");
+            change(message, dateArg, isoDate);
+        }
+    }
+
+    /**
+     * Loads the named connection factory class, instantiates it through its
+     * (String, String, String) constructor, then creates and starts a connection.
+     *
+     * If starting the connection fails the connection is closed before the
+     * failure is propagated, so that it does not leak.
+     *
+     * @param implementation fully qualified name of a ConnectionFactory class
+     * @param userName       first constructor argument of the factory
+     * @param password       second constructor argument of the factory
+     * @param url            third constructor argument of the factory
+     * @return a started connection; the caller is responsible for closing it
+     * @throws ClassCastException if the named class is not a ConnectionFactory
+     */
+    private Connection createAndStartConnection(String implementation, String userName,
+                                          String password, String url)
+        throws JMSException, ClassNotFoundException, InstantiationException,
+               IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+
+        // asSubclass checks the loaded class really is a ConnectionFactory, which
+        // makes the unchecked cast and its warning suppression unnecessary
+        Class<? extends ConnectionFactory> clazz = JMSMessageProducer.class.getClassLoader()
+                .loadClass(implementation).asSubclass(ConnectionFactory.class);
+
+        ConnectionFactory connectionFactory = clazz
+                .getConstructor(String.class, String.class, String.class)
+                .newInstance(userName, password, url);
+
+        Connection connection = connectionFactory.createConnection();
+        boolean started = false;
+        try {
+            connection.start();
+            started = true;
+        } finally {
+            if (!started) {
+                // the caller never sees this connection, so release it here
+                closeQuietly(connection);
+            }
+        }
+
+        return connection;
+    }
+
+    /**
+     * Closes the connection if there is one; a JMSException raised while
+     * closing is logged and not propagated.
+     */
+    private void closeQuietly(Connection connection) {
+        if (connection == null) {
+            return;
+        }
+
+        try {
+            connection.close();
+        } catch (JMSException e) {
+            LOG.error("Error in closing connection:", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
deleted file mode 100644
index ccac921..0000000
--- a/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.messaging;
-
-import org.apache.commons.cli.*;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.*;
-import java.lang.reflect.InvocationTargetException;
-
-/**
- * Message producer used in the workflow to send a message to the queue/topic.
- */
-public class MessageProducer extends Configured implements Tool {
-
-    private Connection connection;
-    private static final Logger LOG = LoggerFactory.getLogger(MessageProducer.class);
-    private static final long DEFAULT_TTL = 3 * 24 * 60 * 60 * 1000;
-
-    /**
-     * @param entityInstanceMessage - Accepts a Message to be send to JMS topic, creates a new
-     *                  Topic based on topic name if it does not exist or else
-     *                  existing topic with the same name is used to send the message.
-     * @throws JMSException
-     */
-    protected void sendMessage(EntityInstanceMessage entityInstanceMessage)
-        throws JMSException {
-
-        Session session = connection.createSession(false,
-                Session.AUTO_ACKNOWLEDGE);
-        Topic entityTopic = session.createTopic(entityInstanceMessage
-                .getTopicName());
-        javax.jms.MessageProducer producer = session
-                .createProducer(entityTopic);
-        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-        long messageTTL = DEFAULT_TTL;
-        try {
-            long messageTTLinMins = Long.parseLong(entityInstanceMessage
-                    .getBrokerTTL());
-            messageTTL = messageTTLinMins * 60 * 1000;
-        } catch (NumberFormatException e) {
-            LOG.error("Error in parsing broker.ttl, setting TTL to: {} milli-seconds", DEFAULT_TTL);
-        }
-        producer.setTimeToLive(messageTTL);
-        producer.send(new EntityInstanceMessageCreator(entityInstanceMessage)
-                .createMessage(session));
-    }
-
-    public static void main(String[] args) throws Exception {
-        ToolRunner.run(new MessageProducer(), args);
-    }
-
-    private void createAndStartConnection(String implementation, String userName,
-                                          String password, String url)
-        throws JMSException, ClassNotFoundException, InstantiationException,
-               IllegalAccessException, InvocationTargetException, NoSuchMethodException {
-
-        Class<ConnectionFactory> clazz = (Class<ConnectionFactory>) MessageProducer.class
-                .getClassLoader().loadClass(implementation);
-
-        ConnectionFactory connectionFactory = clazz.getConstructor(
-                String.class, String.class, String.class).newInstance(userName,
-                password, url);
-
-        connection = connectionFactory.createConnection();
-        connection.start();
-    }
-
-    private static CommandLine getCommand(String[] arguments)
-        throws ParseException {
-
-        Options options = new Options();
-        addOption(options, new Option(ARG.brokerImplClass.getArgName(), true,
-                "message broker Implementation class"));
-        addOption(options, new Option(ARG.brokerTTL.getArgName(), true,
-                "message time-to-live"));
-        addOption(options, new Option(ARG.brokerUrl.getArgName(), true,
-                "message broker url"));
-        addOption(options, new Option(ARG.entityName.getArgName(), true,
-                "name of the entity"));
-        addOption(options, new Option(ARG.entityType.getArgName(), true,
-                "type of the entity"));
-        addOption(options, new Option(ARG.feedInstancePaths.getArgName(),
-                true, "feed instance paths"));
-        addOption(options, new Option(ARG.feedNames.getArgName(), true,
-                "feed names"));
-        addOption(options, new Option(ARG.logFile.getArgName(), true,
-                "log file path"));
-        addOption(options, new Option(ARG.nominalTime.getArgName(), true,
-                "instance time"));
-        addOption(options, new Option(ARG.operation.getArgName(), true,
-                "operation like generate, delete, archive"));
-        addOption(options, new Option(ARG.runId.getArgName(), true,
-                "current run-id of the instance"));
-        addOption(options, new Option(ARG.status.getArgName(), true,
-                "status of workflow instance"));
-        addOption(options, new Option(ARG.timeStamp.getArgName(), true,
-                "current timestamp"));
-        addOption(options, new Option(ARG.topicName.getArgName(), true,
-                "name of the topic to be used to send message"));
-        addOption(options, new Option(ARG.workflowId.getArgName(), true,
-                "workflow id"));
-        addOption(options, new Option(ARG.cluster.getArgName(), true,
-                "cluster name"));
-        addOption(options, new Option(ARG.workflowUser.getArgName(), true,
-                "workflow user id"), false);
-        addOption(options, new Option(ARG.logDir.getArgName(), true,
-                "log dir where job logs are copied"), false);
-
-        return new GnuParser().parse(options, arguments);
-    }
-
-    private static void addOption(Options options, Option opt) {
-        addOption(options, opt, true);
-    }
-
-    private static void addOption(Options options, Option opt, boolean isRequired) {
-        opt.setRequired(isRequired);
-        options.addOption(opt);
-    }
-
-    @Override
-    public int run(String[] args) throws Exception {
-        CommandLine cmd;
-        try {
-            cmd = getCommand(args);
-        } catch (ParseException e) {
-            throw new Exception("Unable to parse arguments: ", e);
-        }
-        EntityInstanceMessage[] entityInstanceMessage = EntityInstanceMessage
-                .getMessages(cmd);
-        if (entityInstanceMessage == null || entityInstanceMessage.length == 0) {
-            LOG.warn("No operation on output feed");
-            return 0;
-        }
-
-        try {
-            createAndStartConnection(cmd.getOptionValue(ARG.brokerImplClass.name()), "",
-                    "", cmd.getOptionValue(ARG.brokerUrl.name()));
-            for (EntityInstanceMessage message : entityInstanceMessage) {
-                LOG.info("Sending message: {}", message.getKeyValueMap());
-                sendMessage(message);
-            }
-        } catch (JMSException e) {
-            LOG.error("Error in getConnection", e);
-        } catch (Exception e) {
-            LOG.error("Error in getConnection", e);
-        } finally {
-            try {
-                if (connection != null) {
-                    connection.close();
-                }
-            } catch (JMSException e) {
-                LOG.error("Error in closing connection", e);
-            }
-        }
-        return 0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
deleted file mode 100644
index 3f0c664..0000000
--- a/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.messaging;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import javax.jms.*;
-
-/**
- * Test for falcon topic message producer.
- */
-public class FalconTopicProducerTest {
-
-    private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
-    private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
-    private static final String TOPIC_NAME = "FALCON.ENTITY.TOPIC";
-    private static final String SECONDARY_TOPIC_NAME = "FALCON.ENTITY.SEC.TOPIC";
-    private BrokerService broker;
-    private List<MapMessage> mapMessages;
-
-    private volatile AssertionError error;
-
-    @BeforeClass
-    public void setup() throws Exception {
-        broker = new BrokerService();
-        broker.addConnector(BROKER_URL);
-        broker.setDataDirectory("target/activemq");
-        broker.setBrokerName("localhost");
-        broker.start();
-    }
-
-    @AfterClass
-    public void tearDown() throws Exception {
-        broker.deleteAllMessages();
-        broker.stop();
-    }
-
-    @Test
-    public void testWithFeedOutputPaths() throws Exception {
-        List<String> args = createCommonArgs();
-        List<String> newArgs = new ArrayList<String>(Arrays.asList(
-                "-" + ARG.entityName.getArgName(), "agg-coord",
-                "-" + ARG.feedNames.getArgName(), "click-logs,raw-logs",
-                "-" + ARG.feedInstancePaths.getArgName(),
-                "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20",
-                "-" + ARG.topicName.getArgName(), TOPIC_NAME));
-        args.addAll(newArgs);
-        List<String[]> messages = new ArrayList<String[]>();
-        messages.add(args.toArray(new String[args.size()]));
-        testProcessMessageCreator(messages, TOPIC_NAME);
-        for (MapMessage m : mapMessages) {
-            assertMessage(m);
-            Assert.assertTrue((m.getString(ARG.feedNames.getArgName())
-                    .equals("click-logs,raw-logs")));
-            Assert.assertTrue(m
-                    .getString(ARG.feedInstancePaths.getArgName())
-                    .equals("/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20"));
-        }
-    }
-
-    @Test
-    public void testWithEmptyFeedOutputPaths() throws Exception {
-        List<String> args = createCommonArgs();
-        List<String> newArgs = new ArrayList<String>(Arrays.asList(
-                "-" + ARG.entityName.getArgName(), "agg-coord",
-                "-" + ARG.feedNames.getArgName(), "null",
-                "-" + ARG.feedInstancePaths.getArgName(), "null",
-                "-" + ARG.topicName.getArgName(), TOPIC_NAME));
-        args.addAll(newArgs);
-        List<String[]> messages = new ArrayList<String[]>();
-        messages.add(args.toArray(new String[args.size()]));
-        testProcessMessageCreator(messages, TOPIC_NAME);
-        for (MapMessage m : mapMessages) {
-            assertMessage(m);
-            assertMessage(m);
-            Assert.assertTrue(m.getString(ARG.feedNames.getArgName()).equals(
-                    "null"));
-            Assert.assertTrue(m.getString(ARG.feedInstancePaths.getArgName())
-                    .equals("null"));
-        }
-    }
-
-    @Test
-    public void testConsumerWithMultipleTopics() throws Exception {
-        List<String[]> messages = new ArrayList<String[]>();
-        List<String> args = createCommonArgs();
-        List<String> newArgs = new ArrayList<String>(Arrays.asList(
-                "-" + ARG.entityName.getArgName(), "agg-coord",
-                "-" + ARG.feedNames.getArgName(), "raw-logs",
-                "-" + ARG.feedInstancePaths.getArgName(),
-                "/raw-logs/10/05/05/00/20",
-                "-" + ARG.topicName.getArgName(), TOPIC_NAME));
-        args.addAll(newArgs);
-        messages.add(args.toArray(new String[args.size()]));
-
-        args = createCommonArgs();
-        newArgs = new ArrayList<String>(Arrays.asList(
-                "-" + ARG.entityName.getArgName(), "agg-coord",
-                "-" + ARG.feedNames.getArgName(), "click-logs",
-                "-" + ARG.feedInstancePaths.getArgName(),
-                "/click-logs/10/05/05/00/20",
-                "-" + ARG.topicName.getArgName(), SECONDARY_TOPIC_NAME));
-        args.addAll(newArgs);
-        messages.add(args.toArray(new String[args.size()]));
-
-        testProcessMessageCreator(messages, TOPIC_NAME+","+SECONDARY_TOPIC_NAME);
-        Assert.assertEquals(mapMessages.size(), 2);
-        for (MapMessage m : mapMessages) {
-            assertMessage(m);
-        }
-    }
-
-    private List<String> createCommonArgs() {
-        return new ArrayList<String>(Arrays.asList(
-                "-" + ARG.workflowId.getArgName(), "workflow-01-00",
-                "-" + ARG.workflowUser.getArgName(), "falcon",
-                "-" + ARG.runId.getArgName(), "1",
-                "-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
-                "-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
-                "-" + ARG.brokerUrl.getArgName(), BROKER_URL,
-                "-" + ARG.brokerImplClass.getArgName(), (BROKER_IMPL_CLASS),
-                "-" + ARG.entityType.getArgName(), ("process"),
-                "-" + ARG.operation.getArgName(), ("GENERATE"),
-                "-" + ARG.logFile.getArgName(), ("/logFile"),
-                "-" + ARG.status.getArgName(), ("SUCCEEDED"),
-                "-" + ARG.brokerTTL.getArgName(), "10",
-                "-" + ARG.cluster.getArgName(), "corp"));
-    }
-
-    private void testProcessMessageCreator(final List<String[]> messages,
-             final String topicsToListen) throws Exception {
-
-        Thread t = new Thread() {
-            @Override
-            public void run() {
-                try {
-                    consumer(messages.size(), topicsToListen);
-                } catch (AssertionError e) {
-                    error = e;
-                } catch (Exception ignore) {
-                    error = null;
-                }
-            }
-        };
-        t.start();
-        Thread.sleep(100);
-        for (String[] message : messages) {
-            new MessageProducer().run(message);
-        }
-        t.join();
-        if (error != null) {
-            throw error;
-        }
-    }
-
-    private void consumer(int size, String topicsToListen) throws Exception {
-        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
-                BROKER_URL);
-        Connection connection = connectionFactory.createConnection();
-        connection.start();
-
-        Session session = connection.createSession(false,
-                Session.AUTO_ACKNOWLEDGE);
-        Destination destination = session.createTopic(topicsToListen);
-        MessageConsumer consumer = session.createConsumer(destination);
-        mapMessages = new ArrayList<MapMessage>();
-        for (int i=0; i<size; i++) {
-            MapMessage m = (MapMessage) consumer.receive();
-            mapMessages.add(m);
-            System.out.println("Consumed: " + m.toString());
-        }
-
-        connection.close();
-    }
-
-    private void assertMessage(MapMessage m) throws JMSException {
-        Assert.assertEquals(m.getString(ARG.entityName.getArgName()),
-                "agg-coord");
-        Assert.assertEquals(m.getString(ARG.workflowId.getArgName()),
-                "workflow-01-00");
-        Assert.assertEquals(m.getString(ARG.workflowUser.getArgName()),
-                "falcon");
-        Assert.assertEquals(m.getString(ARG.runId.getArgName()), "1");
-        Assert.assertEquals(m.getString(ARG.nominalTime.getArgName()),
-                "2011-01-01T01:00Z");
-        Assert.assertEquals(m.getString(ARG.timeStamp.getArgName()),
-                "2012-01-01T01:00Z");
-        Assert.assertEquals(m.getString(ARG.status.getArgName()), "SUCCEEDED");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
index 57ccdc5..9119624 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
@@ -33,7 +33,8 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -68,24 +69,29 @@ public class FeedProducerTest {
         logFile = new Path(conf.get("fs.default.name"),
                 "/falcon/feed/agg-logs/instance-2012-01-01-10-00.csv");
 
-        args = new String[]{"-" + ARG.entityName.getArgName(), TOPIC_NAME,
-                            "-" + ARG.feedNames.getArgName(), "click-logs",
-                            "-" + ARG.feedInstancePaths.getArgName(),
-                            "/click-logs/10/05/05/00/20",
-                            "-" + ARG.workflowId.getArgName(), "workflow-01-00",
-                            "-" + ARG.workflowUser.getArgName(), "falcon",
-                            "-" + ARG.runId.getArgName(), "1",
-                            "-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
-                            "-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
-                            "-" + ARG.brokerUrl.getArgName(), BROKER_URL,
-                            "-" + ARG.brokerImplClass.getArgName(), (BROKER_IMPL_CLASS),
-                            "-" + ARG.entityType.getArgName(), ("FEED"),
-                            "-" + ARG.operation.getArgName(), ("DELETE"),
-                            "-" + ARG.logFile.getArgName(), (logFile.toString()),
-                            "-" + ARG.topicName.getArgName(), (TOPIC_NAME),
-                            "-" + ARG.status.getArgName(), ("SUCCEEDED"),
-                            "-" + ARG.brokerTTL.getArgName(), "10",
-                            "-" + ARG.cluster.getArgName(), "corp", };
+        args = new String[] {
+            "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), TOPIC_NAME,
+            "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "click-logs",
+            "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+            "/click-logs/10/05/05/00/20",
+            "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
+            "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), "falcon",
+            "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
+            "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), "2011-01-01-01-00",
+            "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), "2012-01-01-01-00",
+            "-" + WorkflowExecutionArgs.BRKR_URL.getName(), BROKER_URL,
+            "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS,
+            "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), BROKER_URL,
+            "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS,
+            "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), "FEED",
+            "-" + WorkflowExecutionArgs.OPERATION.getName(), "DELETE",
+            "-" + WorkflowExecutionArgs.LOG_FILE.getName(), logFile.toString(),
+            "-" + WorkflowExecutionArgs.LOG_DIR.getName(), "/falcon/feed/agg-logs/",
+            "-" + WorkflowExecutionArgs.TOPIC_NAME.getName(), TOPIC_NAME,
+            "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
+            "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "10",
+            "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), "corp",
+        };
 
         broker = new BrokerService();
         broker.addConnector(BROKER_URL);
@@ -121,7 +127,11 @@ public class FeedProducerTest {
         InputStream in = new ByteArrayInputStream(("instancePaths=").getBytes());
         IOUtils.copyBytes(in, out, conf);
 
-        new MessageProducer().run(this.args);
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(
+                args, WorkflowExecutionContext.Type.POST_PROCESSING);
+        JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context)
+                .type(JMSMessageProducer.MessageType.USER).build();
+        jmsMessageProducer.sendMessage();
     }
 
     private void testProcessMessageCreator() throws Exception {
@@ -140,7 +150,13 @@ public class FeedProducerTest {
         };
         t.start();
         Thread.sleep(100);
-        new MessageProducer().run(this.args);
+
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(
+                args, WorkflowExecutionContext.Type.POST_PROCESSING);
+        JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context)
+                .type(JMSMessageProducer.MessageType.USER).build();
+        jmsMessageProducer.sendMessage();
+
         t.join();
         if (error != null) {
             throw error;
@@ -148,13 +164,11 @@ public class FeedProducerTest {
     }
 
     private void consumer() throws JMSException {
-        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
-                BROKER_URL);
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
         Connection connection = connectionFactory.createConnection();
         connection.start();
 
-        Session session = connection.createSession(false,
-                Session.AUTO_ACKNOWLEDGE);
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Destination destination = session.createTopic(TOPIC_NAME);
         MessageConsumer consumer = session.createConsumer(destination);
 
@@ -165,7 +179,7 @@ public class FeedProducerTest {
         }
         System.out.println("Consumed: " + m.toString());
         assertMessage(m);
-        Assert.assertEquals(m.getString(ARG.feedInstancePaths.getArgName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
                 "/falcon/feed/agg-logs/path1/2010/10/10/20");
 
         for (m = null; m == null;) {
@@ -173,7 +187,7 @@ public class FeedProducerTest {
         }
         System.out.println("Consumed: " + m.toString());
         assertMessage(m);
-        Assert.assertEquals(m.getString(ARG.feedInstancePaths.getArgName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
                 "/falcon/feed/agg-logs/path1/2010/10/10/21");
 
         for (m = null; m == null;) {
@@ -181,7 +195,7 @@ public class FeedProducerTest {
         }
         System.out.println("Consumed: " + m.toString());
         assertMessage(m);
-        Assert.assertEquals(m.getString(ARG.feedInstancePaths.getArgName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
                 "/falcon/feed/agg-logs/path1/2010/10/10/22");
 
         for (m = null; m == null;) {
@@ -189,25 +203,25 @@ public class FeedProducerTest {
         }
         System.out.println("Consumed: " + m.toString());
         assertMessage(m);
-        Assert.assertEquals(m.getString(ARG.feedInstancePaths.getArgName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
                 "/falcon/feed/agg-logs/path1/2010/10/10/23");
 
         connection.close();
     }
 
     private void assertMessage(MapMessage m) throws JMSException {
-        Assert.assertEquals(m.getString(ARG.entityName.getArgName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.ENTITY_NAME.getName()),
                 TOPIC_NAME);
-        Assert.assertEquals(m.getString(ARG.operation.getArgName()), "DELETE");
-        Assert.assertEquals(m.getString(ARG.workflowId.getArgName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OPERATION.getName()), "DELETE");
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.WORKFLOW_ID.getName()),
                 "workflow-01-00");
-        Assert.assertEquals(m.getString(ARG.workflowUser.getArgName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.WORKFLOW_USER.getName()),
                 "falcon");
-        Assert.assertEquals(m.getString(ARG.runId.getArgName()), "1");
-        Assert.assertEquals(m.getString(ARG.nominalTime.getArgName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.RUN_ID.getName()), "1");
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()),
                 "2011-01-01T01:00Z");
-        Assert.assertEquals(m.getString(ARG.timeStamp.getArgName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.TIMESTAMP.getName()),
                 "2012-01-01T01:00Z");
-        Assert.assertEquals(m.getString(ARG.status.getArgName()), "SUCCEEDED");
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.STATUS.getName()), "SUCCEEDED");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
new file mode 100644
index 0000000..694d488
--- /dev/null
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.messaging;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
+import org.mortbay.log.Log;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Session;
+import javax.jms.Topic;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test for JMSMessageConsumer.
+ */
+public class JMSMessageConsumerTest {
+
+    private static final String BROKER_URL = "vm://localhost";
+    private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
+    private static final String TOPIC_NAME = "FALCON.ENTITY.TOPIC";
+    private static final String SECONDARY_TOPIC_NAME = "FALCON.ENTITY.SEC.TOPIC";
+    private BrokerService broker;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        broker = new BrokerService();
+        broker.addConnector(BROKER_URL);
+        broker.setDataDirectory("target/activemq");
+        broker.setBrokerName("localhost");
+        broker.start();
+    }
+
+    public void sendMessages(String topic) throws JMSException, FalconException, IOException {
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic destination = session.createTopic(topic);
+        javax.jms.MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        for (int i = 0; i < 3; i++) {
+            WorkflowExecutionContext context = WorkflowExecutionContext.create(
+                    getMockFalconMessage(i), WorkflowExecutionContext.Type.POST_PROCESSING);
+            context.serialize(WorkflowExecutionContext.getFilePath("/tmp/log", "process1"));
+
+            MapMessage message = session.createMapMessage();
+            for (Map.Entry<WorkflowExecutionArgs, String> entry : context.entrySet()) {
+                message.setString(entry.getKey().getName(), entry.getValue());
+            }
+
+            Log.debug("Sending:" + message);
+            producer.send(message);
+        }
+
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(
+                getMockFalconMessage(15), WorkflowExecutionContext.Type.POST_PROCESSING);
+        context.serialize(WorkflowExecutionContext.getFilePath("/tmp/log", "process1"));
+
+        MapMessage mapMessage = session.createMapMessage();
+        for (Map.Entry<WorkflowExecutionArgs, String> entry : context.entrySet()) {
+            mapMessage.setString(entry.getKey().getName(), entry.getValue());
+        }
+
+        Log.debug("Sending:" + mapMessage);
+        producer.send(mapMessage);
+    }
+
+    private String[] getMockFalconMessage(int i) {
+        Map<String, String> message = new HashMap<String, String>();
+        message.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS);
+        message.put(WorkflowExecutionArgs.BRKR_URL.getName(), BROKER_URL);
+        message.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), "cluster1");
+        message.put(WorkflowExecutionArgs.ENTITY_NAME.getName(), "process1");
+        message.put(WorkflowExecutionArgs.ENTITY_TYPE.getName(), "PROCESS");
+        message.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+                "/clicks/hour/00/0" + i);
+        message.put(WorkflowExecutionArgs.FEED_NAMES.getName(), "clicks");
+        message.put(WorkflowExecutionArgs.LOG_FILE.getName(), "/logfile");
+        message.put(WorkflowExecutionArgs.LOG_DIR.getName(), "/tmp/log");
+        message.put(WorkflowExecutionArgs.NOMINAL_TIME.getName(), "2012-10-10-10-10");
+        message.put(WorkflowExecutionArgs.OPERATION.getName(), "GENERATE");
+        message.put(WorkflowExecutionArgs.RUN_ID.getName(), "0");
+        message.put(WorkflowExecutionArgs.TIMESTAMP.getName(), "2012-10-10-10-1" + i);
+        message.put(WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-" + i);
+        message.put(WorkflowExecutionArgs.TOPIC_NAME.getName(), TOPIC_NAME);
+        message.put(WorkflowExecutionArgs.STATUS.getName(), i != 15 ? "SUCCEEDED" : "FAILED");
+        message.put(WorkflowExecutionArgs.WORKFLOW_USER.getName(), "falcon");
+
+        String[] args = new String[message.size() * 2];
+        int index = 0;
+        for (Map.Entry<String, String> entry : message.entrySet()) {
+            args[index++] = "-" + entry.getKey();
+            args[index++] = entry.getValue();
+        }
+
+        return args;
+    }
+
+    @Test
+    public void testSubscriber() {
+        try {
+            // Comma-separated topics are supported in startup properties
+            JMSMessageConsumer subscriber = new JMSMessageConsumer(BROKER_IMPL_CLASS, "", "",
+                    BROKER_URL, TOPIC_NAME+","+SECONDARY_TOPIC_NAME, new WorkflowJobEndNotificationService());
+            subscriber.startSubscriber();
+            sendMessages(TOPIC_NAME);
+            Assert.assertEquals(broker.getAdminView().getTotalEnqueueCount(), 9);
+
+            sendMessages(SECONDARY_TOPIC_NAME);
+            Assert.assertEquals(broker.getAdminView().getTotalEnqueueCount(), 17);
+            Assert.assertEquals(broker.getAdminView().getTotalConsumerCount(), 2);
+            subscriber.closeSubscriber();
+        } catch (Exception e) {
+            Assert.fail("This should not have thrown an exception.", e);
+        }
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        broker.deleteAllMessages();
+        broker.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
new file mode 100644
index 0000000..90efa3e
--- /dev/null
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.messaging;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test for JMSMessageProducer, the Falcon topic message producer.
+ */
+public class JMSMessageProducerTest {
+
+    private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
+    private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
+    private static final String TOPIC_NAME = "FALCON.ENTITY.TOPIC";
+    private static final String SECONDARY_TOPIC_NAME = "FALCON.ENTITY.SEC.TOPIC";
+    private BrokerService broker;
+    private List<MapMessage> mapMessages;
+
+    private volatile AssertionError error;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        broker = new BrokerService();
+        broker.addConnector(BROKER_URL);
+        broker.setDataDirectory("target/activemq");
+        broker.setBrokerName("localhost");
+        broker.start();
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        broker.deleteAllMessages();
+        broker.stop();
+    }
+
+    @Test
+    public void testWithFeedOutputPaths() throws Exception {
+        List<String> args = createCommonArgs();
+        List<String> newArgs = new ArrayList<String>(Arrays.asList(
+                "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), "agg-coord",
+                "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "click-logs,raw-logs",
+                "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+                "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20"));
+        args.addAll(newArgs);
+        List<String[]> messages = new ArrayList<String[]>();
+        messages.add(args.toArray(new String[args.size()]));
+        testProcessMessageCreator(messages, TOPIC_NAME);
+        for (MapMessage m : mapMessages) {
+            assertMessage(m);
+            Assert.assertTrue((m.getString(WorkflowExecutionArgs.FEED_NAMES.getName())
+                    .equals("click-logs,raw-logs")));
+            Assert.assertTrue(m
+                    .getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName())
+                    .equals("/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20"));
+        }
+    }
+
+    @Test
+    public void testWithEmptyFeedOutputPaths() throws Exception {
+        List<String> args = createCommonArgs();
+        List<String> newArgs = new ArrayList<String>(Arrays.asList(
+                "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), "agg-coord",
+                "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "null",
+                "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), "null"));
+        args.addAll(newArgs);
+        List<String[]> messages = new ArrayList<String[]>();
+        messages.add(args.toArray(new String[args.size()]));
+        testProcessMessageCreator(messages, TOPIC_NAME);
+        for (MapMessage m : mapMessages) {
+            assertMessage(m);
+            assertMessage(m);
+            Assert.assertTrue(m.getString(WorkflowExecutionArgs.FEED_NAMES.getName()).equals(
+                    "null"));
+            Assert.assertTrue(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName())
+                    .equals("null"));
+        }
+    }
+
+    @Test
+    public void testConsumerWithMultipleTopics() throws Exception {
+        List<String[]> messages = new ArrayList<String[]>();
+        List<String> args = createCommonArgs();
+        List<String> newArgs = new ArrayList<String>(Arrays.asList(
+                "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), "agg-coord",
+                "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "raw-logs",
+                "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+                "/raw-logs/10/05/05/00/20"));
+        args.addAll(newArgs);
+        messages.add(args.toArray(new String[args.size()]));
+
+        args = createCommonArgs();
+        newArgs = new ArrayList<String>(Arrays.asList(
+                "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), "agg-coord",
+                "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "click-logs",
+                "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+                "/click-logs/10/05/05/00/20"));
+        args.addAll(newArgs);
+        messages.add(args.toArray(new String[args.size()]));
+
+        testProcessMessageCreator(messages, TOPIC_NAME+","+SECONDARY_TOPIC_NAME);
+        Assert.assertEquals(mapMessages.size(), 2);
+        for (MapMessage m : mapMessages) {
+            assertMessage(m);
+        }
+    }
+
+    private List<String> createCommonArgs() {
+        return new ArrayList<String>(Arrays.asList(
+                "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
+                "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), "falcon",
+                "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
+                "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), "2011-01-01-01-00",
+                "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), "2012-01-01-01-00",
+                "-" + WorkflowExecutionArgs.BRKR_URL.getName(), BROKER_URL,
+                "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), (BROKER_IMPL_CLASS),
+                "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), ("process"),
+                "-" + WorkflowExecutionArgs.OPERATION.getName(), ("GENERATE"),
+                "-" + WorkflowExecutionArgs.LOG_FILE.getName(), ("/logFile"),
+                "-" + WorkflowExecutionArgs.LOG_DIR.getName(), ("/tmp"),
+                "-" + WorkflowExecutionArgs.STATUS.getName(), ("SUCCEEDED"),
+                "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "10",
+                "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), "corp"));
+    }
+
+    private void testProcessMessageCreator(final List<String[]> messages,
+                                           final String topicsToListen) throws Exception {
+        Thread t = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    consumer(messages.size(), topicsToListen);
+                } catch (AssertionError e) {
+                    error = e;
+                } catch (Exception ignore) {
+                    error = null;
+                }
+            }
+        };
+        t.start();
+        Thread.sleep(100);
+
+        for (String[] message : messages) {
+            WorkflowExecutionContext context = WorkflowExecutionContext.create(
+                    message, WorkflowExecutionContext.Type.POST_PROCESSING);
+            JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context)
+                    .type(JMSMessageProducer.MessageType.FALCON).build();
+            jmsMessageProducer.sendMessage();
+        }
+
+        t.join();
+        if (error != null) {
+            throw error;
+        }
+    }
+
+    private void consumer(int size, String topicsToListen) throws Exception {
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createTopic(topicsToListen);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        mapMessages = new ArrayList<MapMessage>();
+        for (int i=0; i<size; i++) {
+            MapMessage m = (MapMessage) consumer.receive();
+            mapMessages.add(m);
+            System.out.println("Consumed: " + m.toString());
+        }
+
+        connection.close();
+    }
+
+    private void assertMessage(MapMessage message) throws JMSException {
+        Assert.assertEquals(message.getString(WorkflowExecutionArgs.ENTITY_NAME.getName()),
+                "agg-coord");
+        Assert.assertEquals(message.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()),
+                "2011-01-01T01:00Z");
+        Assert.assertEquals(message.getString(WorkflowExecutionArgs.TIMESTAMP.getName()),
+                "2012-01-01T01:00Z");
+        Assert.assertEquals(message.getString(WorkflowExecutionArgs.STATUS.getName()), "SUCCEEDED");
+    }
+}


Mime
View raw message