falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From peeyu...@apache.org
Subject falcon git commit: FALCON-2051 PostProcessing needs to send JMS message so that REPL metrics can be added to the GraphDB
Date Thu, 30 Jun 2016 10:00:10 GMT
Repository: falcon
Updated Branches:
  refs/heads/0.10 600b6bcc2 -> 0222d390b


FALCON-2051 PostProcessing needs to send JMS message so that REPL metrics can be added to the GraphDB

This change essentially reverts the FALCON-1926 changes.
After the 0.10 release, we will attempt to solve this properly so that Falcon PostProcessing
no longer has to send a JMS message.

Author: Venkatesan Ramachandran <vramachandran@hortonworks.com>

Reviewers: Venkat <n.r.v@gmail.com>, Pallavi Rao <pallavi.rao@inmobi.com>, Praveen Adlakha <adlakha.praveen@gmail.com>, Peeyush <peeyushb@apache.org>

Closes #201 from vramachan/FALCON-2051.PostProcessingNotInvoked.0.10


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/0222d390
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/0222d390
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/0222d390

Branch: refs/heads/0.10
Commit: 0222d390b5abef4b6b7263c84a3d409afac0afd6
Parents: 600b6bc
Author: Venkatesan Ramachandran <vramachandran@hortonworks.com>
Authored: Thu Jun 30 15:29:58 2016 +0530
Committer: peeyush b <pbishnoi@hortonworks.com>
Committed: Thu Jun 30 15:29:58 2016 +0530

----------------------------------------------------------------------
 .../falcon/entity/WorkflowNameBuilder.java      |  7 ----
 docs/src/site/twiki/Configuration.twiki         |  3 --
 .../falcon/messaging/JMSMessageConsumer.java    |  3 +-
 .../messaging/JMSMessageConsumerTest.java       | 42 +++-----------------
 .../falcon/workflow/FalconPostProcessing.java   | 15 +++++++
 5 files changed, 22 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/0222d390/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
index f0d6073..c58be64 100644
--- a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
+++ b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
@@ -34,9 +34,6 @@ import java.util.regex.Pattern;
 public class WorkflowNameBuilder<T extends Entity> {
     private static final String PREFIX = "FALCON";
 
-    // Oozie JMS message property name that holds the workflow app name
-    private static final String OOZIE_JMS_MSG_APPNAME_PROP = "appName";
-
     private T entity;
     private Tag tag;
     private List<String> suffixes;
@@ -156,9 +153,5 @@ public class WorkflowNameBuilder<T extends Entity> {
             }
             return null;
         }
-
-        public static String getJMSFalconSelector() {
-            return String.format("%s like '%s%s%%'", OOZIE_JMS_MSG_APPNAME_PROP, PREFIX, SEPARATOR);
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/0222d390/docs/src/site/twiki/Configuration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Configuration.twiki b/docs/src/site/twiki/Configuration.twiki
index ce32019..98acb83 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -103,9 +103,6 @@ Oozie workflow completes. Falcon listens to Oozie notification via JMS. You need
 explained below. Falcon post processing feature continues to only send user notifications so enabling Oozie
 JMS notification is important.
 
-*NOTE : If Oozie JMS notification is not enabled, the Falcon features such as failure retry, late data handling and metadata
-service will be disabled for all entities on the server.*
-
 ---+++Enable Oozie JMS notification
 
    * Please add/change the following properties in oozie-site.xml in the oozie installation dir.

http://git-wip-us.apache.org/repos/asf/falcon/blob/0222d390/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index 8b48e93..90bbdd3 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -92,8 +92,7 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
 
             topicSession = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             Topic destination = topicSession.createTopic(topicName);
-            topicSubscriber = topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID,
-                    WorkflowNameBuilder.WorkflowName.getJMSFalconSelector(), false);
+            topicSubscriber = topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID);
             topicSubscriber.setMessageListener(this);
 
             connection.setExceptionListener(this);

