incubator-hcatalog-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r1431177 [1/2] - in /incubator/hcatalog/trunk: ./ core/src/main/java/org/apache/hcatalog/common/ server-extensions/src/main/java/org/apache/hcatalog/listener/ server-extensions/src/main/java/org/apache/hcatalog/messaging/ server-extensions/...
Date Thu, 10 Jan 2013 02:06:07 GMT
Author: gates
Date: Thu Jan 10 02:06:07 2013
New Revision: 1431177

URL: http://svn.apache.org/viewvc?rev=1431177&view=rev
Log:
HCATALOG-546 Rework HCatalog's JMS Notifications

Added:
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/AddPartitionMessage.java
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateDatabaseMessage.java
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateTableMessage.java
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropDatabaseMessage.java
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropPartitionMessage.java
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropTableMessage.java
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/HCatEventMessage.java
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageDeserializer.java
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/jms/
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/jms/MessagingUtils.java
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONAddPartitionMessage.java
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateDatabaseMessage.java
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateTableMessage.java
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropDatabaseMessage.java
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropPartitionMessage.java
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropTableMessage.java
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageDeserializer.java
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/core/src/main/java/org/apache/hcatalog/common/HCatConstants.java
    incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java
    incubator/hcatalog/trunk/server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java
    incubator/hcatalog/trunk/server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1431177&r1=1431176&r2=1431177&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Thu Jan 10 02:06:07 2013
@@ -22,6 +22,7 @@ Trunk (unreleased changes)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+  HCAT-546 Rework HCatalog's JMS Notifications (mithunr via gates) 
 
   IMPROVEMENTS
 

Modified: incubator/hcatalog/trunk/core/src/main/java/org/apache/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/core/src/main/java/org/apache/hcatalog/common/HCatConstants.java?rev=1431177&r1=1431176&r2=1431177&view=diff
==============================================================================
--- incubator/hcatalog/trunk/core/src/main/java/org/apache/hcatalog/common/HCatConstants.java (original)
+++ incubator/hcatalog/trunk/core/src/main/java/org/apache/hcatalog/common/HCatConstants.java Thu Jan 10 02:06:07 2013
@@ -121,13 +121,18 @@ public final class HCatConstants {
     // Message Bus related properties.
     public static final String HCAT_DEFAULT_TOPIC_PREFIX = "hcat";
     public static final String HCAT_EVENT = "HCAT_EVENT";
-    public static final String HCAT_ADD_PARTITION_EVENT = "HCAT_ADD_PARTITION";
-    public static final String HCAT_DROP_PARTITION_EVENT = "HCAT_DROP_PARTITION";
-    public static final String HCAT_PARTITION_DONE_EVENT = "HCAT_PARTITION_DONE";
-    public static final String HCAT_ADD_TABLE_EVENT = "HCAT_ADD_TABLE";
-    public static final String HCAT_DROP_TABLE_EVENT = "HCAT_DROP_TABLE";
-    public static final String HCAT_ADD_DATABASE_EVENT = "HCAT_ADD_DATABASE";
-    public static final String HCAT_DROP_DATABASE_EVENT = "HCAT_DROP_DATABASE";
+    public static final String HCAT_ADD_PARTITION_EVENT = "ADD_PARTITION";
+    public static final String HCAT_DROP_PARTITION_EVENT = "DROP_PARTITION";
+    public static final String HCAT_PARTITION_DONE_EVENT = "PARTITION_DONE";
+    public static final String HCAT_CREATE_TABLE_EVENT = "CREATE_TABLE";
+    public static final String HCAT_DROP_TABLE_EVENT = "DROP_TABLE";
+    public static final String HCAT_CREATE_DATABASE_EVENT = "CREATE_DATABASE";
+    public static final String HCAT_DROP_DATABASE_EVENT = "DROP_DATABASE";
+    public static final String HCAT_MESSAGE_VERSION = "HCAT_MESSAGE_VERSION";
+    public static final String HCAT_MESSAGE_FORMAT = "HCAT_MESSAGE_FORMAT";
+    public static final String CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX = "hcatalog.message.factory.impl.";
+    public static final String CONF_LABEL_HCAT_MESSAGE_FORMAT = "hcatalog.message.format";
+    public static final String DEFAULT_MESSAGE_FACTORY_IMPL = "org.apache.hcatalog.messaging.json.JSONMessageFactory";
 
     // System environment variables
     public static final String SYSENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION";

Modified: incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1431177&r1=1431176&r2=1431177&view=diff
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java (original)
+++ incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java Thu Jan 10 02:06:07 2013
@@ -19,11 +19,8 @@
 
 package org.apache.hcatalog.listener;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -31,7 +28,6 @@ import javax.jms.Destination;
 import javax.jms.ExceptionListener;
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
-import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -62,6 +58,8 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.events.ListenerEvent;
 import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
 import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.messaging.HCatEventMessage;
+import org.apache.hcatalog.messaging.MessageFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,6 +78,7 @@ public class NotificationListener extend
     private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class);
     protected Session session;
     protected Connection conn;
