airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From patanac...@apache.org
Subject svn commit: r1175091 - in /incubator/airavata/trunk/modules/ws-messenger: commons/src/main/java/org/apache/airavata/wsmg/commons/config/ messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ messagebroker/src/main/java/org/apache/airavata/wsmg/m...
Date Sat, 24 Sep 2011 01:45:09 GMT
Author: patanachai
Date: Sat Sep 24 01:45:09 2011
New Revision: 1175091

URL: http://svn.apache.org/viewvc?rev=1175091&view=rev
Log:
AIRAVATA-101 Fix build error

Modified:
    incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/config/ConfigurationManager.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
    incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java

Modified: incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/config/ConfigurationManager.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/config/ConfigurationManager.java?rev=1175091&r1=1175090&r2=1175091&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/config/ConfigurationManager.java
(original)
+++ incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/config/ConfigurationManager.java
Sat Sep 24 01:45:09 2011
@@ -46,4 +46,8 @@ public class ConfigurationManager {
     public int getConfig(String configName, int defaultVal) {
         return Integer.parseInt(configurations.getProperty(configName, Integer.toString(defaultVal)));
     }
+    
+    public long getConfig(String configName, long defaultVal) {
+        return Long.parseLong(configurations.getProperty(configName, Long.toString(defaultVal)));
+    }
 }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java?rev=1175091&r1=1175090&r2=1175091&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
(original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
Sat Sep 24 01:45:09 2011
@@ -205,7 +205,7 @@ public class BrokerServiceLifeCycle impl
         Deliverable senderUtils = new SenderUtils(urlManager);
         senderUtils.setProtocol(protocol);
         
-        DeliveryProcessor proc = new DeliveryProcessor(senderUtils, method);
+        proc = new DeliveryProcessor(senderUtils, method);
         proc.start();
         log.info(initedmethod + " sending method inited");
     }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java?rev=1175091&r1=1175090&r2=1175091&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
