falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject git commit: FALCON-226 Tests for subscribing/listening to multiple active-mq topics. Contributed by Shaik Idris
Date Tue, 24 Dec 2013 14:40:49 GMT
Updated Branches:
  refs/heads/master 744c2911a -> 407cf4a5a


FALCON-226 Tests for subscribing/listening to multiple active-mq topics. Contributed by Shaik Idris


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/407cf4a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/407cf4a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/407cf4a5

Branch: refs/heads/master
Commit: 407cf4a5abb374a0067e4828b24a317b5f30c9ca
Parents: 744c291
Author: Shwetha GS <shwethags@gmail.com>
Authored: Tue Dec 24 20:10:39 2013 +0530
Committer: Shwetha GS <shwethags@gmail.com>
Committed: Tue Dec 24 20:10:39 2013 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../messaging/FalconTopicProducerTest.java      | 161 ++++++++++++-------
 .../service/FalconTopicSubscriberTest.java      |  41 +++--
 3 files changed, 133 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/407cf4a5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 50198a4..2101f5b 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -23,6 +23,9 @@ Trunk (Unreleased)
 
     FALCON-124 unable to schedule deleted feed. (Shwetha GS via Shaik Idris)
 
+    FALCON-226 Tests for subscribing/listening to multiple active-mq topics.  (Shaik Idris
+    via Shwetha GS)
+
 Release Version: 0.4-incubating
 
    NEW FEATURES

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/407cf4a5/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
index 9912678..27bea68 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
@@ -17,6 +17,10 @@
  */
 package org.apache.falcon.messaging;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