+    private static MessageFactory messageFactory = MessageFactory.getInstance();
 
     /**
      * Create message bus connection and session in constructor.
@@ -112,7 +111,7 @@ public class NotificationListener extend
             Partition partition = partitionEvent.getPartition();
             String topicName = getTopicName(partition, partitionEvent);
             if (topicName != null && !topicName.equals("")) {
-                send(partition, topicName, HCatConstants.HCAT_ADD_PARTITION_EVENT);
+                send(messageFactory.buildAddPartitionMessage(partitionEvent.getTable(), partition), topicName);
             } else {
                 LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
                     + partition.getDbName()
@@ -149,7 +148,7 @@ public class NotificationListener extend
             sd.getSkewedInfo().setSkewedColNames(new ArrayList<String>());
             String topicName = getTopicName(partition, partitionEvent);
             if (topicName != null && !topicName.equals("")) {
-                send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT);
+                send(messageFactory.buildDropPartitionMessage(partitionEvent.getTable(), partition), topicName);
             } else {
                 LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
                     + partition.getDbName()
@@ -168,9 +167,10 @@ public class NotificationListener extend
         // Subscriber can get notification about addition of a database in HCAT
         // by listening on a topic named "HCAT" and message selector string
         // as "HCAT_EVENT = HCAT_ADD_DATABASE"
-        if (dbEvent.getStatus())
-            send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler()
-                .getHiveConf()), HCatConstants.HCAT_ADD_DATABASE_EVENT);
+        if (dbEvent.getStatus()) {
+            String topicName = getTopicPrefix(dbEvent.getHandler().getHiveConf());
+            send(messageFactory.buildCreateDatabaseMessage(dbEvent.getDatabase()), topicName);
+        }
     }
 
     @Override
@@ -178,9 +178,10 @@ public class NotificationListener extend
         // Subscriber can get notification about drop of a database in HCAT
         // by listening on a topic named "HCAT" and message selector string
         // as "HCAT_EVENT = HCAT_DROP_DATABASE"
-        if (dbEvent.getStatus())
-            send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler()
-                .getHiveConf()), HCatConstants.HCAT_DROP_DATABASE_EVENT);
+        if (dbEvent.getStatus())  {
+            String topicName = getTopicPrefix(dbEvent.getHandler().getHiveConf());
+            send(messageFactory.buildDropDatabaseMessage(dbEvent.getDatabase()), topicName);
+        }
     }
 
     @Override
@@ -210,9 +211,8 @@ public class NotificationListener extend
                 me.initCause(e);
                 throw me;
             }
-            send(newTbl, getTopicPrefix(conf) + "."
-                + newTbl.getDbName().toLowerCase(),
-                HCatConstants.HCAT_ADD_TABLE_EVENT);
+            String topicName = getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase();
+            send(messageFactory.buildCreateTableMessage(newTbl), topicName);
         }
     }
 
@@ -243,72 +243,60 @@ public class NotificationListener extend
 
         if (tableEvent.getStatus()) {
             Table table = tableEvent.getTable();
-            StorageDescriptor sd = table.getSd();
-            sd.setBucketCols(new ArrayList<String>());
-            sd.setSortCols(new ArrayList<Order>());
-            sd.setParameters(new HashMap<String, String>());
-            sd.getSerdeInfo().setParameters(new HashMap<String, String>());
-            sd.getSkewedInfo().setSkewedColNames(new ArrayList<String>());
-            send(table, getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "."
-                + table.getDbName().toLowerCase(),
-                HCatConstants.HCAT_DROP_TABLE_EVENT);
+            String topicName = getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "." + table.getDbName().toLowerCase();
+            send(messageFactory.buildDropTableMessage(table), topicName);
         }
     }
 
     /**
-     * @param msgBody
-     *          is the metastore object. It is sent in full such that if
-     *          subscriber is really interested in details, it can reconstruct it
-     *          fully. In case of finalize_partition message this will be string
-     *          specification of the partition.
-     * @param topicName
-     *          is the name on message broker on which message is sent.
-     * @param event
-     *          is the value of HCAT_EVENT property in message. It can be used to
-     *          select messages in client side.
+     * @param hCatEventMessage The HCatEventMessage being sent over JMS.
+     * @param topicName is the name on message broker on which message is sent.
      */
