incubator-hcatalog-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hashut...@apache.org
Subject svn commit: r1137114 - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/common/ src/java/org/apache/hcatalog/listener/ src/test/org/apache/hcatalog/listener/
Date Sat, 18 Jun 2011 02:01:47 GMT
Author: hashutosh
Date: Sat Jun 18 02:01:46 2011
New Revision: 1137114

URL: http://svn.apache.org/viewvc?rev=1137114&view=rev
Log:
HCATALOG-47: Topic prefix for the message bus should be configurable

Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1137114&r1=1137113&r2=1137114&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Sat Jun 18 02:01:46 2011
@@ -13,6 +13,8 @@ Trunk (unreleased changes)
     
   IMPROVEMENTS
 
+    HCAT-47. Topic prefix for the message bus should be configurable (hashutosh)
+
     HCAT-39. Lazily create connection for Message bus (hashutosh)
 
     HCAT-44. Add a releaseaudit target to build.xml (gates)

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java?rev=1137114&r1=1137113&r2=1137114&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java Sat Jun
18 02:01:46 2011
@@ -70,9 +70,10 @@ public final class HCatConstants {
   
   public static final String HCAT_MSGBUS_TOPIC_NAME = "hcat.msgbus.topic.name";
   public static final String HCAT_MSGBUS_TOPIC_NAMING_POLICY = "hcat.msgbus.topic.naming.policy";
+  public static final String HCAT_MSGBUS_TOPIC_PREFIX = "hcat.msgbus.topic.prefix";
 
   // Message Bus related properties.
-  public static final String HCAT_TOPIC = "HCAT";
+  public static final String HCAT_DEFAULT_TOPIC_PREFIX = "HCAT";
   public static final String HCAT_EVENT = "HCAT_EVENT";
   public static final String HCAT_ADD_PARTITION_EVENT = "HCAT_ADD_PARTITION";
   public static final String HCAT_DROP_PARTITION_EVENT = "HCAT_DROP_PARTITION";

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1137114&r1=1137113&r2=1137114&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
(original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
Sat Jun 18 02:01:46 2011
@@ -38,6 +38,7 @@ import javax.naming.NamingException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
 import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -133,7 +134,7 @@ public class NotificationListener extend
 		// by listening on a topic named "HCAT" and message selector string
 		// as "HCAT_EVENT = HCAT_ADD_DATABASE" 
 		if(dbEvent.getStatus())
-			send(dbEvent.getDatabase(),HCatConstants.HCAT_TOPIC,HCatConstants.HCAT_ADD_DATABASE_EVENT);
+			send(dbEvent.getDatabase(),getTopicPrefix(dbEvent.getHandler().getHiveConf()),HCatConstants.HCAT_ADD_DATABASE_EVENT);
 	}
 
 	@Override
@@ -142,7 +143,7 @@ public class NotificationListener extend
 		// by listening on a topic named "HCAT" and message selector string
 		// as "HCAT_EVENT = HCAT_DROP_DATABASE" 
 		if(dbEvent.getStatus())
-			send(dbEvent.getDatabase(),HCatConstants.HCAT_TOPIC,HCatConstants.HCAT_DROP_DATABASE_EVENT);
+			send(dbEvent.getDatabase(),getTopicPrefix(dbEvent.getHandler().getHiveConf()),HCatConstants.HCAT_DROP_DATABASE_EVENT);
 	}
 
 	@Override
@@ -155,23 +156,23 @@ public class NotificationListener extend
 				Table tbl = tableEvent.getTable();
 				Table newTbl = tbl.deepCopy();
 				HMSHandler handler = tableEvent.getHandler();
-				String namingPolicy = handler.getHiveConf().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAMING_POLICY,
"tablename");
-				newTbl.getParameters().put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME, getTopicNameForParts(namingPolicy,
tbl.getDbName(), tbl.getTableName()));
+				HiveConf conf = handler.getHiveConf();
+				newTbl.getParameters().put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME, 
+						getTopicPrefix(conf) + "." + tbl.getDbName() +"." + tbl.getTableName());
 				try {
 					handler.alter_table(tbl.getDbName(), tbl.getTableName(), newTbl);
 				} catch (InvalidOperationException e) {
 					throw new MetaException(e.toString());
 				}
-				send(tableEvent.getTable(),HCatConstants.HCAT_TOPIC+"."+tbl.getDbName(), HCatConstants.HCAT_ADD_TABLE_EVENT);
+				send(tableEvent.getTable(),getTopicPrefix(conf)+ "."+ tbl.getDbName(), HCatConstants.HCAT_ADD_TABLE_EVENT);
 			}
 		}	
 	}
 
