falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkat...@apache.org
Subject [2/5] git commit: FALCON-777 UT intermittent failures in Messaging tests depending on Thread.sleep. Contributed by Sowmya Ramesh
Date Wed, 08 Oct 2014 23:44:41 GMT
FALCON-777 UT intermittent failures in Messaging tests depending on Thread.sleep. Contributed
by Sowmya Ramesh


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/152afb1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/152afb1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/152afb1c

Branch: refs/heads/master
Commit: 152afb1c4a858f9ca4308016f02ef5037e3e756a
Parents: 5c30543
Author: Venkatesh Seetharam <venkatesh@apache.org>
Authored: Wed Oct 8 14:54:40 2014 -0700
Committer: Venkatesh Seetharam <venkatesh@apache.org>
Committed: Wed Oct 8 16:20:38 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../falcon/messaging/FeedProducerTest.java      | 61 ++++++++------------
 .../messaging/JMSMessageProducerTest.java       |  7 ++-
 .../falcon/messaging/ProcessProducerTest.java   | 40 ++++++-------
 .../workflow/FalconPostProcessingTest.java      | 50 ++++++++++------
 5 files changed, 83 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/152afb1c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c76b6f0..b17afa3 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -112,6 +112,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-777 UT intermittent failures in Messaging tests depending on Thread.sleep
+   (Sowmya Ramesh via Venkatesh Seetharam)
+
    FALCON-773 Log clean up handlers only work in distributed mode
    (Balu Vellanki via Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/152afb1c/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
index 6c8a926..2f5aa70 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
@@ -20,6 +20,7 @@ package org.apache.falcon.messaging;
 
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.concurrent.CountDownLatch;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -40,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -61,6 +63,11 @@ public class FeedProducerTest {
     private volatile AssertionError error;
     private EmbeddedCluster dfsCluster;
     private Configuration conf;
+    private CountDownLatch latch = new CountDownLatch(1);
+    private String[] instancePaths = {"/falcon/feed/agg-logs/path1/2010/10/10/20",
+        "/falcon/feed/agg-logs/path1/2010/10/10/21",
+        "/falcon/feed/agg-logs/path1/2010/10/10/22",
+        "/falcon/feed/agg-logs/path1/2010/10/10/23", };
 
     @BeforeClass
     public void setup() throws Exception {
@@ -111,12 +118,7 @@ public class FeedProducerTest {
     public void testLogFile() throws Exception {
         FileSystem fs = dfsCluster.getFileSystem();
         OutputStream out = fs.create(logFile);
-        InputStream in = new ByteArrayInputStream(
-                ("instancePaths=/falcon/feed/agg-logs/path1/2010/10/10/20,"
-                        + "/falcon/feed/agg-logs/path1/2010/10/10/21,"
-                        + "/falcon/feed/agg-logs/path1/2010/10/10/22,"
-                        + "/falcon/feed/agg-logs/path1/2010/10/10/23")
-                        .getBytes());
+        InputStream in = new ByteArrayInputStream(("instancePaths=" + StringUtils.join(instancePaths,
",")).getBytes());
         IOUtils.copyBytes(in, out, conf);
         testProcessMessageCreator();
     }
@@ -150,8 +152,9 @@ public class FeedProducerTest {
             }
         };
         t.start();
-        Thread.sleep(100);
 
+        // Wait for consumer to be ready
+        latch.await();
         WorkflowExecutionContext context = WorkflowExecutionContext.create(
                 args, WorkflowExecutionContext.Type.POST_PROCESSING);
         JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context)
@@ -173,41 +176,23 @@ public class FeedProducerTest {
         Destination destination = session.createTopic(TOPIC_NAME);
         MessageConsumer consumer = session.createConsumer(destination);
 
-        // wait till you get atleast one message
-        MapMessage m;
-        for (m = null; m == null;) {
-            m = (MapMessage) consumer.receive();
-        }
-        System.out.println("Consumed: " + m.toString());
-        assertMessage(m);
-        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
-                "/falcon/feed/agg-logs/path1/2010/10/10/20");
+        latch.countDown();
+        verifyMesssage(consumer);
 
-        for (m = null; m == null;) {
-            m = (MapMessage) consumer.receive();
-        }
-        System.out.println("Consumed: " + m.toString());
-        assertMessage(m);
-        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
-                "/falcon/feed/agg-logs/path1/2010/10/10/21");
+        connection.close();
+    }
 