-    protected void send(Object msgBody, String topicName, String event) {
+    protected void send(HCatEventMessage hCatEventMessage, String topicName) {
         try {
-
             Destination topic = null;
-            if (null == session) {
+            if(null == session){
                 // this will happen, if we never able to establish a connection.
                 createConnection();
-                if (null == session) {
+                if (null == session){
                     // Still not successful, return from here.
-                    LOG.error("Invalid session. Failed to send message on topic: "
-                        + topicName + " event: " + event);
+                    LOG.error("Invalid session. Failed to send message on topic: " +
+                            topicName + " event: " + hCatEventMessage.getEventType());
                     return;
                 }
             }
-            topic = getTopic(topicName);
-            if (null == topic) {
+            try{
+                // Topics are created on demand. If it doesn't exist on broker it will
+                // be created when broker receives this message.
+                topic = session.createTopic(topicName);
+            } catch (IllegalStateException ise){
+                // This happens if a connection was established once but is no longer valid:
+                // an IllegalStateException is thrown, so catch it, reconnect and retry.
+                LOG.error("Seems like connection is lost. Retrying", ise);
+                createConnection();
+                topic = session.createTopic(topicName);
+            }
+            if (null == topic){
                 // Still not successful, return from here.
-                LOG.error("Invalid session. Failed to send message on topic: "
-                    + topicName + " event: " + event);
+                LOG.error("Invalid session. Failed to send message on topic: " +
+                        topicName + " event: " + hCatEventMessage.getEventType());
                 return;
             }
+
             MessageProducer producer = session.createProducer(topic);
-            Message msg;
-            if (msgBody instanceof Map) {
-                MapMessage mapMsg = session.createMapMessage();
-                Map<String, String> incomingMap = (Map<String, String>) msgBody;
-                for (Entry<String, String> partCol : incomingMap.entrySet()) {
-                    mapMsg.setString(partCol.getKey(), partCol.getValue());
-                }
-                msg = mapMsg;
-            } else {
-                msg = session.createObjectMessage((Serializable) msgBody);
-            }
+            Message msg = session.createTextMessage(hCatEventMessage.toString());
 
-            msg.setStringProperty(HCatConstants.HCAT_EVENT, event);
+            msg.setStringProperty(HCatConstants.HCAT_EVENT, hCatEventMessage.getEventType().toString());
+            msg.setStringProperty(HCatConstants.HCAT_MESSAGE_VERSION, messageFactory.getVersion());
+            msg.setStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT, messageFactory.getMessageFormat());
             producer.send(msg);
             // Message must be transacted before we return.
             session.commit();
-        } catch (Exception e) {
+        }
+        catch(Exception e){
             // Gobble up the exception. Message delivery is best effort.
-            LOG.error("Failed to send message on topic: " + topicName + " event: "
-                    + event, e);
+            LOG.error("Failed to send message on topic: " + topicName +
+                    " event: " + hCatEventMessage.getEventType(), e);
         }
     }
 
@@ -383,12 +371,9 @@ public class NotificationListener extend
     @Override
     public void onLoadPartitionDone(LoadPartitionDoneEvent lpde)
         throws MetaException {
-        if (lpde.getStatus())
-            send(
-                lpde.getPartitionName(),
-                lpde.getTable().getParameters()
-                    .get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),
-                HCatConstants.HCAT_PARTITION_DONE_EVENT);
+//  TODO: Fix LoadPartitionDoneEvent. Currently, an LPDE can carry only a single partition-spec, which defeats the purpose.
+//		if(lpde.getStatus())
+//			send(lpde.getPartitionName(),lpde.getTable().getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),HCatConstants.HCAT_PARTITION_DONE_EVENT);
     }
 
     @Override

Added: incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/AddPartitionMessage.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/AddPartitionMessage.java?rev=1431177&view=auto
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/AddPartitionMessage.java (added)
+++ incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/AddPartitionMessage.java Thu Jan 10 02:06:07 2013
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The HCat message sent when partition(s) are added to a table.
+ */
+public abstract class AddPartitionMessage extends HCatEventMessage {
+
+    protected AddPartitionMessage() {
+        super(EventType.ADD_PARTITION);
+    }
+
+    /**
+     * Getter for name of table (where partitions are added).
+     * @return Table-name (String).
+     */
+    public abstract String getTable();
+
+    /**
+     * Getter for list of partitions added.
+     * @return List of maps, where each map identifies values for each partition-key, for every added partition.
+     */
+    public abstract List<Map<String, String>> getPartitions ();
+
+    @Override
+    public HCatEventMessage checkValid() {
+        if (getTable() == null)
+            throw new IllegalStateException("Table name unset.");
+        if (getPartitions() == null)
+            throw new IllegalStateException("Partition-list unset.");
+        return super.checkValid();
+    }
+}

Added: incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateDatabaseMessage.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateDatabaseMessage.java?rev=1431177&view=auto
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateDatabaseMessage.java (added)
+++ incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateDatabaseMessage.java Thu Jan 10 02:06:07 2013
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+/**
+ * HCat message sent when a Database is created in HCatalog.
+ */
+public abstract class CreateDatabaseMessage extends HCatEventMessage {
+
+    protected CreateDatabaseMessage() {
+        super(EventType.CREATE_DATABASE);
+    }
+
+}

Added: incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateTableMessage.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateTableMessage.java?rev=1431177&view=auto
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateTableMessage.java (added)
+++ incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateTableMessage.java Thu Jan 10 02:06:07 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+/**
+ * HCat message sent when a table is created in HCatalog.
+ */
+public abstract class CreateTableMessage extends HCatEventMessage {
+
+    protected CreateTableMessage() {
+        super(EventType.CREATE_TABLE);
+    }
+
+    /**
+     * Getter for the name of table created in HCatalog.
+     * @return Table-name (String).
+     */
+    public abstract String getTable();
+
+    @Override
+    public HCatEventMessage checkValid() {
+        if (getTable() == null)
+            throw new IllegalStateException("Table name unset.");
+        return super.checkValid();
+    }
+}

Added: incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropDatabaseMessage.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropDatabaseMessage.java?rev=1431177&view=auto
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropDatabaseMessage.java (added)
+++ incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropDatabaseMessage.java Thu Jan 10 02:06:07 2013
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+/**
+ * HCat message sent when a Database is dropped from HCatalog.
+ */
+public abstract class DropDatabaseMessage extends HCatEventMessage {
+
+    protected DropDatabaseMessage() {
+        super(EventType.DROP_DATABASE);
+    }
+}

Added: incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropPartitionMessage.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropPartitionMessage.java?rev=1431177&view=auto
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropPartitionMessage.java (added)
+++ incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropPartitionMessage.java Thu Jan 10 02:06:07 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HCat message sent when a partition is dropped in HCatalog.
+ */
+public abstract class DropPartitionMessage extends HCatEventMessage {
+
+    protected DropPartitionMessage() {
+        super(EventType.DROP_PARTITION);
+    }
+
+    public abstract String getTable();
+    public abstract List<Map<String, String>> getPartitions ();
+
+    @Override
+    public HCatEventMessage checkValid() {
+        if (getTable() == null)
+            throw new IllegalStateException("Table name unset.");
+        if (getPartitions() == null)
+            throw new IllegalStateException("Partition-list unset.");
+        return super.checkValid();
+    }
+}

Added: incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropTableMessage.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropTableMessage.java?rev=1431177&view=auto
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropTableMessage.java (added)
+++ incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropTableMessage.java Thu Jan 10 02:06:07 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+/**
+ * HCat message sent when a Table is dropped in HCatalog.
+ */
+public abstract class DropTableMessage extends HCatEventMessage {
+
+    protected DropTableMessage() {
+        super(EventType.DROP_TABLE);
+    }
+
+    /**
+     * Getter for the name of the table being dropped.
+     * @return Table-name (String).
+     */
+    public abstract String getTable();
+
+    @Override
+    public HCatEventMessage checkValid() {
+        if (getTable() == null)
+            throw new IllegalStateException("Table name unset.");
+        return super.checkValid();
+    }
+}

Added: incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/HCatEventMessage.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/HCatEventMessage.java?rev=1431177&view=auto
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/HCatEventMessage.java (added)
+++ incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/HCatEventMessage.java Thu Jan 10 02:06:07 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+import org.apache.hcatalog.common.HCatConstants;
+
+/**
+ * Class representing messages emitted when Metastore operations are done.
+ * (E.g. Creation and deletion of databases, tables and partitions.)
+ * Subclasses supply the actual values through the abstract getters below.
+ */
+public abstract class HCatEventMessage {
+
+    /**
+     * Enumeration of all supported types of Metastore operations.
+     * Each constant carries a type-string taken from HCatConstants; that string
+     * (not the constant's name) is what {@link #toString()} returns.
+     */
+    public static enum EventType {
+
+        CREATE_DATABASE(HCatConstants.HCAT_CREATE_DATABASE_EVENT),
+        DROP_DATABASE(HCatConstants.HCAT_DROP_DATABASE_EVENT),
+        CREATE_TABLE(HCatConstants.HCAT_CREATE_TABLE_EVENT),
+        DROP_TABLE(HCatConstants.HCAT_DROP_TABLE_EVENT),
+        ADD_PARTITION(HCatConstants.HCAT_ADD_PARTITION_EVENT),
+        DROP_PARTITION(HCatConstants.HCAT_DROP_PARTITION_EVENT);
+
+        // Assigned once by the constructor. Final, since enum constants are shared
+        // by the whole JVM and must not be alterable after construction.
+        private final String typeString;
+
+        EventType(String typeString) {
+            this.typeString = typeString;
+        }
+
+        /**
+         * @return The type-string this constant was constructed with.
+         */
+        @Override
+        public String toString() { return typeString; }
+    }
+
+    protected EventType eventType;
+
+    /**
+     * @param eventType The type of Metastore operation this message describes.
+     */
+    protected HCatEventMessage(EventType eventType) {
+        this.eventType = eventType;
+    }
+
+    /**
+     * Getter for the type of Metastore operation this message describes.
+     * @return The event-type set at construction.
+     */
+    public EventType getEventType() {
+        return eventType;
+    }
+
+    /**
+     * Getter for HCatalog Server's URL.
+     * (This is where the event originates from.)
+     * @return HCatalog Server's URL (String).
+     */
+    public abstract String getServer();
+
+    /**
+     * Getter for the Kerberos principal of the HCatalog service.
+     * @return HCatalog Service Principal (String).
+     */
+    public abstract String getServicePrincipal();
+
+    /**
+     * Getter for the name of the Database on which the Metastore operation is done.
+     * @return Database-name (String).
+     */
+    public abstract String getDB();
+
+    /**
+     * Getter for the timestamp associated with the operation.
+     * @return Timestamp (Long - seconds since epoch).
+     */
+    public abstract Long   getTimestamp();
+
+    /**
+     * Class invariant. Checked after construction or deserialization.
+     * The timestamp is not part of the check.
+     * @return This instance, so that the call can be chained.
+     * @throws IllegalStateException if the server-URL, the service-principal or the event-type is null.
+     * @throws IllegalArgumentException if the DB-name is null.
+     */
+    public HCatEventMessage checkValid() {
+        if (getServer() == null || getServicePrincipal() == null) {
+            throw new IllegalStateException("Server-URL/Service-Principal shouldn't be null.");
+        }
+        if (getEventType() == null) {
+            throw new IllegalStateException("Event-type unset.");
+        }
+        // NOTE(review): unlike the checks above, this one raises IllegalArgumentException.
+        // The type is kept as-is, since callers may already depend on it.
+        if (getDB() == null) {
+            throw new IllegalArgumentException("DB-name unset.");
+        }
+
+        return this;
+    }
+}

Added: incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageDeserializer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageDeserializer.java?rev=1431177&view=auto
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageDeserializer.java (added)
+++ incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageDeserializer.java Thu Jan 10 02:06:07 2013
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+/**
+ * Base class for converting HCat events from String-form back to HCatEventMessage instances.
+ * Subclasses implement one de-serialization method per event-type.
+ */
+public abstract class MessageDeserializer {
+
+    /**
+     * Method to construct HCatEventMessage from string.
+     * @param eventTypeString Identifies the kind of event: either the name of an
+     *                        HCatEventMessage.EventType constant, or that constant's toString() form.
+     * @param messageBody The serialized message; passed on unchanged to the matching
+     *                    de-serialization method.
+     * @return The de-serialized message.
+     * @throws IllegalArgumentException if eventTypeString is null, or matches no event-type.
+     */
+    public HCatEventMessage getHCatEventMessage(String eventTypeString, String messageBody) {
+
+        switch (resolveEventType(eventTypeString)) {
+        case CREATE_DATABASE:
+            return getCreateDatabaseMessage(messageBody);
+        case DROP_DATABASE:
+            return getDropDatabaseMessage(messageBody);
+        case CREATE_TABLE:
+            return getCreateTableMessage(messageBody);
+        case DROP_TABLE:
+            return getDropTableMessage(messageBody);
+        case ADD_PARTITION:
+            return getAddPartitionMessage(messageBody);
+        case DROP_PARTITION:
+            return getDropPartitionMessage(messageBody);
+
+        default:
+            throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString);
+        }
+    }
+
+    /**
+     * Maps a type-string to its EventType constant.
+     * Constant names are tried first (this is what Enum.valueOf() matches). The toString()
+     * form is tried next, because EventType.toString() returns the HCatConstants
+     * type-string, which need not be spelled like the constant's name.
+     * Null and unknown strings are rejected with the same IllegalArgumentException,
+     * where Enum.valueOf() would raise a NullPointerException for null.
+     */
+    private static HCatEventMessage.EventType resolveEventType(String eventTypeString) {
+        if (eventTypeString != null) {
+            final HCatEventMessage.EventType[] knownTypes = HCatEventMessage.EventType.values();
+            for (HCatEventMessage.EventType candidate : knownTypes) {
+                if (eventTypeString.equals(candidate.name())) {
+                    return candidate;
+                }
+            }
+            for (HCatEventMessage.EventType candidate : knownTypes) {
+                if (eventTypeString.equals(candidate.toString())) {
+                    return candidate;
+                }
+            }
+        }
+        throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString);
+    }
+
+    /**
+     * Method to de-serialize CreateDatabaseMessage instance.
+     */
+    public abstract CreateDatabaseMessage getCreateDatabaseMessage(String messageBody);
+
+    /**
+     * Method to de-serialize DropDatabaseMessage instance.
+     */
+    public abstract DropDatabaseMessage getDropDatabaseMessage(String messageBody);
+
+    /**
+     * Method to de-serialize CreateTableMessage instance.
+     */
+    public abstract CreateTableMessage getCreateTableMessage(String messageBody);
+
+    /**
+     * Method to de-serialize DropTableMessage instance.
+     */
+    public abstract DropTableMessage getDropTableMessage(String messageBody);
+
+    /**
+     * Method to de-serialize AddPartitionMessage instance.
+     */
+    public abstract AddPartitionMessage getAddPartitionMessage(String messageBody);
+
+    /**
+     * Method to de-serialize DropPartitionMessage instance.
+     */
+    public abstract DropPartitionMessage getDropPartitionMessage(String messageBody);
+
+    // Protection against construction.
+    protected MessageDeserializer() {}
+}

