incubator-hcatalog-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From avand...@apache.org
Subject svn commit: r1408431 - in /incubator/hcatalog/branches/branch-0.4: CHANGES.txt src/java/org/apache/hcatalog/listener/NotificationListener.java
Date Mon, 12 Nov 2012 20:01:33 GMT
Author: avandana
Date: Mon Nov 12 20:01:32 2012
New Revision: 1408431

URL: http://svn.apache.org/viewvc?rev=1408431&view=rev
Log:
HCAT-548 Move topic creation in NotificationListener to a separate method 

Modified:
    incubator/hcatalog/branches/branch-0.4/CHANGES.txt
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/listener/NotificationListener.java

Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1408431&r1=1408430&r2=1408431&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Mon Nov 12 20:01:32 2012
@@ -38,6 +38,8 @@ Trunk (unreleased changes)
   HCAT-427 Document storage-based authorization (lefty via gates)
 
   IMPROVEMENTS
+  HCAT-548 Move topic creation in NotificationListener to a separate method (amalakar via avandana)
+
  HCAT-538 HCatalogStorer fails for 100GB of data with dynamic partitioning, number of partition is 300 (amalakar via toffer)
 
   HCAT-492 Document CTAS workaround for Hive with JSON serde (lefty via khorgath)

Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1408431&r1=1408430&r2=1408431&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/listener/NotificationListener.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/listener/NotificationListener.java Mon Nov 12 20:01:32 2012
@@ -34,6 +34,7 @@ import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.Topic;
 import javax.naming.Context;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
@@ -195,7 +196,7 @@ public class NotificationListener extend
 				 me.initCause(e);
 				throw me;
 			}
-			send(newTbl,getTopicPrefix(conf)+ "."+ newTbl.getDbName().toLowerCase(), HCatConstants.HCAT_ADD_TABLE_EVENT);
+			send(newTbl, getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase(), HCatConstants.HCAT_ADD_TABLE_EVENT);
 		}
 	}
 	
@@ -236,7 +237,6 @@ public class NotificationListener extend
 
 		try{
 
-			Destination topic = null;
 			if(null == session){
 				// this will happen, if we never able to establish a connection.
 				createConnection();
@@ -247,24 +247,16 @@ public class NotificationListener extend
 					return;
 				}
 			}
-			try{
-				// Topics are created on demand. If it doesn't exist on broker it will
-				// be created when broker receives this message.
-				topic = session.createTopic(topicName);				
-			} catch (IllegalStateException ise){
-				// this will happen if we were able to establish connection once, but its no longer valid,
-				// ise is thrown, catch it and retry.
-				LOG.error("Seems like connection is lost. Retrying", ise);
-				createConnection();
-				topic = session.createTopic(topicName);				
-			}
-			if (null == topic){
-				// Still not successful, return from here.
-				LOG.error("Invalid session. Failed to send message on topic: "+
-						topicName + " event: "+event);				
-				return;
-			}
-			MessageProducer producer = session.createProducer(topic);
+            Destination topic = null;
+            topic = getTopic(topicName);
+            if (null == topic){
+                // Still not successful, return from here.
+                LOG.error("Invalid session. Failed to send message on topic: "+
+                        topicName + " event: "+ event);
+                return;
+            }
+
+            MessageProducer producer = session.createProducer(topic);
 			Message msg;
 			if (msgBody instanceof Map){
 				MapMessage mapMsg = session.createMapMessage();
@@ -289,6 +281,29 @@ public class NotificationListener extend
 		}
 	}
 
+    /**
+     * Get the topic object for the topicName, it also tries to reconnect
+     * if the connection appears to be broken.
+     * @param topicName
+     * @return
+     * @throws JMSException
+     */
+    protected Topic getTopic(final String topicName) throws JMSException {
+        Topic topic;
+        try{
+            // Topics are created on demand. If it doesn't exist on broker it will
+            // be created when broker receives this message.
+            topic = session.createTopic(topicName);
+        } catch (IllegalStateException ise){
+            // this will happen if we were able to establish connection once, but its no longer valid,
+            // ise is thrown, catch it and retry.
+            LOG.error("Seems like connection is lost. Retrying", ise);
+            createConnection();
+            topic = session.createTopic(topicName);
+        }
+        return topic;
+    }
+
 	protected void createConnection(){
 
 		Context jndiCntxt;



Mime
View raw message