-        for (m = null; m == null;) {
-            m = (MapMessage) consumer.receive();
-        }
-        System.out.println("Consumed: " + m.toString());
-        assertMessage(m);
-        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
-                "/falcon/feed/agg-logs/path1/2010/10/10/22");
+    private void verifyMesssage(MessageConsumer consumer) throws JMSException {
+        for (String instancePath : instancePaths) {
+            // receive call is blocking
+            MapMessage m = (MapMessage) consumer.receive();
 
-        for (m = null; m == null;) {
-            m = (MapMessage) consumer.receive();
+            System.out.println("Received JMS message {}" + m.toString());
+            System.out.println("Consumed: " + m.toString());
+            assertMessage(m);
+            Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
+                    instancePath);
         }
-        System.out.println("Consumed: " + m.toString());
-        assertMessage(m);
-        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
-                "/falcon/feed/agg-logs/path1/2010/10/10/23");
-
-        connection.close();
     }
 
     private void assertMessage(MapMessage m) throws JMSException {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/152afb1c/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
index 490292f..d4373de 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
@@ -37,6 +37,7 @@ import javax.jms.Session;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * Test for falcon topic message producer.
@@ -51,6 +52,7 @@ public class JMSMessageProducerTest {
     private List<MapMessage> mapMessages;
 
     private volatile AssertionError error;
+    private CountDownLatch latch = new CountDownLatch(1);
 
     @BeforeClass
     public void setup() throws Exception {
@@ -175,7 +177,9 @@ public class JMSMessageProducerTest {
             }
         };
         t.start();
-        Thread.sleep(100);
+
+        // Wait for consumer to be ready
+        latch.await();
 
         for (String[] message : messages) {
             WorkflowExecutionContext context = WorkflowExecutionContext.create(
@@ -200,6 +204,7 @@ public class JMSMessageProducerTest {
         Destination destination = session.createTopic(topicsToListen);
         MessageConsumer consumer = session.createConsumer(destination);
 
+        latch.countDown();
         mapMessages = new ArrayList<MapMessage>();
         for (int i=0; i<size; i++) {
             MapMessage m = (MapMessage) consumer.receive();

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/152afb1c/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
index 80c2701..1cd3310 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
@@ -22,6 +22,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.commons.lang.StringUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -34,6 +35,7 @@ import javax.jms.JMSException;
 import javax.jms.MapMessage;
 import javax.jms.MessageConsumer;
 import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * Test for process message producer.
@@ -47,14 +49,16 @@ public class ProcessProducerTest {
     private BrokerService broker;
 
     private volatile AssertionError error;
+    private CountDownLatch latch = new CountDownLatch(1);
+    private String[] outputFeedNames = {"click-logs", "raw-logs"};
+    private String[] outputFeedPaths = {"/click-logs/10/05/05/00/20", "/raw-logs/10/05/05/00/20"};
 
     @BeforeClass
     public void setup() throws Exception {
         args = new String[] {
             "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), ENTITY_NAME,
-            "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "click-logs,raw-logs",
-            "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
-            "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20",
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), StringUtils.join(outputFeedNames,
","),
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), StringUtils.join(outputFeedPaths,
","),
             "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
             "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), "falcon",
             "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
@@ -103,7 +107,7 @@ public class ProcessProducerTest {
             }
         };
         t.start();
-        Thread.sleep(100);
+        latch.await();
 
         WorkflowExecutionContext context = WorkflowExecutionContext.create(
                 args, WorkflowExecutionContext.Type.POST_PROCESSING);
@@ -126,25 +130,17 @@ public class ProcessProducerTest {
         Destination destination = session.createTopic(getTopicName());
         MessageConsumer consumer = session.createConsumer(destination);
 
-        // wait till you get at least one message
-        MapMessage m;
-        for (m = null; m == null;) {
-            m = (MapMessage) consumer.receive();
-        }
-        System.out.println("Consumed: " + m.toString());
-        assertMessage(m);
-        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()),
"click-logs");
-        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
-                "/click-logs/10/05/05/00/20");
-
-        for (m = null; m == null;) {
-            m = (MapMessage) consumer.receive();
+        latch.countDown();
+
+        for(int index = 0; index < outputFeedNames.length; ++index) {
+            MapMessage m = (MapMessage) consumer.receive();
+            System.out.println("Consumed: " + m.toString());
+            assertMessage(m);
+            Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()),
+                    outputFeedNames[index]);
+            Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
+                    outputFeedPaths[index]);
         }
-        System.out.println("Consumed: " + m.toString());
-        assertMessage(m);
-        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()),
"raw-logs");
-        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
-                "/raw-logs/10/05/05/00/20");
         connection.close();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/152afb1c/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index 2787d7f..f08949d 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -21,12 +21,14 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.falcon.workflow.FalconPostProcessing;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.commons.lang.StringUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import javax.jms.*;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * Test for validating the falcon post processing utility.