(original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
Sat Sep 24 01:45:09 2011
@@ -37,34 +37,30 @@ import org.apache.axis2.addressing.Endpo
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/*
- * FIXME: need thread safe version
- */
 public class ConsumerUrlManager {
-    
+
     private static final Logger logger = LoggerFactory.getLogger(ConsumerUrlManager.class);
 
-    private ConcurrentHashMap<String, FailedConsumerInfo> failedConsumerUrls = new
ConcurrentHashMap<String, FailedConsumerInfo>(); // the
+    private ConcurrentHashMap<String, FailedConsumerInfo> failedConsumerUrls = new
ConcurrentHashMap<String, FailedConsumerInfo>();
 
     private final int defaultMaxRetry;
 
-    private long expireTimeGap; // milliseconds    
+    private long expireTimeGap; // milliseconds
 
     private Timer cleanupTimer;
 
     public ConsumerUrlManager(ConfigurationManager config) {
 
-        defaultMaxRetry = Integer.parseInt(config
-                .getConfig(WsmgCommonConstants.CONFIG_MAX_MESSAGE_DELIVER_RETRIES, "2"));
+        defaultMaxRetry = config.getConfig(WsmgCommonConstants.CONFIG_MAX_MESSAGE_DELIVER_RETRIES,
2);
 
-        expireTimeGap = 1000 * 60 * Long.parseLong(config.getConfig(
-                WsmgCommonConstants.CONFIG_CONSUMER_URL_EXPIRATION_TIME_GAP, "5")); // time
is in milliseconds
+        // time is in milliseconds
+        expireTimeGap = 1000 * 60 * config.getConfig(WsmgCommonConstants.CONFIG_CONSUMER_URL_EXPIRATION_TIME_GAP,
5l);
 
         // let minimum time to be 1 minute
         long timerThreadInterval = Math.max(expireTimeGap / 5, 1000 * 60);
 
         cleanupTimer = new Timer("Failed consumer url handler", true);
-        cleanupTimer.scheduleAtFixedRate(new URLCleanUpTask(failedConsumerUrls), 0, timerThreadInterval);
+        cleanupTimer.scheduleAtFixedRate(new URLCleanUpTask(), 0, timerThreadInterval);
 
     }
 
@@ -77,14 +73,15 @@ public class ConsumerUrlManager {
 
         if (isEligibleToBlackList(exception)) {
 
-            FailedConsumerInfo info = failedConsumerUrls.get(url);
-            if (info == null) {
-                info = new FailedConsumerInfo();
-                failedConsumerUrls.put(url, info);
+            synchronized (failedConsumerUrls) {
+                FailedConsumerInfo info = failedConsumerUrls.get(url);
+                if (info == null) {
+                    info = new FailedConsumerInfo();
+                    failedConsumerUrls.put(url, info);
+                }
+                info.incrementNumberOfTimesTried(timeFinished + expireTimeGap);
             }
 
-            info.incrementNumberOfTimesTried(timeFinished + expireTimeGap);
-
         } else {
 
             String errorMsg = String.format("unable to deliver message: [%s] to consumer:
[%s], " + "reason: [%s]",
@@ -97,20 +94,22 @@ public class ConsumerUrlManager {
     public void onSucessfullDelivery(EndpointReference consumerEndpointReference, long timeTaken)
{
 
         RunTimeStatistics.addNewSuccessfulDeliverTime(timeTaken);
+        synchronized (failedConsumerUrls) {
 
-       FailedConsumerInfo info = failedConsumerUrls.remove(consumerEndpointReference.getAddress());
+            FailedConsumerInfo info = failedConsumerUrls.remove(consumerEndpointReference.getAddress());
 
-        if (info != null) {
-            logger.debug(String.format("message was delivered to " + "previously %d times
failed url : %s",
-                    info.getNumberOfTimesTried(), consumerEndpointReference.getAddress()));
+            if (info != null) {
+                logger.debug(String.format("message was delivered to " + "previously %d times
failed url : %s",
+                        info.getNumberOfTimesTried(), consumerEndpointReference.getAddress()));
+            }
         }
     }
 
     public boolean isUnavailable(String url) {
-
-        FailedConsumerInfo info = failedConsumerUrls.get(url);
-
-        return (info != null && info.isMaxRetryCountReached());
+        synchronized (failedConsumerUrls) {
+            FailedConsumerInfo info = failedConsumerUrls.get(url);
+            return (info != null && info.isMaxRetryCountReached());
+        }
     }
 
     private boolean isEligibleToBlackList(AxisFault f) {
@@ -123,8 +122,8 @@ public class ConsumerUrlManager {
         }
 
         /*
-         * if timeout because of the set timeout in this class In windows, timeout cause
ConnectException with
-         * "Connection timed out" message
+         * if timeout because of the set timeout in this class In windows,
+         * timeout cause ConnectException with "Connection timed out" message
          */
         if (cause instanceof SocketTimeoutException || cause.getMessage().indexOf("timed
out") > 0
                 || cause instanceof NoRouteToHostException) {
@@ -139,15 +138,9 @@ public class ConsumerUrlManager {
         private int numberOfTimesTried;
         private long expiryTime;
 
-        public FailedConsumerInfo() {
-            numberOfTimesTried = 0;
-            expiryTime = 0L;
-        }
-
         public void incrementNumberOfTimesTried(long expireTime) {
             numberOfTimesTried++;
             expiryTime = expireTime;
-
         }
 
         public void decrementNumberOfTimeTried() {
@@ -170,35 +163,26 @@ public class ConsumerUrlManager {
 
     class URLCleanUpTask extends TimerTask {
 
-        ConcurrentHashMap<String, FailedConsumerInfo> failedConsumers;
-
-        public URLCleanUpTask(ConcurrentHashMap<String, FailedConsumerInfo> failedUrls)
{
-
-            failedConsumers = failedUrls;
-
-        }
-
         @Override
         public void run() {
 
             logger.info("starting to clean up black listed consumer urls");
             long currentTime = System.currentTimeMillis();
 
-            for (Entry<String, FailedConsumerInfo> entry : failedConsumers.entrySet())
{
-                FailedConsumerInfo info = entry.getValue();
+            synchronized (failedConsumerUrls) {
+                for (Entry<String, FailedConsumerInfo> entry : failedConsumerUrls.entrySet())
{
+                    FailedConsumerInfo info = entry.getValue();
 
-                if (info.isMaxRetryCountReached() && info.getLastAtteptExpiryTime()
>= currentTime) {
+                    if (info.isMaxRetryCountReached() && info.getLastAtteptExpiryTime()
>= currentTime) {
 
-                    info.decrementNumberOfTimeTried();
-                    logger.info("decrementing number of times" + " tried for consumer url:
" + entry.getKey());
+                        info.decrementNumberOfTimeTried();
+                        logger.info("decrementing number of times" + " tried for consumer
url: " + entry.getKey());
 
+                    }
                 }
-
             }
 
             logger.info("finished cleaning black listed consumer urls");
         }
-
     }
-
 }

Modified: incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java?rev=1175091&r1=1175090&r2=1175091&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java
(original)
+++ incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java
Sat Sep 24 01:45:09 2011
@@ -23,6 +23,7 @@ package org.apache.airavata.wsmg.messeng
 
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.lang.reflect.Constructor;
 
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
@@ -35,6 +36,8 @@ import org.apache.airavata.wsmg.commons.
 import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
 import org.apache.airavata.wsmg.commons.storage.WsmgPersistantStorage;
 import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
+import org.apache.airavata.wsmg.messenger.protocol.impl.Axis2Protocol;
 import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
 import org.apache.airavata.wsmg.messenger.strategy.impl.FixedParallelSender;
 import org.apache.airavata.wsmg.messenger.strategy.impl.ParallelSender;
@@ -49,10 +52,9 @@ public class MessengerServlet extends Ht
 
     private static final long serialVersionUID = -7175511030332798604L;
 
-    private SendingStrategy sender = null;
+    private static final long DEFAULT_SOCKET_TIME_OUT = 20000l;
 
-    public MessengerServlet() {
-    }
+    private DeliveryProcessor proc;
 
     public void init(ServletConfig config) throws ServletException {
         logger.info("Starting messenger servlet");
@@ -65,38 +67,65 @@ public class MessengerServlet extends Ht
 
     }
 
-    private void initDiliveryMethod(ConfigurationManager config) {
-        logger.info("starting dilivery thread");
+    private void initDiliveryMethod(ConfigurationManager configMan) {
+        /*
+         * Create Protocol
+         */
+        DeliveryProtocol protocol;
+        String protocolClass = configMan
+                .getConfig(WsmgCommonConstants.DELIVERY_PROTOCOL, Axis2Protocol.class.getName());
+        try {
+            Class cl = Class.forName(protocolClass);
+            Constructor<DeliveryProtocol> co = cl.getConstructor(null);
+            protocol = co.newInstance((Object[]) null);
+
+        } catch (Exception e) {
+            logger.error("Cannot initial protocol sender", e);
+            return;
+        }
+        protocol.setTimeout(configMan.getConfig(WsmgCommonConstants.CONFIG_SOCKET_TIME_OUT,
DEFAULT_SOCKET_TIME_OUT));
 
-        String deliveryMethod = config.getConfig(WsmgCommonConstants.CONFIG_DELIVERY_METHOD,
-                WsmgCommonConstants.DELIVERY_METHOD_SERIAL);
+        /*
+         * Create delivery method
+         */
         SendingStrategy method = null;
-
-        ConsumerUrlManager urlManager = new ConsumerUrlManager(config);
-
         String initedmethod = null;
-
+        String deliveryMethod = configMan.getConfig(WsmgCommonConstants.CONFIG_DELIVERY_METHOD,
+                WsmgCommonConstants.DELIVERY_METHOD_SERIAL);
         if (WsmgCommonConstants.DELIVERY_METHOD_PARALLEL.equalsIgnoreCase(deliveryMethod))
{
-
-            method = new ParallelSender(config, urlManager);
+            method = new ParallelSender();
             initedmethod = WsmgCommonConstants.DELIVERY_METHOD_PARALLEL;
 
         } else if (WsmgCommonConstants.DELIVERY_METHOD_THREAD_CREW.equalsIgnoreCase(deliveryMethod))
{
-            method = new FixedParallelSender(config, urlManager);
+            int poolsize = configMan.getConfig(WsmgCommonConstants.CONFIG_SENDING_THREAD_POOL_SIZE,
+                    WsmgCommonConstants.DEFAULT_SENDING_THREAD_POOL_SIZE);
+            int batchsize = configMan.getConfig(WsmgCommonConstants.CONFIG_SENDING_BATCH_SIZE,
+                    WsmgCommonConstants.DEFAULT_SENDING_BATCH_SIZE);
+            method = new FixedParallelSender(poolsize, batchsize);
             initedmethod = WsmgCommonConstants.DELIVERY_METHOD_THREAD_CREW;
 
         } else {
-            method = new SerialSender(config, urlManager);
+            method = new SerialSender();
             initedmethod = WsmgCommonConstants.DELIVERY_METHOD_SERIAL;
         }
 
-        method.start();
+        /*
+         * Create Deliverable
+         */
+        ConsumerUrlManager urlManager = new ConsumerUrlManager(configMan);
+        Deliverable senderUtils = new SenderUtils(urlManager);
+        senderUtils.setProtocol(protocol);
+
+        proc = new DeliveryProcessor(senderUtils, method);
+        proc.start();
         logger.info(initedmethod + " sending method inited");
     }
 
     public void destroy() {
-        sender.shutdown();
         logger.info("stoping wsmg-messenger");
+        if (proc != null) {
+            proc.stop();
+        }
     }
 
     public ServletConfig getServletConfig() {



Mime
View raw message