Added: incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java?rev=1431177&view=auto
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java (added)
+++ incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java Thu Jan 10 02:06:07 2013
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hcatalog.messaging.json.JSONMessageFactory;
+
+/**
+ * Abstract Factory for the construction of HCatalog message instances.
+ * The implementation is selected through the configuration: the property
+ * "hcatalog.message.format" names the message format (default "json"), and the
+ * property "hcatalog.message.factory.impl." followed by that format names the
+ * factory class to use (default JSONMessageFactory).
+ */
+public abstract class MessageFactory {
+
+    // Deliberately left unset here and created on first use by getInstance().
+    // Initializing it eagerly to a fixed implementation would make the
+    // "instance == null" branch in getInstance() dead, so that the configured
+    // factory implementation would never be consulted.
+    private static MessageFactory instance;
+
+    protected static final HiveConf hiveConf = new HiveConf();
+    static {
+        hiveConf.addResource("hive-site.xml");
+    }
+
+    private static final String CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX = "hcatalog.message.factory.impl.";
+    private static final String CONF_LABEL_HCAT_MESSAGE_FORMAT = "hcatalog.message.format";
+    private static final String HCAT_MESSAGE_FORMAT = hiveConf.get(CONF_LABEL_HCAT_MESSAGE_FORMAT, "json");
+    private static final String DEFAULT_MESSAGE_FACTORY_IMPL = "org.apache.hcatalog.messaging.json.JSONMessageFactory";
+    private static final String HCAT_MESSAGE_FACTORY_IMPL = hiveConf.get(CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX
+                                                                         + HCAT_MESSAGE_FORMAT,
+                                                                         DEFAULT_MESSAGE_FACTORY_IMPL);
+
+    // Looked up by ConfVars.varname, which is the property key. ConfVars.name() is merely
+    // the identifier of the enum constant, and is not the key the value is configured under.
+    protected static final String HCAT_SERVER_URL = hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+    protected static final String HCAT_SERVICE_PRINCIPAL = hiveConf.get(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, "");
+
+    /**
+     * Getter for MessageFactory instance.
+     * The instance is constructed on the first call, from the configured factory
+     * implementation, and re-used afterwards. The method is synchronized so that
+     * concurrent first calls construct only one instance.
+     * @return The shared MessageFactory.
+     * @throws IllegalStateException if the configured implementation class can't be found.
+     */
+    public static synchronized MessageFactory getInstance() {
+        if (instance == null) {
+            instance = getInstance(HCAT_MESSAGE_FACTORY_IMPL);
+        }
+        return instance;
+    }
+
+    /**
+     * Constructs a new factory of the named class, via reflection.
+     * @param className Fully qualified name of a MessageFactory implementation.
+     * @return A new instance of that class.
+     * @throws IllegalStateException if the class can't be found.
+     */
+    private static MessageFactory getInstance(String className) {
+        try {
+            return (MessageFactory)ReflectionUtils.newInstance(Class.forName(className), hiveConf);
+        }
+        catch (ClassNotFoundException classNotFound) {
+            throw new IllegalStateException("Could not construct MessageFactory implementation: " + className,
+                                            classNotFound);
+        }
+    }
+
+    /**
+     * Getter for MessageDeserializer, corresponding to the specified format and version.
+     * A new factory is constructed on every call; the shared instance is not involved.
+     * If no factory implementation is configured for the format, the default (JSON) one is used.
+     * @param format Serialization format for notifications.
+     * @param version Version of serialization format (currently ignored.)
+     * @return MessageDeserializer.
+     */
+    public static MessageDeserializer getDeserializer(String format,
+                                                      String version) {
+        return getInstance(hiveConf.get(CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX + format,
+                                        DEFAULT_MESSAGE_FACTORY_IMPL)).getDeserializer();
+    }
+
+    /**
+     * Getter for the MessageDeserializer that goes with this factory.
+     */
+    public abstract MessageDeserializer getDeserializer();
+
+    /**
+     * Getter for version-string, corresponding to all constructed messages.
+     */
+    public abstract String getVersion();
+
+    /**
+     * Getter for message-format.
+     */
+    public abstract String getMessageFormat();
+
+    /**
+     * Factory method for CreateDatabaseMessage.
+     * @param db The Database being added.
+     * @return CreateDatabaseMessage instance.
+     */
+    public abstract CreateDatabaseMessage buildCreateDatabaseMessage(Database db);
+
+    /**
+     * Factory method for DropDatabaseMessage.
+     * @param db The Database being dropped.
+     * @return DropDatabaseMessage instance.
+     */
+    public abstract DropDatabaseMessage buildDropDatabaseMessage(Database db);
+
+    /**
+     * Factory method for CreateTableMessage.
+     * @param table The Table being created.
+     * @return CreateTableMessage instance.
+     */
+    public abstract CreateTableMessage buildCreateTableMessage(Table table);
+
+    /**
+     * Factory method for DropTableMessage.
+     * @param table The Table being dropped.
+     * @return DropTableMessage instance.
+     */
+    public abstract DropTableMessage buildDropTableMessage(Table table);
+
+    /**
+     * Factory method for AddPartitionMessage.
+     * @param table The Table to which the partition is added.
+     * @param partition The Partition being added.
+     * @return AddPartitionMessage instance.
+     */
+    public abstract AddPartitionMessage buildAddPartitionMessage(Table table, Partition partition);
+
+    /**
+     * Factory method for DropPartitionMessage.
+     * @param table The Table from which the partition is dropped.
+     * @param partition The Partition being dropped.
+     * @return DropPartitionMessage instance.
+     */
+    public abstract DropPartitionMessage buildDropPartitionMessage(Table table, Partition partition);
+}