@@ -36,19 +38,20 @@ public class FalconPostProcessingTest {
     private String[] args;
     private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
     private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
-    private static final String FALCON_TOPIC_NAME = "FALCON.ENTITY.TOPIC";
     private static final String ENTITY_NAME = "agg-coord";
     private BrokerService broker;
 
     private volatile AssertionError error;
+    private CountDownLatch latch = new CountDownLatch(1);
+    private String[] outputFeedNames = {"out-click-logs", "out-raw-logs"};
+    private String[] outputFeedPaths = {"/out-click-logs/10/05/05/00/20", "/out-raw-logs/10/05/05/00/20"};
 
     @BeforeClass
     public void setup() throws Exception {
         args = new String[]{
             "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), ENTITY_NAME,
-            "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "out-click-logs,out-raw-logs",
-            "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
-            "/out-click-logs/10/05/05/00/20,/out-raw-logs/10/05/05/00/20",
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), StringUtils.join(outputFeedNames,
","),
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), StringUtils.join(outputFeedPaths,
","),
             "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
             "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), "falcon",
             "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
@@ -95,8 +98,8 @@ public class FalconPostProcessingTest {
             @Override
             public void run() {
                 try {
-                    consumer(BROKER_URL, "FALCON." + ENTITY_NAME);  // user message
-                    consumer(BROKER_URL, FALCON_TOPIC_NAME);  // falcon message
+                    // falcon message [FALCON_TOPIC_NAME] and user message ["FALCON." + ENTITY_NAME]
+                    consumer(BROKER_URL, "FALCON.>");
                 } catch (AssertionError e) {
                     error = e;
                 } catch (JMSException ignore) {
@@ -105,7 +108,8 @@ public class FalconPostProcessingTest {
             }
         };
         t.start();
-        Thread.sleep(1500);
+
+        latch.await();
         new FalconPostProcessing().run(this.args);
         t.join();
         if (error != null) {
@@ -122,20 +126,32 @@ public class FalconPostProcessingTest {
         Destination destination = session.createTopic(topic);
         MessageConsumer consumer = session.createConsumer(destination);
 
-        // wait till you get atleast one message
-        MapMessage m;
-        for (m = null; m == null;) {
-            m = (MapMessage) consumer.receive();
-        }
-        System.out.println("Consumed: " + m.toString());
+        latch.countDown();
+
+        // Verify user message
+        verifyMesssage(consumer);
+
+        // Verify falcon message
+        verifyMesssage(consumer);
 
-        assertMessage(m);
-        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()),
"out-click-logs");
-        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
-                "/out-click-logs/10/05/05/00/20");
         connection.close();
     }
 
+    private void verifyMesssage(MessageConsumer consumer) throws JMSException {
+        for (int index = 0; index < outputFeedPaths.length; ++index) {
+            // receive call is blocking
+            MapMessage m = (MapMessage) consumer.receive();
+
+            System.out.println("Received JMS message {}" + m.toString());
+            System.out.println("Consumed: " + m.toString());
+            assertMessage(m);
+            Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()),
+                    outputFeedNames[index]);
+            Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
+                    outputFeedPaths[index]);
+        }
+    }
+
     private void assertMessage(MapMessage m) throws JMSException {
         Assert.assertEquals(m.getString(WorkflowExecutionArgs.ENTITY_NAME.getName()), "agg-coord");
         String workflowUser = m.getString(WorkflowExecutionArgs.WORKFLOW_USER.getName());


Mime
View raw message