incubator-hcatalog-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r1431177 [2/2] - in /incubator/hcatalog/trunk: ./ core/src/main/java/org/apache/hcatalog/common/ server-extensions/src/main/java/org/apache/hcatalog/listener/ server-extensions/src/main/java/org/apache/hcatalog/messaging/ server-extensions/...
Date Thu, 10 Jan 2013 02:06:07 GMT
Modified: incubator/hcatalog/trunk/server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java?rev=1431177&r1=1431176&r2=1431177&view=diff
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java
(original)
+++ incubator/hcatalog/trunk/server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java
Thu Jan 10 02:06:07 2013
@@ -29,26 +29,32 @@ import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.MapMessage;
+import javax.jms.TextMessage;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
-import javax.jms.ObjectMessage;
 import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PartitionEventType;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.mapreduce.HCatBaseTest;
 
+import org.apache.hcatalog.messaging.AddPartitionMessage;
+import org.apache.hcatalog.messaging.CreateDatabaseMessage;
+import org.apache.hcatalog.messaging.CreateTableMessage;
+import org.apache.hcatalog.messaging.DropDatabaseMessage;
+import org.apache.hcatalog.messaging.DropPartitionMessage;
+import org.apache.hcatalog.messaging.DropTableMessage;
+import org.apache.hcatalog.messaging.HCatEventMessage;
+import org.apache.hcatalog.messaging.MessageDeserializer;
+import org.apache.hcatalog.messaging.MessageFactory;
+import org.apache.hcatalog.messaging.jms.MessagingUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -95,10 +101,9 @@ public class TestNotificationListener ex
     @After
     public void tearDown() throws Exception {
         List<String> expectedMessages = Arrays.asList(
-                HCatConstants.HCAT_ADD_DATABASE_EVENT,
-                HCatConstants.HCAT_ADD_TABLE_EVENT,
+                HCatConstants.HCAT_CREATE_DATABASE_EVENT,
+                HCatConstants.HCAT_CREATE_TABLE_EVENT,
                 HCatConstants.HCAT_ADD_PARTITION_EVENT,
-                HCatConstants.HCAT_PARTITION_DONE_EVENT,
                 HCatConstants.HCAT_DROP_PARTITION_EVENT,
                 HCatConstants.HCAT_DROP_TABLE_EVENT,
                 HCatConstants.HCAT_DROP_DATABASE_EVENT);
@@ -125,61 +130,87 @@ public class TestNotificationListener ex
         String event;
         try {
             event = msg.getStringProperty(HCatConstants.HCAT_EVENT);
+            String format = msg.getStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT);
+            String version = msg.getStringProperty(HCatConstants.HCAT_MESSAGE_VERSION);
+            String messageBody = ((TextMessage)msg).getText();
             actualMessages.add(event);
+            MessageDeserializer deserializer = MessageFactory.getDeserializer(format, version);
 
-            if (event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)) {
+            if (event.equals(HCatConstants.HCAT_CREATE_DATABASE_EVENT)) {
 
                 Assert.assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,
msg
                         .getJMSDestination().toString());
-                Assert.assertEquals("mydb",
-                        ((Database) ((ObjectMessage) msg).getObject()).getName());
-            } else if (event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)) {
+                CreateDatabaseMessage message =  deserializer.getCreateDatabaseMessage(messageBody);
+                Assert.assertEquals("mydb", message.getDB());
+                HCatEventMessage message2 = MessagingUtils.getMessage(msg);
+                Assert.assertTrue("Unexpected message-type.", message2 instanceof CreateDatabaseMessage);
+                Assert.assertEquals("mydb", message2.getDB());
+            } else if (event.equals(HCatConstants.HCAT_CREATE_TABLE_EVENT)) {
 
                 Assert.assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString());
-                Table tbl = (Table) (((ObjectMessage) msg).getObject());
-                Assert.assertEquals("mytbl", tbl.getTableName());
-                Assert.assertEquals("mydb", tbl.getDbName());
-                Assert.assertEquals(1, tbl.getPartitionKeysSize());
+                CreateTableMessage message = deserializer.getCreateTableMessage(messageBody);
+                Assert.assertEquals("mytbl", message.getTable());
+                Assert.assertEquals("mydb", message.getDB());
+                HCatEventMessage message2 = MessagingUtils.getMessage(msg);
+                Assert.assertTrue("Unexpected message-type.", message2 instanceof CreateTableMessage);
+                Assert.assertEquals("mydb", message2.getDB());
+                Assert.assertEquals("mytbl", ((CreateTableMessage) message2).getTable());
             } else if (event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) {
 
                 Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
                         .toString());