-	private String getTopicNameForParts(String namingPolicy, String dbName, String tblName){
-		// we only have one policy now, so ignore policy param for now.
-		return HCatConstants.HCAT_TOPIC+"."+dbName+"."+tblName;
+	private String getTopicPrefix(HiveConf conf){
+		return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
 	}
-
+	
 	@Override
 	public void onDropTable(DropTableEvent tableEvent) throws MetaException {
 		// Subscriber can get notification about drop of a  table in HCAT
@@ -188,7 +189,7 @@ public class NotificationListener extend
 			sd.setSortCols(new ArrayList<Order>());
 			sd.setParameters(new HashMap<String, String>());
 			sd.getSerdeInfo().setParameters(new HashMap<String, String>());
-			send(table,HCatConstants.HCAT_TOPIC+"."+table.getDbName(), HCatConstants.HCAT_DROP_TABLE_EVENT);

+			send(table,getTopicPrefix(tableEvent.getHandler().getHiveConf())+"."+table.getDbName(),
HCatConstants.HCAT_DROP_TABLE_EVENT);	
 		}
 	}
 
@@ -246,7 +247,7 @@ public class NotificationListener extend
 		}
 	}
 
-	private void createConnection(){
+	protected void createConnection(){
 
 		Context jndiCntxt;
 		try {

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java?rev=1137114&r1=1137113&r2=1137114&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
(original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
Sat Jun 18 02:01:46 2011
@@ -66,6 +66,7 @@ public class TestMsgBusConnection extend
 		hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
 		hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
 		hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+		hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, "planetlab.hcat");
 		SessionState.start(new CliSessionState(hiveConf));
 		driver = new Driver(hiveConf);
 	}
@@ -75,7 +76,7 @@ public class TestMsgBusConnection extend
 		Connection conn = connFac.createConnection();
 		conn.start();
 		Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
-		Destination hcatTopic = session.createTopic(HCatConstants.HCAT_TOPIC);
+		Destination hcatTopic = session.createTopic("planetlab.hcat");
 		consumer = session.createConsumer(hcatTopic);
 	}
 
@@ -85,7 +86,7 @@ public class TestMsgBusConnection extend
 			driver.run("create database testconndb");
 			Message msg = consumer.receive();
 			assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT));
-			assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+			assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString());
 			assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName());
 			broker.stop();
 			driver.run("drop database testconndb cascade");
@@ -94,12 +95,12 @@ public class TestMsgBusConnection extend
 			driver.run("create database testconndb");
 			msg = consumer.receive();
 			assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT));
-			assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+			assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString());
 			assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName());
 			driver.run("drop database testconndb cascade");
 			msg = consumer.receive();
 			assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT));
-			assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+			assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString());
 			assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName());
 		} catch (NoSuchObjectException nsoe){
 			nsoe.printStackTrace(System.err);

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java?rev=1137114&r1=1137113&r2=1137114&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
(original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
Sat Jun 18 02:01:46 2011
@@ -68,13 +68,13 @@ public class TestNotificationListener ex
 		// We want message to be sent when session commits, thus we run in
 		// transacted mode.
 		Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
-		Destination hcatTopic = session.createTopic(HCatConstants.HCAT_TOPIC);
+		Destination hcatTopic = session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
 		MessageConsumer consumer1 = session.createConsumer(hcatTopic);
 		consumer1.setMessageListener(this);
-		Destination tblTopic = session.createTopic(HCatConstants.HCAT_TOPIC+".mydb.mytbl");
+		Destination tblTopic = session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".mydb.mytbl");
 		MessageConsumer consumer2 = session.createConsumer(tblTopic);
 		consumer2.setMessageListener(this);
-		Destination dbTopic = session.createTopic(HCatConstants.HCAT_TOPIC+".mydb");
+		Destination dbTopic = session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".mydb");
 		MessageConsumer consumer3 = session.createConsumer(dbTopic);
 		consumer3.setMessageListener(this);
 		hiveConf = new HiveConf(this.getClass());
@@ -112,7 +112,7 @@ public class TestNotificationListener ex
 			event = msg.getStringProperty(HCatConstants.HCAT_EVENT);
 			if(event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)){
 
-				assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+				assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString());
 				assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName());
 			}
 			else if(event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)){
@@ -153,7 +153,7 @@ public class TestNotificationListener ex
 			}
 			else if(event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)){
 
-				assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+				assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString());
 				assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName());
 			}
 			else



Mime
View raw message