Added: incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/jms/MessagingUtils.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/jms/MessagingUtils.java?rev=1431177&view=auto
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/jms/MessagingUtils.java (added)
+++ incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/jms/MessagingUtils.java Thu Jan 10 02:06:07 2013
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.jms;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.messaging.HCatEventMessage;
+import org.apache.hcatalog.messaging.MessageFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+
+/**
+ * Helper Utility to assist consumers of HCat Messages in extracting
+ * message-content from JMS messages.
+ */
+public class MessagingUtils {
+
+    /**
+     * Method to return HCatEventMessage contained in the JMS message.
+     * The message-body is read from the text of the JMS message; the event-type,
+     * the message-version and the message-format are read from its String properties.
+     * @param message The JMS Message instance. Must be a TextMessage.
+     * @return The contained HCatEventMessage
+     * @throws IllegalArgumentException if the message is null or not a TextMessage,
+     *         if its body or event-type is null/empty, or if reading from it fails.
+     */
+    public static HCatEventMessage getMessage(Message message) {
+        // Checked up-front, so that a null or non-text message is reported in the same
+        // way as the other extraction failures, rather than through a failed cast.
+        if (!(message instanceof TextMessage))
+            throw new IllegalArgumentException("Could not extract HCatEventMessage. " +
+                                               "Expected a JMS TextMessage, but got: " +
+                                               (message == null ? null : message.getClass().getName()));
+
+        try {
+            String messageBody = ((TextMessage)message).getText();
+            String eventType   = message.getStringProperty(HCatConstants.HCAT_EVENT);
+            String messageVersion = message.getStringProperty(HCatConstants.HCAT_MESSAGE_VERSION);
+            String messageFormat = message.getStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT);
+
+            if (StringUtils.isEmpty(messageBody) || StringUtils.isEmpty(eventType))
+                throw new IllegalArgumentException("Could not extract HCatEventMessage. " +
+                                                   "EventType and/or MessageBody is null/empty.");
+
+            return MessageFactory.getDeserializer(messageFormat, messageVersion).getHCatEventMessage(eventType, messageBody);
+        }
+        catch (JMSException exception) {
+            throw new IllegalArgumentException("Could not extract HCatEventMessage. ", exception);
+        }
+    }
+
+    // Prevent construction.
+    private MessagingUtils() {}
+}

Added: incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONAddPartitionMessage.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONAddPartitionMessage.java?rev=1431177&view=auto
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONAddPartitionMessage.java (added)
+++ incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONAddPartitionMessage.java Thu Jan 10 02:06:07 2013
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.AddPartitionMessage;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AddPartitionMessage whose fields are mapped to JSON properties through Jackson.
+ */
+public class JSONAddPartitionMessage extends AddPartitionMessage {
+
+    @JsonProperty
+    String server;
+
+    @JsonProperty
+    String servicePrincipal;
+
+    @JsonProperty
+    String db;
+
+    @JsonProperty
+    String table;
+
+    @JsonProperty
+    Long timestamp;
+
+    @JsonProperty
+    List<Map<String, String>> partitions;
+
+    /**
+     * No-argument constructor, as needed by Jackson.
+     * Leaves every field unset and runs no validity check.
+     */
+    public JSONAddPartitionMessage() {
+    }
+
+    /**
+     * Populates all fields, then validates the message through checkValid().
+     * @param server Value reported by getServer().
+     * @param servicePrincipal Value reported by getServicePrincipal().
+     * @param db Value reported by getDB().
+     * @param table Value reported by getTable().
+     * @param partitions Value reported by getPartitions(); one String-to-String map per partition.
+     * @param timestamp Value reported by getTimestamp().
+     */
+    public JSONAddPartitionMessage(String server, String servicePrincipal, String db, String table,
+                                   List<Map<String,String>> partitions, Long timestamp) {
+        this.timestamp = timestamp;
+        this.partitions = partitions;
+        this.table = table;
+        this.db = db;
+        this.servicePrincipal = servicePrincipal;
+        this.server = server;
+        checkValid();
+    }
+
+    @Override
+    public String getServer() {
+        return server;
+    }
+
+    @Override
+    public String getServicePrincipal() {
+        return servicePrincipal;
+    }
+
+    @Override
+    public String getDB() {
+        return db;
+    }
+
+    @Override
+    public String getTable() {
+        return table;
+    }
+
+    @Override
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public List<Map<String, String>> getPartitions() {
+        return partitions;
+    }
+
+    /**
+     * Renders this message with the mapper held by JSONMessageDeserializer.
+     * @return The text produced by the mapper.
+     * @throws IllegalArgumentException wrapping any failure raised while writing.
+     */
+    @Override
+    public String toString() {
+        final String serialized;
+        try {
+            serialized = JSONMessageDeserializer.mapper.writeValueAsString(this);
+        }
+        catch (Exception failure) {
+            throw new IllegalArgumentException("Could not serialize: ", failure);
+        }
+        return serialized;
+    }
+}