-                Partition part = (Partition) (((ObjectMessage) msg).getObject());
-                Assert.assertEquals("mytbl", part.getTableName());
-                Assert.assertEquals("mydb", part.getDbName());
-                List<String> vals = new ArrayList<String>(1);
-                vals.add("2011");
-                Assert.assertEquals(vals, part.getValues());
+                AddPartitionMessage message = deserializer.getAddPartitionMessage(messageBody);
+                Assert.assertEquals("mytbl", message.getTable());
+                Assert.assertEquals("mydb", message.getDB());
+                Assert.assertEquals(1, message.getPartitions().size());
+                Assert.assertEquals("2011", message.getPartitions().get(0).get("b"));
+                HCatEventMessage message2 = MessagingUtils.getMessage(msg);
+                Assert.assertTrue("Unexpected message-type.", message2 instanceof AddPartitionMessage);
+                Assert.assertEquals("mydb", message2.getDB());
+                Assert.assertEquals("mytbl", ((AddPartitionMessage) message2).getTable());
+                Assert.assertEquals(1, ((AddPartitionMessage) message2).getPartitions().size());
+                Assert.assertEquals("2011", ((AddPartitionMessage) message2).getPartitions().get(0).get("b"));
             } else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) {
 
                 Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
                         .toString());
-                Partition part = (Partition) (((ObjectMessage) msg).getObject());
-                Assert.assertEquals("mytbl", part.getTableName());
-                Assert.assertEquals("mydb", part.getDbName());
-                List<String> vals = new ArrayList<String>(1);
-                vals.add("2011");
-                Assert.assertEquals(vals, part.getValues());
+                DropPartitionMessage message = deserializer.getDropPartitionMessage(messageBody);
+                Assert.assertEquals("mytbl", message.getTable());
+                Assert.assertEquals("mydb", message.getDB());
+                Assert.assertEquals(1, message.getPartitions().size());
+                Assert.assertEquals("2011", message.getPartitions().get(0).get("b"));
+                HCatEventMessage message2 = MessagingUtils.getMessage(msg);
+                Assert.assertTrue("Unexpected message-type.", message2 instanceof DropPartitionMessage);
+                Assert.assertEquals("mydb", message2.getDB());
+                Assert.assertEquals("mytbl", ((DropPartitionMessage) message2).getTable());
+                Assert.assertEquals(1, ((DropPartitionMessage) message2).getPartitions().size());
+                Assert.assertEquals("2011", ((DropPartitionMessage) message2).getPartitions().get(0).get("b"));
             } else if (event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) {
 
                 Assert.assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString());
-                Table tbl = (Table) (((ObjectMessage) msg).getObject());
-                Assert.assertEquals("mytbl", tbl.getTableName());
-                Assert.assertEquals("mydb", tbl.getDbName());
-                Assert.assertEquals(1, tbl.getPartitionKeysSize());
+                DropTableMessage message = deserializer.getDropTableMessage(messageBody);
+                Assert.assertEquals("mytbl", message.getTable());
+                Assert.assertEquals("mydb", message.getDB());
+                HCatEventMessage message2 = MessagingUtils.getMessage(msg);
+                Assert.assertTrue("Unexpected message-type.", message2 instanceof DropTableMessage);
+                Assert.assertEquals("mydb", message2.getDB());
+                Assert.assertEquals("mytbl", ((DropTableMessage) message2).getTable());
             } else if (event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) {
 
                 Assert.assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,
msg
                         .getJMSDestination().toString());
-                Assert.assertEquals("mydb",
-                        ((Database) ((ObjectMessage) msg).getObject()).getName());
+                DropDatabaseMessage message =  deserializer.getDropDatabaseMessage(messageBody);
+                Assert.assertEquals("mydb", message.getDB());
+                HCatEventMessage message2 = MessagingUtils.getMessage(msg);
+                Assert.assertTrue("Unexpected message-type.", message2 instanceof DropDatabaseMessage);
+                Assert.assertEquals("mydb", message2.getDB());
             } else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) {
-                Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
-                        .toString());
-                MapMessage mapMsg = (MapMessage) msg;
-                assert mapMsg.getString("b").equals("2011");
-            } else
-                assert false;
+                // TODO: Fill in when PARTITION_DONE_EVENT is supported.
+                Assert.assertTrue("Unexpected: HCAT_PARTITION_DONE_EVENT not supported (yet).",
false);
+            } else {
+                Assert.assertTrue("Unexpected event-type: " + event, false);
+            }
+
         } catch (JMSException e) {
             e.printStackTrace(System.err);
             assert false;



Mime
View raw message