@@ -37,7 +41,9 @@ public class FalconTopicProducerTest {
     // "tcp://localhost:61616?daemon=true";
     private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
     private static final String TOPIC_NAME = "FALCON.ENTITY.TOPIC";
+    private static final String SECONDARY_TOPIC_NAME = "FALCON.ENTITY.SEC.TOPIC";
     private BrokerService broker;
+    private List<MapMessage> mapMessages;
 
     private volatile AssertionError error;
 
@@ -58,72 +64,122 @@ public class FalconTopicProducerTest {
 
     @Test
     public void testWithFeedOutputPaths() throws Exception {
-        String[] args = new String[]{"-" + ARG.entityName.getArgName(), "agg-coord",
-                                     "-" + ARG.feedNames.getArgName(), "click-logs,raw-logs",
-                                     "-" + ARG.feedInstancePaths.getArgName(),
-                                     "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20",
-                                     "-" + ARG.workflowId.getArgName(), "workflow-01-00",
-                                     "-" + ARG.runId.getArgName(), "1",
-                                     "-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
-                                     "-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
-                                     "-" + ARG.brokerUrl.getArgName(), BROKER_URL,
-                                     "-" + ARG.brokerImplClass.getArgName(), (BROKER_IMPL_CLASS),
-                                     "-" + ARG.entityType.getArgName(), ("process"),
-                                     "-" + ARG.operation.getArgName(), ("GENERATE"),
-                                     "-" + ARG.logFile.getArgName(), ("/logFile"),
-                                     "-" + ARG.topicName.getArgName(), (TOPIC_NAME),
-                                     "-" + ARG.status.getArgName(), ("SUCCEEDED"),
-                                     "-" + ARG.brokerTTL.getArgName(), "10",
-                                     "-" + ARG.cluster.getArgName(), "corp", };
-        testProcessMessageCreator(args);
+        List<String> args = createCommonArgs();
+        List<String> newArgs = new ArrayList<String>(Arrays.asList(
+                "-" + ARG.entityName.getArgName(), "agg-coord",
+                "-" + ARG.feedNames.getArgName(), "click-logs,raw-logs",
+                "-" + ARG.feedInstancePaths.getArgName(),
+                "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20",
+                "-" + ARG.topicName.getArgName(), TOPIC_NAME));
+        args.addAll(newArgs);
+        List<String[]> messages = new ArrayList<String[]>();
+        messages.add(args.toArray(new String[args.size()]));
+        testProcessMessageCreator(messages, TOPIC_NAME);
+        for (MapMessage m : mapMessages) {
+            assertMessage(m);
+            Assert.assertTrue((m.getString(ARG.feedNames.getArgName())
+                    .equals("click-logs,raw-logs")));
+            Assert.assertTrue(m
+                    .getString(ARG.feedInstancePaths.getArgName())
+                    .equals("/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20"));
+        }
     }
 
     @Test
     public void testWithEmptyFeedOutputPaths() throws Exception {
-        String[] args = new String[]{"-" + ARG.entityName.getArgName(), "agg-coord",
-                                     "-" + ARG.feedNames.getArgName(), "null",
-                                     "-" + ARG.feedInstancePaths.getArgName(),
-                                     "null",
-                                     "-" + ARG.workflowId.getArgName(), "workflow-01-00",
-                                     "-" + ARG.runId.getArgName(), "1",
-                                     "-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
-                                     "-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
-                                     "-" + ARG.brokerUrl.getArgName(), BROKER_URL,
-                                     "-" + ARG.brokerImplClass.getArgName(), (BROKER_IMPL_CLASS),
-                                     "-" + ARG.entityType.getArgName(), ("process"),
-                                     "-" + ARG.operation.getArgName(), ("GENERATE"),
-                                     "-" + ARG.logFile.getArgName(), ("/logFile"),
-                                     "-" + ARG.topicName.getArgName(), (TOPIC_NAME),
-                                     "-" + ARG.status.getArgName(), ("SUCCEEDED"),
-                                     "-" + ARG.brokerTTL.getArgName(), "10",
-                                     "-" + ARG.cluster.getArgName(), "corp", };
-        testProcessMessageCreator(args);
+        List<String> args = createCommonArgs();
+        List<String> newArgs = new ArrayList<String>(Arrays.asList(
+                "-" + ARG.entityName.getArgName(), "agg-coord",
+                "-" + ARG.feedNames.getArgName(), "null",
+                "-" + ARG.feedInstancePaths.getArgName(), "null",
+                "-" + ARG.topicName.getArgName(), TOPIC_NAME));
+        args.addAll(newArgs);
+        List<String[]> messages = new ArrayList<String[]>();
+        messages.add(args.toArray(new String[args.size()]));
+        testProcessMessageCreator(messages, TOPIC_NAME);
+        for (MapMessage m : mapMessages) {
+            assertMessage(m);
+            assertMessage(m);
+            Assert.assertTrue(m.getString(ARG.feedNames.getArgName()).equals(
+                    "null"));
+            Assert.assertTrue(m.getString(ARG.feedInstancePaths.getArgName())
+                    .equals("null"));
+        }
     }
 
-    private void testProcessMessageCreator(String[] args) throws Exception {
+    @Test
+    public void testConsumerWithMultipleTopics() throws Exception {
+        List<String[]> messages = new ArrayList<String[]>();
+        List<String> args = createCommonArgs();
+        List<String> newArgs = new ArrayList<String>(Arrays.asList(
+                "-" + ARG.entityName.getArgName(), "agg-coord",
+                "-" + ARG.feedNames.getArgName(), "raw-logs",
+                "-" + ARG.feedInstancePaths.getArgName(),
+                "/raw-logs/10/05/05/00/20",
+                "-" + ARG.topicName.getArgName(), TOPIC_NAME));
+        args.addAll(newArgs);
+        messages.add(args.toArray(new String[args.size()]));
+
+        args = createCommonArgs();
+        newArgs = new ArrayList<String>(Arrays.asList(
+                "-" + ARG.entityName.getArgName(), "agg-coord",
+                "-" + ARG.feedNames.getArgName(), "click-logs",
+                "-" + ARG.feedInstancePaths.getArgName(),
+                "/click-logs/10/05/05/00/20",
+                "-" + ARG.topicName.getArgName(), SECONDARY_TOPIC_NAME));
+        args.addAll(newArgs);
+        messages.add(args.toArray(new String[args.size()]));
+
+        testProcessMessageCreator(messages, TOPIC_NAME+","+SECONDARY_TOPIC_NAME);
+        Assert.assertEquals(mapMessages.size(), 2);
+        for (MapMessage m : mapMessages) {
+            assertMessage(m);
+        }
+    }
+
+    private List<String> createCommonArgs() {
+        List<String> args = new ArrayList<String>(Arrays.asList(
+                "-" + ARG.workflowId.getArgName(), "workflow-01-00",
+                "-" + ARG.runId.getArgName(), "1",
+                "-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
+                "-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
+                "-" + ARG.brokerUrl.getArgName(), BROKER_URL,
+                "-" + ARG.brokerImplClass.getArgName(), (BROKER_IMPL_CLASS),
+                "-" + ARG.entityType.getArgName(), ("process"),
+                "-" + ARG.operation.getArgName(), ("GENERATE"),
+                "-" + ARG.logFile.getArgName(), ("/logFile"),
+                "-" + ARG.status.getArgName(), ("SUCCEEDED"),
+                "-" + ARG.brokerTTL.getArgName(), "10",
+                "-" + ARG.cluster.getArgName(), "corp"));
+        return args;
+    }
+
+    private void testProcessMessageCreator(final List<String[]> messages,
+             final String topicsToListen) throws Exception {
 
         Thread t = new Thread() {
             @Override
             public void run() {
                 try {
-                    consumer();
+                    consumer(messages.size(), topicsToListen);
                 } catch (AssertionError e) {
                     error = e;
-                } catch (JMSException ignore) {
+                } catch (Exception ignore) {
                     error = null;
                 }
             }
         };
         t.start();
-        Thread.sleep(1500);
-        new MessageProducer().run(args);
+        for (String[] message : messages) {
+            new MessageProducer().run(message);
+        }
         t.join();
         if (error != null) {
             throw error;
         }
     }
 
-    private void consumer() throws JMSException {
+    private void consumer(int size, String topicsToListen) throws Exception {
         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                 BROKER_URL);
         Connection connection = connectionFactory.createConnection();
@@ -131,23 +187,14 @@ public class FalconTopicProducerTest {
 
         Session session = connection.createSession(false,
                 Session.AUTO_ACKNOWLEDGE);
-        Destination destination = session.createTopic(TOPIC_NAME);
+        Destination destination = session.createTopic(topicsToListen);
         MessageConsumer consumer = session.createConsumer(destination);
-
-        // wait till you get atleast one message
-        MapMessage m;
-        for (m = null; m == null;) {
-            m = (MapMessage) consumer.receive();
+        mapMessages = new ArrayList<MapMessage>();
+        for (int i=0; i<size; i++) {
+            MapMessage m = (MapMessage) consumer.receive();
+            mapMessages.add(m);
+            System.out.println("Consumed: " + m.toString());
         }
-        System.out.println("Consumed: " + m.toString());
-
-        assertMessage(m);
-        Assert.assertTrue((m.getString(ARG.feedNames.getArgName())
-                .equals("click-logs,raw-logs"))
-                || (m.getString(ARG.feedNames.getArgName()).equals("null")));
-        Assert.assertTrue(m.getString(ARG.feedInstancePaths.getArgName())
-                .equals("/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20")
-                || (m.getString(ARG.feedInstancePaths.getArgName()).equals("null")));
 
         connection.close();
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/407cf4a5/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java b/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java
index e390aad..f1536f4 100644
--- a/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java
+++ b/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java
@@ -19,10 +19,10 @@ package org.apache.falcon.service;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.falcon.FalconException;
 import org.apache.falcon.messaging.EntityInstanceMessage;
 import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
 import org.mortbay.log.Log;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -39,17 +39,19 @@ public class FalconTopicSubscriberTest {
     // "tcp://localhost:61616?daemon=true";
     private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
     private static final String TOPIC_NAME = "FALCON.ENTITY.TOPIC";
+    private static final String SECONDARY_TOPIC_NAME = "FALCON.ENTITY.SEC.TOPIC";
     private BrokerService broker;
 
     @BeforeClass
     public void setup() throws Exception {
         broker = new BrokerService();
-        broker.setUseJmx(true);
         broker.addConnector(BROKER_URL);
+        broker.setDataDirectory("target/activemq");
+        broker.setBrokerName("localhost");
         broker.start();
     }
 
-    public void sendMessages() throws JMSException {
+    public void sendMessages(String topic) throws JMSException {
         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                 BROKER_URL);
         Connection connection = connectionFactory.createConnection();
@@ -57,11 +59,11 @@ public class FalconTopicSubscriberTest {
 
         Session session = connection.createSession(false,
                 Session.AUTO_ACKNOWLEDGE);
-        Destination destination = session.createTopic(TOPIC_NAME);
-        javax.jms.MessageProducer producer = session
+        Topic destination = session.createTopic(topic);
+        MessageProducer producer = session
                 .createProducer(destination);
         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < 3; i++) {
             EntityInstanceMessage falconMessage = getMockFalconMessage(i);
             MapMessage message = session.createMapMessage();
             for (ARG arg : ARG.values()) {
@@ -73,9 +75,13 @@ public class FalconTopicSubscriberTest {
         }
 
         EntityInstanceMessage message = getMockFalconMessage(15);
+        MapMessage mapMessage = session.createMapMessage();
         message.getKeyValueMap().put(ARG.status, "FAILED");
-        TextMessage textMessage = session.createTextMessage(message.toString());
-        producer.send(textMessage);
+        for (ARG arg : ARG.values()) {
+            mapMessage.setString(arg.getPropName(), message
+                    .getKeyValueMap().get(arg));
+        }
+        producer.send(mapMessage);
     }
 
     private EntityInstanceMessage getMockFalconMessage(int i) {
@@ -99,17 +105,22 @@ public class FalconTopicSubscriberTest {
     }
 
     @Test
-    public void testSubscriber() throws FalconException, JMSException {
-        FalconTopicSubscriber subscriber1 = new FalconTopicSubscriber(
-                BROKER_IMPL_CLASS, "", "", BROKER_URL, TOPIC_NAME);
-
-        subscriber1.startSubscriber();
-        sendMessages();
-        subscriber1.closeSubscriber();
+    public void testSubscriber() throws Exception{
+        //Comma separated topics are supported in startup properties
+        FalconTopicSubscriber subscriber = new FalconTopicSubscriber(
+                BROKER_IMPL_CLASS, "", "", BROKER_URL, TOPIC_NAME+","+SECONDARY_TOPIC_NAME);
+        subscriber.startSubscriber();
+        sendMessages(TOPIC_NAME);
+        Assert.assertEquals(broker.getAdminView().getTotalEnqueueCount(), 9);
+        sendMessages(SECONDARY_TOPIC_NAME);
+        Assert.assertEquals(broker.getAdminView().getTotalEnqueueCount(), 17);
+        Assert.assertEquals(broker.getAdminView().getTotalConsumerCount(), 2);
+        subscriber.closeSubscriber();
     }
 
     @AfterClass
     public void tearDown() throws Exception {
+        broker.deleteAllMessages();
         broker.stop();
     }
 }


Mime
View raw message