Added: incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateDatabaseMessage.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateDatabaseMessage.java?rev=1431177&view=auto
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateDatabaseMessage.java (added)
+++ incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateDatabaseMessage.java Thu Jan 10 02:06:07 2013
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.CreateDatabaseMessage;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * CreateDatabaseMessage whose fields are mapped to JSON properties through Jackson.
+ */
+public class JSONCreateDatabaseMessage extends CreateDatabaseMessage {
+
+    @JsonProperty
+    String server;
+
+    @JsonProperty
+    String servicePrincipal;
+
+    @JsonProperty
+    String db;
+
+    @JsonProperty
+    Long timestamp;
+
+    /**
+     * No-argument constructor, as needed by Jackson.
+     * Leaves every field unset and runs no validity check.
+     */
+    public JSONCreateDatabaseMessage() {
+    }
+
+    /**
+     * Populates all fields, then validates the message through checkValid().
+     * @param server Value reported by getServer().
+     * @param servicePrincipal Value reported by getServicePrincipal().
+     * @param db Value reported by getDB().
+     * @param timestamp Value reported by getTimestamp().
+     */
+    public JSONCreateDatabaseMessage(String server, String servicePrincipal, String db, Long timestamp) {
+        this.timestamp = timestamp;
+        this.db = db;
+        this.servicePrincipal = servicePrincipal;
+        this.server = server;
+        checkValid();
+    }
+
+    @Override
+    public String getDB() {
+        return db;
+    }
+
+    @Override
+    public String getServer() {
+        return server;
+    }
+
+    @Override
+    public String getServicePrincipal() {
+        return servicePrincipal;
+    }
+
+    @Override
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    /**
+     * Renders this message with the mapper held by JSONMessageDeserializer.
+     * @return The text produced by the mapper.
+     * @throws IllegalArgumentException wrapping any failure raised while writing.
+     */
+    @Override
+    public String toString() {
+        final String serialized;
+        try {
+            serialized = JSONMessageDeserializer.mapper.writeValueAsString(this);
+        }
+        catch (Exception failure) {
+            throw new IllegalArgumentException("Could not serialize: ", failure);
+        }
+        return serialized;
+    }
+
+}

Added: incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateTableMessage.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateTableMessage.java?rev=1431177&view=auto
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateTableMessage.java (added)
+++ incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateTableMessage.java Thu Jan 10 02:06:07 2013
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.CreateTableMessage;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * CreateTableMessage whose fields are mapped to JSON properties through Jackson.
+ */
+public class JSONCreateTableMessage extends CreateTableMessage {
+
+    @JsonProperty
+    String server;
+
+    @JsonProperty
+    String servicePrincipal;
+
+    @JsonProperty
+    String db;
+
+    @JsonProperty
+    String table;
+
+    @JsonProperty
+    Long timestamp;
+
+    /**
+     * No-argument constructor, as needed by Jackson.
+     * Leaves every field unset and runs no validity check.
+     */
+    public JSONCreateTableMessage() {
+    }
+
+    /**
+     * Populates all fields, then validates the message through checkValid().
+     * @param server Value reported by getServer().
+     * @param servicePrincipal Value reported by getServicePrincipal().
+     * @param db Value reported by getDB().
+     * @param table Value reported by getTable().
+     * @param timestamp Value reported by getTimestamp().
+     */
+    public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table, Long timestamp) {
+        this.timestamp = timestamp;
+        this.table = table;
+        this.db = db;
+        this.servicePrincipal = servicePrincipal;
+        this.server = server;
+        checkValid();
+    }
+
+    @Override
+    public String getServer() {
+        return server;
+    }
+
+    @Override
+    public String getServicePrincipal() {
+        return servicePrincipal;
+    }
+
+    @Override
+    public String getDB() {
+        return db;
+    }
+
+    @Override
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public String getTable() {
+        return table;
+    }
+
+    /**
+     * Renders this message with the mapper held by JSONMessageDeserializer.
+     * @return The text produced by the mapper.
+     * @throws IllegalArgumentException wrapping any failure raised while writing.
+     */
+    @Override
+    public String toString() {
+        final String serialized;
+        try {
+            serialized = JSONMessageDeserializer.mapper.writeValueAsString(this);
+        }
+        catch (Exception failure) {
+            throw new IllegalArgumentException("Could not serialize: ", failure);
+        }
+        return serialized;
+    }
+}

Added: incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropDatabaseMessage.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropDatabaseMessage.java?rev=1431177&view=auto
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropDatabaseMessage.java (added)
+++ incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropDatabaseMessage.java Thu Jan 10 02:06:07 2013
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.DropDatabaseMessage;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * DropDatabaseMessage whose fields are mapped to JSON properties through Jackson.
+ */
+public class JSONDropDatabaseMessage extends DropDatabaseMessage {
+
+    @JsonProperty
+    String server;
+
+    @JsonProperty
+    String servicePrincipal;
+
+    @JsonProperty
+    String db;
+
+    @JsonProperty
+    Long timestamp;
+
+    /**
+     * No-argument constructor, as needed by Jackson.
+     * Leaves every field unset and runs no validity check.
+     */
+    public JSONDropDatabaseMessage() {
+    }
+
+    /**
+     * Populates all fields, then validates the message through checkValid().
+     * @param server Value reported by getServer().
+     * @param servicePrincipal Value reported by getServicePrincipal().
+     * @param db Value reported by getDB().
+     * @param timestamp Value reported by getTimestamp().
+     */
+    public JSONDropDatabaseMessage(String server, String servicePrincipal, String db, Long timestamp) {
+        this.timestamp = timestamp;
+        this.db = db;
+        this.servicePrincipal = servicePrincipal;
+        this.server = server;
+        checkValid();
+    }
+
+    @Override
+    public String getServer() {
+        return server;
+    }
+
+    @Override
+    public String getServicePrincipal() {
+        return servicePrincipal;
+    }
+
+    @Override
+    public String getDB() {
+        return db;
+    }
+
+    @Override
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    /**
+     * Renders this message with the mapper held by JSONMessageDeserializer.
+     * @return The text produced by the mapper.
+     * @throws IllegalArgumentException wrapping any failure raised while writing.
+     */
+    @Override
+    public String toString() {
+        final String serialized;
+        try {
+            serialized = JSONMessageDeserializer.mapper.writeValueAsString(this);
+        }
+        catch (Exception failure) {
+            throw new IllegalArgumentException("Could not serialize: ", failure);
+        }
+        return serialized;
+    }
+}