http://git-wip-us.apache.org/repos/asf/falcon/blob/0222d390/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
index 6237bdf..cffdb59 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -83,11 +83,6 @@ public class JMSMessageConsumerTest {
 
     public void sendMessages(String topic, WorkflowExecutionContext.Type type)
         throws JMSException, FalconException, IOException {
-        sendMessages(topic, type, true);
-    }
-
-    public void sendMessages(String topic, WorkflowExecutionContext.Type type, boolean isFalconWF)
-        throws JMSException, FalconException, IOException {
         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
         Connection connection = connectionFactory.createConnection();
         connection.start();
@@ -105,10 +100,10 @@ public class JMSMessageConsumerTest {
                 message = getMockFalconMessage(i, session);
                 break;
             case WORKFLOW_JOB:
-                message = getMockOozieMessage(i, session, isFalconWF);
+                message = getMockOozieMessage(i, session);
                 break;
             case COORDINATOR_ACTION:
-                message = getMockOozieCoordMessage(i, session, isFalconWF);
+                message = getMockOozieCoordMessage(i, session);
             default:
                 break;
             }
@@ -117,15 +112,10 @@ public class JMSMessageConsumerTest {
         }
     }
 
-    private Message getMockOozieMessage(int i, Session session, boolean isFalconWF)
-        throws FalconException, JMSException {
+    private Message getMockOozieMessage(int i, Session session) throws FalconException, JMSException {
         TextMessage message = session.createTextMessage();
         message.setStringProperty("appType", "WORKFLOW_JOB");
-        if (isFalconWF) {
-            message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
-        } else {
-            message.setStringProperty("appName", "OozieSampleShellWF");
-        }
+        message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
         message.setStringProperty("user", "falcon");
         switch(i % 4) {
         case 0:
@@ -152,15 +142,11 @@ public class JMSMessageConsumerTest {
         return message;
     }
 
-    private Message getMockOozieCoordMessage(int i, Session session, boolean isFalconWF)
+    private Message getMockOozieCoordMessage(int i, Session session)
         throws FalconException, JMSException {
         TextMessage message = session.createTextMessage();
         message.setStringProperty("appType", "COORDINATOR_ACTION");
-        if (isFalconWF) {
-            message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
-        } else {
-            message.setStringProperty("appName", "OozieSampleShellWF");
-        }
+        message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
         message.setStringProperty("user", "falcon");
         switch(i % 5) {
         case 0:
@@ -292,20 +278,4 @@ public class JMSMessageConsumerTest {
         broker.stop();
         subscriber.closeSubscriber();
     }
-
-    @Test
-    public void testJMSMessagesFromOozieForNonFalconWF() throws Exception {
-        sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.WORKFLOW_JOB, false /* isFalconWF */);
-
-        final BrokerView adminView = broker.getAdminView();
-        Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
-
-        Thread.sleep(100);
-        Mockito.verify(jobEndService, Mockito.never()).notifyStart(Mockito.any(WorkflowExecutionContext.class));
-        Mockito.verify(jobEndService, Mockito.never()).notifySuccess(Mockito.any(WorkflowExecutionContext.class));
-        Mockito.verify(jobEndService, Mockito.never()).notifySuspend(Mockito.any(WorkflowExecutionContext.class));
-        Mockito.verify(jobEndService, Mockito.never()).notifyWait(Mockito.any(WorkflowExecutionContext.class));
-        Mockito.verify(jobEndService, Mockito.never()).notifyFailure(Mockito.any(WorkflowExecutionContext.class));
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/0222d390/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index 4961896..ea914f6 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -48,6 +48,14 @@ public class FalconPostProcessing extends Configured implements Tool {
         // serialize the context to HDFS under logs dir before sending the message
         context.serialize();
 
+        boolean systemNotificationEnabled = Boolean.parseBoolean(context.
+            getValue(WorkflowExecutionArgs.SYSTEM_JMS_NOTIFICATION_ENABLED, "true"));
+
+        if (systemNotificationEnabled) {
+            LOG.info("Sending Falcon message {} ", context);
+            invokeFalconMessageProducer(context);
+        }
+
         String userBrokerUrl = context.getValue(WorkflowExecutionArgs.USER_BRKR_URL);
         boolean userNotificationEnabled = Boolean.parseBoolean(context.
                 getValue(WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED, "true"));
@@ -72,6 +80,13 @@ public class FalconPostProcessing extends Configured implements Tool {
         jmsMessageProducer.sendMessage(WorkflowExecutionContext.USER_MESSAGE_ARGS);
     }
 
+    private void invokeFalconMessageProducer(WorkflowExecutionContext context) throws Exception {
+        JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context)
+            .type(JMSMessageProducer.MessageType.FALCON)
+            .build();
+        jmsMessageProducer.sendMessage();
+    }
+
     private void invokeLogProducer(WorkflowExecutionContext context) {
         // todo: need to move this out to Falcon in-process
         if (UserGroupInformation.isSecurityEnabled()) {


Mime
View raw message