Added: incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropPartitionMessage.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropPartitionMessage.java?rev=1431177&view=auto
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropPartitionMessage.java (added)
+++ incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropPartitionMessage.java Thu Jan 10 02:06:07 2013
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.DropPartitionMessage;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DropPartitionMessage whose fields are mapped to JSON properties through Jackson.
+ */
+public class JSONDropPartitionMessage extends DropPartitionMessage {
+
+    @JsonProperty
+    String server;
+
+    @JsonProperty
+    String servicePrincipal;
+
+    @JsonProperty
+    String db;
+
+    @JsonProperty
+    String table;
+
+    @JsonProperty
+    Long timestamp;
+
+    @JsonProperty
+    List<Map<String, String>> partitions;
+
+    /**
+     * No-argument constructor, as needed by Jackson.
+     * Leaves every field unset and runs no validity check.
+     */
+    public JSONDropPartitionMessage() {
+    }
+
+    /**
+     * Populates all fields, then validates the message through checkValid().
+     * @param server Value reported by getServer().
+     * @param servicePrincipal Value reported by getServicePrincipal().
+     * @param db Value reported by getDB().
+     * @param table Value reported by getTable().
+     * @param partitions Value reported by getPartitions(); one String-to-String map per partition.
+     * @param timestamp Value reported by getTimestamp().
+     */
+    public JSONDropPartitionMessage(String server, String servicePrincipal, String db, String table,
+                                   List<Map<String,String>> partitions, Long timestamp) {
+        this.timestamp = timestamp;
+        this.partitions = partitions;
+        this.table = table;
+        this.db = db;
+        this.servicePrincipal = servicePrincipal;
+        this.server = server;
+        checkValid();
+    }
+
+    @Override
+    public String getServer() {
+        return server;
+    }
+
+    @Override
+    public String getServicePrincipal() {
+        return servicePrincipal;
+    }
+
+    @Override
+    public String getDB() {
+        return db;
+    }
+
+    @Override
+    public String getTable() {
+        return table;
+    }
+
+    @Override
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public List<Map<String, String>> getPartitions() {
+        return partitions;
+    }
+
+    /**
+     * Renders this message with the mapper held by JSONMessageDeserializer.
+     * @return The text produced by the mapper.
+     * @throws IllegalArgumentException wrapping any failure raised while writing.
+     */
+    @Override
+    public String toString() {
+        final String serialized;
+        try {
+            serialized = JSONMessageDeserializer.mapper.writeValueAsString(this);
+        }
+        catch (Exception failure) {
+            throw new IllegalArgumentException("Could not serialize: ", failure);
+        }
+        return serialized;
+    }
+}

Added: incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropTableMessage.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropTableMessage.java?rev=1431177&view=auto
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropTableMessage.java (added)
+++ incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropTableMessage.java Thu Jan 10 02:06:07 2013
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.DropTableMessage;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * JSON implementation of DropTableMessage.
+ */
+public class JSONDropTableMessage extends DropTableMessage {
+
+    // Set by Jackson during deserialization, or by the full constructor below.
+    @JsonProperty String server;
+    @JsonProperty String servicePrincipal;
+    @JsonProperty String db;
+    @JsonProperty String table;
+    @JsonProperty Long timestamp;
+
+    /** No-argument constructor; Jackson needs it to instantiate the class. */
+    public JSONDropTableMessage() {}
+
+    /** Fills in every field, then runs checkValid() on the resulting message. */
+    public JSONDropTableMessage(String server, String servicePrincipal, String db, String table, Long timestamp) {
+        this.db = db;
+        this.table = table;
+        this.server = server;
+        this.servicePrincipal = servicePrincipal;
+        this.timestamp = timestamp;
+        checkValid();
+    }
+
+    @Override
+    public String getTable() { return this.table; }
+
+    @Override
+    public String getServer() { return this.server; }
+
+    @Override
+    public String getServicePrincipal() { return this.servicePrincipal; }
+
+    @Override
+    public String getDB() { return this.db; }
+
+    @Override
+    public Long getTimestamp() { return this.timestamp; }
+
+    /**
+     * Serializes this message to JSON; any failure is rethrown as IllegalArgumentException.
+     */
+    @Override
+    public String toString() {
+        try {
+            return JSONMessageDeserializer.mapper.writeValueAsString(this);
+        } catch (Exception serializationFailure) {
+            throw new IllegalArgumentException("Could not serialize: ", serializationFailure);
+        }
+    }
+}

Added: incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageDeserializer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageDeserializer.java?rev=1431177&view=auto
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageDeserializer.java (added)
+++ incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageDeserializer.java Thu Jan 10 02:06:07 2013
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.AddPartitionMessage;
+import org.apache.hcatalog.messaging.CreateDatabaseMessage;
+import org.apache.hcatalog.messaging.CreateTableMessage;
+import org.apache.hcatalog.messaging.DropDatabaseMessage;
+import org.apache.hcatalog.messaging.DropPartitionMessage;
+import org.apache.hcatalog.messaging.DropTableMessage;
+import org.apache.hcatalog.messaging.MessageDeserializer;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * MessageDeserializer implementation, for deserializing from JSON strings.
+ */
+public class JSONMessageDeserializer extends MessageDeserializer {
+
+    /** Mapper shared within this package; final so the shared instance cannot be swapped out. */
+    static final ObjectMapper mapper = new ObjectMapper(); // Thread-safe.
+
+    static {
+        // Tolerate JSON properties that the target message class does not declare.
+        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    /** Builds a {@link CreateDatabaseMessage} from its JSON text. */
+    @Override
+    public CreateDatabaseMessage getCreateDatabaseMessage(String messageBody) {
+        return fromJson(messageBody, JSONCreateDatabaseMessage.class);
+    }
+
+    /** Builds a {@link DropDatabaseMessage} from its JSON text. */
+    @Override
+    public DropDatabaseMessage getDropDatabaseMessage(String messageBody) {
+        return fromJson(messageBody, JSONDropDatabaseMessage.class);
+    }
+
+    /** Builds a {@link CreateTableMessage} from its JSON text. */
+    @Override
+    public CreateTableMessage getCreateTableMessage(String messageBody) {
+        return fromJson(messageBody, JSONCreateTableMessage.class);
+    }
+
+    /** Builds a {@link DropTableMessage} from its JSON text. */
+    @Override
+    public DropTableMessage getDropTableMessage(String messageBody) {
+        return fromJson(messageBody, JSONDropTableMessage.class);
+    }
+
+    /** Builds an {@link AddPartitionMessage} from its JSON text. */
+    @Override
+    public AddPartitionMessage getAddPartitionMessage(String messageBody) {
+        return fromJson(messageBody, JSONAddPartitionMessage.class);
+    }
+
+    /** Builds a {@link DropPartitionMessage} from its JSON text. */
+    @Override
+    public DropPartitionMessage getDropPartitionMessage(String messageBody) {
+        return fromJson(messageBody, JSONDropPartitionMessage.class);
+    }
+
+    /**
+     * Maps a JSON message body onto the given message class. Every public getter
+     * delegates here so that all failures are reported the same way.
+     *
+     * @param <T> type of message to produce
+     * @param messageBody JSON text of the message
+     * @param messageClass concrete message class to populate
+     * @return the populated message instance
+     * @throws IllegalArgumentException if the body cannot be mapped; the underlying
+     *         failure is attached as the cause, and the message names the target class
+     */
+    private static <T> T fromJson(String messageBody, Class<T> messageClass) {
+        try {
+            return mapper.readValue(messageBody, messageClass);
+        }
+        catch (Exception exception) {
+            // Broad catch kept on purpose: every failure has always surfaced as IllegalArgumentException.
+            throw new IllegalArgumentException(
+                    "Could not construct " + messageClass.getSimpleName() + ".", exception);
+        }
+    }
+}

Added: incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java?rev=1431177&view=auto
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java (added)
+++ incubator/hcatalog/trunk/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java Thu Jan 10 02:06:07 2013
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hcatalog.messaging.AddPartitionMessage;
+import org.apache.hcatalog.messaging.CreateDatabaseMessage;
+import org.apache.hcatalog.messaging.CreateTableMessage;
+import org.apache.hcatalog.messaging.DropDatabaseMessage;
+import org.apache.hcatalog.messaging.DropPartitionMessage;
+import org.apache.hcatalog.messaging.DropTableMessage;
+import org.apache.hcatalog.messaging.MessageDeserializer;
+import org.apache.hcatalog.messaging.MessageFactory;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * The JSON implementation of the MessageFactory. Constructs JSON implementations of
+ * each message-type.
+ */
+public class JSONMessageFactory extends MessageFactory {
+
+    private static final JSONMessageDeserializer deserializer = new JSONMessageDeserializer();
+
+    @Override
+    public MessageDeserializer getDeserializer() {
+        return deserializer;
+    }
+
+    @Override
+    public String getVersion() {
+        return "0.1";
+    }
+
+    @Override
+    public String getMessageFormat() {
+        return "json";
+    }
+
+    @Override
+    public CreateDatabaseMessage buildCreateDatabaseMessage(Database db) {
+        return new JSONCreateDatabaseMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, db.getName(), nowInSeconds());
+    }
+
+    @Override
+    public DropDatabaseMessage buildDropDatabaseMessage(Database db) {
+        return new JSONDropDatabaseMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, db.getName(), nowInSeconds());
+    }
+
+    @Override
+    public CreateTableMessage buildCreateTableMessage(Table table) {
+        return new JSONCreateTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL,
+                table.getDbName(), table.getTableName(), nowInSeconds());
+    }
+
+    @Override
+    public DropTableMessage buildDropTableMessage(Table table) {
+        return new JSONDropTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL,
+                table.getDbName(), table.getTableName(), nowInSeconds());
+    }
+
+    @Override
+    public AddPartitionMessage buildAddPartitionMessage(Table table, Partition partition) {
+        return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, partition.getDbName(),
+                partition.getTableName(), Arrays.asList(getPartitionKeyValues(table, partition)), nowInSeconds());
+    }
+
+    @Override
+    public DropPartitionMessage buildDropPartitionMessage(Table table, Partition partition) {
+        return new JSONDropPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, partition.getDbName(),
+                partition.getTableName(), Arrays.asList(getPartitionKeyValues(table, partition)), nowInSeconds());
+    }
+
+    /** Current wall-clock time in whole seconds since the epoch; used as the message timestamp. */
+    private static long nowInSeconds() { return System.currentTimeMillis() / 1000; }
+
+    /** Pairs each partition-key name of the table with the partition's value at the same index. */
+    private static Map<String, String> getPartitionKeyValues(Table table, Partition partition) {
+        Map<String, String> keyValues = new LinkedHashMap<String, String>();
+        for (int idx = 0; idx < table.getPartitionKeysSize(); idx++) {
+            keyValues.put(table.getPartitionKeys().get(idx).getName(), partition.getValues().get(idx));
+        }
+        return keyValues;
+    }
+}

Modified: incubator/hcatalog/trunk/server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java?rev=1431177&r1=1431176&r2=1431177&view=diff
==============================================================================
--- incubator/hcatalog/trunk/server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java (original)
+++ incubator/hcatalog/trunk/server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java Thu Jan 10 02:06:07 2013
@@ -25,7 +25,7 @@ import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
-import javax.jms.ObjectMessage;
+import javax.jms.TextMessage;
 import javax.jms.Session;
 
 import junit.framework.TestCase;
@@ -36,11 +36,12 @@ import org.apache.hadoop.hive.cli.CliSes
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.messaging.HCatEventMessage;
+import org.apache.hcatalog.messaging.jms.MessagingUtils;
 
 public class TestMsgBusConnection extends TestCase {
 
@@ -89,29 +90,28 @@ public class TestMsgBusConnection extend
         try {
             driver.run("create database testconndb");
             Message msg = consumer.receive();
-            assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT,
+            assertTrue("Expected TextMessage", msg instanceof TextMessage);
+            assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT,
                     msg.getStringProperty(HCatConstants.HCAT_EVENT));
             assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString());
-            assertEquals("testconndb",
-                    ((Database) ((ObjectMessage) msg).getObject()).getName());
+            HCatEventMessage messageObject = MessagingUtils.getMessage(msg);
+            assertEquals("testconndb", messageObject.getDB());
             broker.stop();
             driver.run("drop database testconndb cascade");
             broker.start(true);
             connectClient();
             driver.run("create database testconndb");
             msg = consumer.receive();
-            assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT,
+            assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT,
                     msg.getStringProperty(HCatConstants.HCAT_EVENT));
             assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString());
-            assertEquals("testconndb",
-                    ((Database) ((ObjectMessage) msg).getObject()).getName());
+            assertEquals("testconndb", messageObject.getDB());
             driver.run("drop database testconndb cascade");
             msg = consumer.receive();
             assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT,
                     msg.getStringProperty(HCatConstants.HCAT_EVENT));
             assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString());
-            assertEquals("testconndb",
-                    ((Database) ((ObjectMessage) msg).getObject()).getName());
+            assertEquals("testconndb", messageObject.getDB());
         } catch (NoSuchObjectException nsoe) {
             nsoe.printStackTrace(System.err);
             assert false;



Mime
View raw message