airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From patanac...@apache.org
Subject svn commit: r1173508 [1/2] - in /incubator/airavata/trunk/modules/ws-messenger: commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ commons/src/main/java/org/apache/airavata/wsmg/commons/util/ messagebox/src/main/org/apache/airavata/wsmg/ms...
Date Wed, 21 Sep 2011 06:20:39 GMT
Author: patanachai
Date: Wed Sep 21 06:20:38 2011
New Revision: 1173508

URL: http://svn.apache.org/viewvc?rev=1173508&view=rev
Log:
AIRAVATA-101, AIRAVATA-107 Improve code in messagebox modules and remove log4j

Added:
    incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/util/
    incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/util/Axis2Utils.java
Removed:
    incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/ProcessingContext.java
Modified:
    incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java
    incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java
    incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/MsgBoxServiceLifeCycle.java
    incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/MsgBoxServiceMessageReceiverInOut.java
    incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/MsgBoxServiceSkeleton.java
    incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/MsgBoxStorage.java
    incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java
    incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java
    incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/memory/InMemoryImpl.java
    incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/StoreMessageHandler.java
    incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/util/ConfigKeys.java
    incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/util/MsgBoxCommonConstants.java
    incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/util/MsgBoxUtils.java
    incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/resources/services.xml
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionManager.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingMsgReceiver.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationMsgReceiver.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/resources/services.xml
    incubator/airavata/trunk/modules/ws-messenger/messenger/src/main/java/org/apache/airavata/wsmg/messenger/MessengerServlet.java

Modified: incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java?rev=1173508&r1=1173507&r2=1173508&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ConnectionPool.java Wed Sep 21 06:20:38 2011
@@ -301,6 +301,13 @@ public class ConnectionPool {
         closeConnections(busyConnections);
         busyConnections = new Stack<Connection>();
         lastAccessTimeRecord.clear();
+        
+        try{
+            this.clenupThread.join();
+            this.producerThread.join();
+        }catch(Exception e){
+            logger.error("Cannot shutdown cleanup thread", e);
+        }
     }
 
     private void closeConnections(Stack<Connection> connections) {

Modified: incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java?rev=1173508&r1=1173507&r2=1173508&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java Wed Sep 21 06:20:38 2011
@@ -107,10 +107,6 @@ public class JdbcStorage {
         return conn;
     }
 
-    public void closeConnection(Connection conn) {
-        connectionPool.free(conn);
-    }
-
     public int update(String query) throws SQLException {
         int result = 0;
         Connection conn = null;
@@ -124,11 +120,14 @@ public class JdbcStorage {
             rollback(conn);
             throw sql;
         } finally {
-            if (stmt != null && !stmt.isClosed()) {
-                stmt.close();
-            }            
-            if(conn!=null){
-                closeConnection(conn);
+            try {
+                if (stmt != null && !stmt.isClosed()) {
+                    stmt.close();
+                }
+            } finally {
+                if (conn != null) {
+                    closeConnection(conn);
+                }
             }
         }
         return result;
@@ -165,44 +164,30 @@ public class JdbcStorage {
             ResultSet rs = stmt.executeQuery();
             rs.next();
             count = rs.getInt(1);
+            commit(conn);
+        } catch (SQLException sql) {
+            rollback(conn);
+            throw sql;
         } finally {
-            if (stmt != null && !stmt.isClosed()) {
-                stmt.close();
-            }
-            if (conn != null) {
-                connectionPool.free(conn);
+            try {
+                if (stmt != null && !stmt.isClosed()) {
+                    stmt.close();
+                }
+            } finally {
+                if (conn != null) {
+                    closeConnection(conn);
+                }
             }
         }
         return count;
     }
 
-    /**
-     * @param query
-     * @return
-     * @throws SQLException
-     */
-    public int insert(String query) throws SQLException {
-        int rows = 0;
-        Connection conn = null;
-        PreparedStatement stmt = null;
-        try {
-            conn = connectionPool.getConnection();
-            stmt = conn.prepareStatement(query);
-            rows = stmt.executeUpdate();
-        } finally {
-            if (stmt != null && !stmt.isClosed()) {
-                stmt.close();
-            }
-
-            if (conn != null) {
-                connectionPool.free(conn);
-            }
-        }
-        return rows;
-    }
-
+    public void closeConnection(Connection conn) {
+        connectionPool.free(conn);
+    }    
+    
     public void closeAllConnections() {
         if (connectionPool != null)
             connectionPool.dispose();
-    }    
+    }
 }

Added: incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/util/Axis2Utils.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/util/Axis2Utils.java?rev=1173508&view=auto
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/util/Axis2Utils.java (added)
+++ incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/util/Axis2Utils.java Wed Sep 21 06:20:38 2011
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.commons.util;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.dispatchers.AddressingBasedDispatcher;
+import org.apache.axis2.engine.Handler;
+import org.apache.axis2.engine.Phase;
+
+public class Axis2Utils {
+
+    public static void overrideAddressingPhaseHander(ConfigurationContext configContext, AddressingBasedDispatcher dispatcher) {
+
+        List<Phase> inflowPhases = configContext.getAxisConfiguration().getPhasesInfo().getINPhases();
+        boolean foundFlag = false;
+
+        for (Phase p : inflowPhases) {
+
+            if (p.getName().equalsIgnoreCase("Addressing")) {
+
+                List<Handler> handlers = p.getHandlers();
+
+                for (Iterator<Handler> ite = handlers.iterator(); ite.hasNext();) {
+                    Handler h = ite.next();
+                    if (h.getClass().isAssignableFrom(dispatcher.getClass())) {
+                        p.removeHandler(h.getHandlerDesc());
+                        break;
+                    }
+                }
+
+                p.addHandler(dispatcher, 0);
+                foundFlag = true;
+                break;
+            }
+
+        }
+
+        if (!foundFlag) {
+            throw new RuntimeException("unable to find addressing phase - inside inflow phases");
+        }
+    }
+}

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/MsgBoxServiceLifeCycle.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/MsgBoxServiceLifeCycle.java?rev=1173508&r1=1173507&r2=1173508&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/MsgBoxServiceLifeCycle.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/MsgBoxServiceLifeCycle.java Wed Sep 21 06:20:38 2011
@@ -22,25 +22,17 @@
 package org.apache.airavata.wsmg.msgbox;
 
 import java.io.File;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
-import org.apache.airavata.wsmg.commons.storage.DatabaseCreator;
-import org.apache.airavata.wsmg.commons.storage.JdbcStorage;
+import org.apache.airavata.wsmg.commons.util.Axis2Utils;
+import org.apache.airavata.wsmg.msgbox.Storage.MsgBoxStorage;
 import org.apache.airavata.wsmg.msgbox.Storage.dbpool.DatabaseStorageImpl;
 import org.apache.airavata.wsmg.msgbox.Storage.memory.InMemoryImpl;
 import org.apache.airavata.wsmg.msgbox.util.ConfigKeys;
 import org.apache.airavata.wsmg.msgbox.util.MsgBoxCommonConstants;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.description.AxisService;
-import org.apache.axis2.engine.Handler;
-import org.apache.axis2.engine.Phase;
+import org.apache.axis2.engine.ServiceLifeCycle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,124 +40,46 @@ import org.slf4j.LoggerFactory;
  * This class initialize the messageBox service by setting the messageStore
  * based on the configuration done by the user This is the LifeCycle class
  */
-public class MsgBoxServiceLifeCycle implements org.apache.axis2.engine.ServiceLifeCycle {
+public class MsgBoxServiceLifeCycle implements ServiceLifeCycle {
 
-    
     private static final Logger logger = LoggerFactory.getLogger(MsgBoxServiceLifeCycle.class);
     private static final String CONFIGURATION_FILE_NAME = "msgBox.properties";
-    private static final String TABLE_NAME_TO_CHECK = "msgbox";
-    private JdbcStorage db;
+    private static final String TRUE = Boolean.toString(true);
 
     public void shutDown(ConfigurationContext configurationcontext, AxisService axisservice) {
         logger.info("Message box shutting down");
-        if (db != null)
-            db.closeAllConnections();
+        if (configurationcontext.getProperty(MsgBoxCommonConstants.MSGBOX_STORAGE) != null) {
+            MsgBoxStorage msgBoxStorage = (MsgBoxStorage) configurationcontext
+                    .getProperty(MsgBoxCommonConstants.MSGBOX_STORAGE);
+            msgBoxStorage.dispose();
+        }
     }
 
     public void startUp(ConfigurationContext configurationcontext, AxisService axisservice) {
 
-        overrideAddressingPhaseHander(configurationcontext);
+        Axis2Utils.overrideAddressingPhaseHander(configurationcontext, new StoreMessageHandler());
 
         // Load the configuration file from the classpath
         ConfigurationManager confmanager = new ConfigurationManager("conf" + File.separator + CONFIGURATION_FILE_NAME);
-        configurationcontext.setProperty(MsgBoxCommonConstants.CONF_MANAGER, confmanager);
         initDatabase(configurationcontext, confmanager);
-        configurationcontext.setProperty(MsgBoxCommonConstants.INIT_MSG_BOX_SKELETON_TRUE, false);
     }
 
     public void initDatabase(ConfigurationContext configurationcontext, ConfigurationManager confmanager) {
-
-        boolean dbImplemented = true;
-        if (confmanager.getConfig(ConfigKeys.USE_DATABSE_STORAGE).equalsIgnoreCase("true")) {
-            if (!checkConnection(confmanager)) {
-                logger.error("Database creation failure at MsgBoxServiceLifeCycle class. Cannot connect with the database");
-                throw new RuntimeException("Database failure");
-            }
-            
+        /*
+         * Determine Storage
+         */
+        String useDatabase = confmanager.getConfig(ConfigKeys.USE_DATABASE_STORAGE, TRUE);
+        MsgBoxStorage msgBoxStorage = null;
+        long time = getInterval(confmanager);
+        if (useDatabase.equalsIgnoreCase(TRUE)) {
             String jdbcUrl = confmanager.getConfig(ConfigKeys.MSG_BOX_JDBC_URL);
-            String jdbcDriver = confmanager.getConfig(ConfigKeys.JDBC_DRIVER);
-            db = new JdbcStorage(10, 50, jdbcUrl, jdbcDriver, true);
-            try {
-                /*
-                 * Check database
-                 */
-                Connection conn = db.connect();
-                if (!DatabaseCreator.isDatabaseStructureCreated(TABLE_NAME_TO_CHECK, conn)) {
-                    DatabaseCreator.createMsgBoxDatabase(conn);
-                    logger.info("New Database created for Message Box");
-                } else {
-                    logger.info("Database already created for Message Box!");
-                }
-                db.closeConnection(conn);
-                
-                
-                /*
-                 * This fails if the table: msgBoxes is not there in the
-                 * database
-                 */
-                MsgBoxServiceSkeleton.setStorage(new DatabaseStorageImpl(db, getInterval(confmanager)));
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
-                throw new RuntimeException("Database failure");
-            }
-        }
-        if (confmanager.getConfig(ConfigKeys.USE_DATABSE_STORAGE).equalsIgnoreCase("false")) {
-            ConcurrentHashMap<String, LinkedList<String>> map = new ConcurrentHashMap<String, LinkedList<String>>();
-            InMemoryImpl tempStor = new InMemoryImpl();
-            tempStor.setMap(map);
-            MsgBoxServiceSkeleton.setStorage(tempStor);
-            dbImplemented = false;
-        }
-
-        configurationcontext.setProperty(MsgBoxCommonConstants.DB_IMPLEMENTED_TRUE, dbImplemented);
-    }
-
-    public boolean checkConnection(ConfigurationManager confmanager) {
-        boolean dbexists = true;
-        Connection conn;
-        try {
-            Class.forName(confmanager.getConfig(ConfigKeys.JDBC_DRIVER)).newInstance();
-            conn = DriverManager.getConnection(confmanager.getConfig(ConfigKeys.MSG_BOX_JDBC_URL));
-            try {
-                conn.close();
-            } catch (SQLException e) {
-                logger.error("Database connect is not closed at the test", e);
-            }
-        } catch (Exception e) {
-            logger.error("Checked for database connection with provided info. Failed connection", e);
-            dbexists = false;
-        }
-        return dbexists;
-    }
-
-    private void overrideAddressingPhaseHander(ConfigurationContext configContext) {
-        List<Phase> inflowPhases = configContext.getAxisConfiguration().getPhasesInfo().getINPhases();
-        boolean foundFlag = false;
-
-        for (Phase p : inflowPhases) {
-
-            if (p.getName().equalsIgnoreCase("Addressing")) {
-
-                List<Handler> handlers = p.getHandlers();
-
-                for (Iterator<Handler> ite = handlers.iterator(); ite.hasNext();) {
-                    Handler h = ite.next();
-                    if (h.getClass().isAssignableFrom(StoreMessageHandler.class)) {
-                        p.removeHandler(h.getHandlerDesc());
-                        break;
-                    }
-                }
-
-                p.addHandler(new StoreMessageHandler(), 0);
-                foundFlag = true;
-                break;
-            }
-
+            String jdbcDriver = confmanager.getConfig(ConfigKeys.MSG_BOX_JDBC_DRIVER);
+            msgBoxStorage = new DatabaseStorageImpl(jdbcUrl, jdbcDriver, time);
+        } else {
+            msgBoxStorage = new InMemoryImpl(time);
         }
+        configurationcontext.setProperty(MsgBoxCommonConstants.MSGBOX_STORAGE, msgBoxStorage);
 
-        if (!foundFlag) {
-            throw new RuntimeException("unable to find addressing phase - inside inflow phases");
-        }
     }
 
     private long getInterval(ConfigurationManager configs) {

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/MsgBoxServiceMessageReceiverInOut.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/MsgBoxServiceMessageReceiverInOut.java?rev=1173508&r1=1173507&r2=1173508&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/MsgBoxServiceMessageReceiverInOut.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/MsgBoxServiceMessageReceiverInOut.java Wed Sep 21 06:20:38 2011
@@ -27,128 +27,112 @@ import org.apache.axiom.soap.SOAPEnvelop
 import org.apache.axiom.soap.SOAPFactory;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.MessageContext;
-import org.apache.log4j.Logger;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.receivers.AbstractInOutMessageReceiver;
+import org.apache.axis2.transport.http.HTTPConstants;
+import org.apache.axis2.util.JavaUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * MsgBoxServiceMessageReceiverInOut message receiver, this is the actual location where the service operations get invoked.
+ * MsgBoxServiceMessageReceiverInOut message receiver, this is the actual
+ * location where the service operations get invoked.
  */
 
-public class MsgBoxServiceMessageReceiverInOut extends org.apache.axis2.receivers.AbstractInOutMessageReceiver {
+public class MsgBoxServiceMessageReceiverInOut extends AbstractInOutMessageReceiver {
 
-    private static org.apache.log4j.Logger logger = Logger.getLogger(MsgBoxServiceMessageReceiverInOut.class);
+    private static Logger logger = LoggerFactory.getLogger(MsgBoxServiceMessageReceiverInOut.class);
 
-    public void invokeBusinessLogic(org.apache.axis2.context.MessageContext msgContext,
-            org.apache.axis2.context.MessageContext newMsgContext) throws org.apache.axis2.AxisFault {
+    public void invokeBusinessLogic(MessageContext inMsgContext, MessageContext outMsgContext) throws AxisFault {
 
         // get the implementation class for the Web Service
-        Object obj = getTheImplementationObject(msgContext);
-
-        MsgBoxServiceSkeleton skel = (MsgBoxServiceSkeleton) obj;
-
-        // Out Envelop
-        org.apache.axiom.soap.SOAPEnvelope envelope = null;
-        // Find the axisOperation that has been set by the Dispatch phase.
-        org.apache.axis2.description.AxisOperation op = msgContext.getOperationContext().getAxisOperation();
-        if (op == null) {
-            throw new org.apache.axis2.AxisFault(
-                    "Operation is not located, if this is doclit style the SOAP-ACTION should specified via the SOAP Action to use the RawXMLProvider");
-        }
-
-        String operationName = getOperationName(msgContext);
-        MsgBoxOperations msgType = MsgBoxOperations.valueFrom(operationName);
-        SOAPEnvelope enlp = msgContext.getEnvelope();
-        // set messageboxId
-        String clientId = null;
-        String toAddress = msgContext.getTo().getAddress();
-        int biginIndex = toAddress.indexOf("clientid");
-        clientId = toAddress.substring(biginIndex + "clientid".length() + 1);
+        MsgBoxServiceSkeleton skel = (MsgBoxServiceSkeleton)getTheImplementationObject(inMsgContext);        
+        
+        OMElement response = null;
+        
         try {
-            ProcessingContext procCtxt = new ProcessingContext();
-
+            
+            String operationName = getOperationName(inMsgContext);
+            MsgBoxOperations msgType = MsgBoxOperations.valueFrom(operationName);
+            
             switch (msgType) {
 
             case STORE_MSGS: {
-                if (biginIndex != -1) {
-
-                    procCtxt.setMessage(enlp.getBody().getFirstElement());
-                    procCtxt.setMsgBoxId(clientId);
-                    procCtxt.setMessageId(msgContext.getMessageID());
-                    procCtxt.setSoapAction(msgContext.getSoapAction());
-                    OMElement response = skel.storeMessages(procCtxt);
-                    envelope = toEnvelope(getSOAPFactory(msgContext), response);
-                }
-
-                else {
-                    throw new AxisFault("clientid cannot be found");
-                }
-
-            }
+                SOAPEnvelope enlp = inMsgContext.getEnvelope();
+                OMElement message = enlp.getBody().getFirstElement();
+                String msgBoxId = getClientId(inMsgContext);
+                String messageId = inMsgContext.getMessageID();
+                String soapAction = inMsgContext.getSoapAction();
+                response = skel.storeMessages(msgBoxId, messageId, soapAction, message);
                 break;
+            }
 
             case DESTROY_MSGBOX: {
-                if (biginIndex != -1) {
-                    procCtxt.setMsgBoxId(clientId);
-                    envelope = toEnvelope(getSOAPFactory(msgContext), skel.destroyMsgBox(procCtxt));
-                } else {
-                    throw new AxisFault("clientid cannot be found");
-                }
-            }
+                String msgBoxId = getClientId(inMsgContext);
+                response = skel.destroyMsgBox(msgBoxId);
                 break;
+            }
 
             case TAKE_MSGS: {
-                if (biginIndex != -1) {
-                    procCtxt.setMsgBoxId(clientId);
-                    OMElement respEl = skel.takeMessages(procCtxt);
-                    envelope = toEnvelope(getSOAPFactory(msgContext), respEl);
-                } else {
-                    throw new AxisFault("clientid cannot be found");
-                }
-            }
+                String msgBoxId = getClientId(inMsgContext);
+                response = skel.takeMessages(msgBoxId);
                 break;
+            }
 
             case CREATE_MSGBOX: {
-                // procCtxt.setMsgBoxId(clientId);
-                OMElement response = skel.createMsgBox();
-                envelope = toEnvelope(getSOAPFactory(msgContext), response);
-
-            }
+                response = skel.createMsgBox();
                 break;
-
+            }
+            default:
+                throw new AxisFault("unsupported operation" + msgType.toString());
             }
 
+        } catch (AxisFault afe) {
+            throw afe;
         } catch (Exception e) {
-            logger.fatal("Exception", e);
+            logger.error("Exception", e);
             throw new AxisFault("Exception in Message Box ", e);
         }
 
-        newMsgContext.setEnvelope(envelope);
-        newMsgContext.getOptions().setProperty(org.apache.axis2.transport.http.HTTPConstants.CHUNKED, Boolean.FALSE);
+        /*
+         * Output
+         */
+        SOAPFactory soapFactory = getSOAPFactory(inMsgContext);
+        SOAPEnvelope envelope = toEnvelope(soapFactory, response);
+        outMsgContext.setEnvelope(envelope);
+        outMsgContext.getOptions().setProperty(HTTPConstants.CHUNKED, Boolean.FALSE);
     }
 
-    //
-
-    private org.apache.axiom.soap.SOAPEnvelope toEnvelope(SOAPFactory factory, OMElement response) {
-
-        org.apache.axiom.soap.SOAPEnvelope emptyEnvelope = factory.getDefaultEnvelope();
-        emptyEnvelope.getBody().addChild(response);
-        return emptyEnvelope;
+    private String getClientId(MessageContext inMsg) throws AxisFault{        
+        String toAddress = inMsg.getTo().getAddress();
+        int biginIndex = toAddress.indexOf("clientid");
+        if (biginIndex == -1) {
+            throw new AxisFault("clientid cannot be found");
+        }
+        String clientId = toAddress.substring(biginIndex + "clientid".length() + 1);
+        return clientId;
+    }
+    
+    private SOAPEnvelope toEnvelope(SOAPFactory factory, OMElement response) {
+        SOAPEnvelope envelop = factory.getDefaultEnvelope();
+        envelop.getBody().addChild(response);
+        return envelop;
     }
 
     protected String getOperationName(MessageContext inMsg) throws AxisFault {
 
-        org.apache.axis2.description.AxisOperation op = inMsg.getOperationContext().getAxisOperation();
+        AxisOperation op = inMsg.getOperationContext().getAxisOperation();
         if (op == null) {
             throw new AxisFault(
                     "Operation is not located, if this is doclit style the SOAP-ACTION should specified via the SOAP Action to use the RawXMLProvider");
         }
 
-        java.lang.String operationName = null;
-        if ((op.getName() == null)
-                || ((operationName = org.apache.axis2.util.JavaUtils.xmlNameToJava(op.getName().getLocalPart())) == null)) {
+        String operationName = null;
+        if ((op.getName() == null) || ((operationName = JavaUtils.xmlNameToJava(op.getName().getLocalPart())) == null)) {
             throw new AxisFault("invalid operation found");
         }
 
         return operationName;
     }
 
-}// end of class
+}

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/MsgBoxServiceSkeleton.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/MsgBoxServiceSkeleton.java?rev=1173508&r1=1173507&r2=1173508&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/MsgBoxServiceSkeleton.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/MsgBoxServiceSkeleton.java Wed Sep 21 06:20:38 2011
@@ -23,155 +23,154 @@ package org.apache.airavata.wsmg.msgbox;
 
 import java.io.StringReader;
 import java.sql.SQLException;
-import java.util.LinkedList;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.xml.namespace.QName;
+import java.util.List;
 
 import org.apache.airavata.wsmg.commons.MsgBoxNameSpConsts;
 import org.apache.airavata.wsmg.msgbox.Storage.MsgBoxStorage;
-import org.apache.airavata.wsmg.msgbox.Storage.memory.InMemoryImpl;
+import org.apache.airavata.wsmg.msgbox.util.MsgBoxCommonConstants;
 import org.apache.airavata.wsmg.msgbox.util.MsgBoxUtils;
 import org.apache.axiom.om.OMAbstractFactory;
 import org.apache.axiom.om.OMElement;
 import org.apache.axiom.om.OMFactory;
-import org.apache.axiom.om.OMNamespace;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.ServiceContext;
 import org.apache.axis2.service.Lifecycle;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Service class for MsgBoxService this get calle by MsgBoxServiceMessageReceiverInOut with ProcessingContext
+ * Service class for MsgBoxService this get called by
+ * MsgBoxServiceMessageReceiverInOut with ProcessingContext
  */
 public class MsgBoxServiceSkeleton implements Lifecycle {
 
-    public MsgBoxServiceSkeleton() {
-        dbImplenented = true;
-    }
-
-    public void init(ServiceContext servicecontext) throws AxisFault {
-    }
-
-    public static void setStorage(MsgBoxStorage storageIn) {
-        storage = storageIn;
-    }
-
-    public void destroy(ServiceContext serviceContext) {
-        if (logger.isDebugEnabled())
-            logger.debug("Stopping Service....");
-        if (!dbImplenented)
-            setMap2();
+    private static final Logger logger = LoggerFactory.getLogger(MsgBoxServiceSkeleton.class);
+    private static final String TRUE = Boolean.toString(true);
+    private static final String FALSE = Boolean.toString(false);
+    private static final long SLEEP_TIME = 60 * 60 * 1000l; // 1 hour;
+    private static OMFactory factory = OMAbstractFactory.getOMFactory();
+    private MsgBoxStorage storage;
+    private Thread deletingThread;
+    private boolean stop;
+
+    public void init(ServiceContext context) throws AxisFault {
+        this.storage = (MsgBoxStorage) context.getConfigurationContext().getProperty(
+                MsgBoxCommonConstants.MSGBOX_STORAGE);
+        
+        deletingThread = new Thread(new DeleteOldMessageRunnable());
+        deletingThread.start();
+    }
+
+    public void destroy(ServiceContext context) {
+        this.storage = null;
+        
+        // stop Deleting thread
+        this.stop = true;
+        this.deletingThread.interrupt();
+
+        try{
+            deletingThread.join();
+        }catch(Exception e){
+            logger.error("Cannot shutdown cleanup thread", e);
+        }
     }
 
     public OMElement createMsgBox() throws Exception {
-        OMElement dd = factory.createOMElement(ProcessingContext.CREATE_MSGBOX_RESP_QNAME);
-        String createdMsgBoxId = "";
-        OMNamespace omNs = factory.createOMNamespace("http://org.apache.airavata/xgws/msgbox/2004/", "ns1");
-        OMElement url = factory.createOMElement("msgboxid", omNs);
         try {
-            createdMsgBoxId = storage.createMsgBox();
+            String createdMsgBoxId = storage.createMsgBox();
+
+            /*
+             * Output response
+             */
+            OMElement dd = factory.createOMElement(MsgBoxCommonConstants.CREATE_MSGBOX_RESP_QNAME);
+            OMElement url = factory.createOMElement(MsgBoxCommonConstants.MSG_BOXID_QNAME);
+            url.setText(createdMsgBoxId);
+            dd.addChild(url);
+            return dd;
         } catch (Exception e) {
-            logger.fatal((new StringBuilder()).append("Error creating the message box ").append(createdMsgBoxId)
-                    .toString(), e);
-            AxisFault f = new AxisFault((new StringBuilder()).append("Error creating the message box ")
-                    .append(createdMsgBoxId).toString(), e);
-            f.setFaultCode("6000");
+            logger.error("Error creating the message box", e);
+            AxisFault f = new AxisFault("Error creating the message box", "6000", e);
             throw f;
         }
-        url.setText(createdMsgBoxId);
-        dd.addChild(url);
-        return dd;
-    }
-
-    String getRandom(int length) {
-        UUID uuid = UUID.randomUUID();
-        String myRandom = uuid.toString();
-        return myRandom.substring(1, length);
-    }
-
-    public OMElement storeMessages(ProcessingContext procCtxt) throws Exception {
-        String clientid = "";
-        OMElement message = procCtxt.getMessage();
-        OMElement status = factory.createOMElement(new QName((new StringBuilder())
-                .append(MsgBoxNameSpConsts.MSG_BOX.getNamespaceURI()).append("/").toString(), "status", "msg"));
-        if (procCtxt.getMsgBoxAddr() != null)
-            clientid = procCtxt.getMsgBoxAddr();
+
+    }
+
+    public OMElement storeMessages(String msgBoxAddr, String messageID, String soapAction, OMElement message)
+            throws Exception {
+        OMElement resp = factory.createOMElement(MsgBoxCommonConstants.STOREMSG_RESP_QNAME);
+        OMElement status = factory.createOMElement(MsgBoxCommonConstants.MSGBOX_STATUS_QNAME);
         try {
-            storage.putMessageIntoMsgBox(clientid, procCtxt.getMessageId(), procCtxt.soapAction, message);
-            status.setText("true");
+            storage.putMessageIntoMsgBox(msgBoxAddr, messageID, soapAction, message);
+            status.setText(TRUE);
         } catch (SQLException e) {
-            logger.fatal((new StringBuilder()).append("Exception thrown while storing message: ").append(message)
-                    .append("in msgbx: ").append(clientid).toString(), e);
-            status.setText("false");
+            logger.error("Error while storing message: " + message + " in msgbx: " + msgBoxAddr, e);
+            status.setText(FALSE);
+            
+            //FIXME: Should we throw exception?? or client will read false status
         }
-        OMElement resp = factory.createOMElement(ProcessingContext.STOREMSG_RESP_QNAME);
         resp.addChild(status);
         resp.declareNamespace(MsgBoxNameSpConsts.MSG_BOX);
         return resp;
     }
 
-    public OMElement takeMessages(ProcessingContext procCtxt) throws Exception {
-        String key = "";
-        if (procCtxt.getMsgBoxAddr() != null)
-            key = procCtxt.getMsgBoxAddr();
-        OMElement respEl = factory.createOMElement(new QName((new StringBuilder())
-                .append(MsgBoxNameSpConsts.MSG_BOX.getNamespaceURI()).append("/").toString(), "takeMessagesResponse",
-                "msg"));
-        OMElement messageSet = factory.createOMElement(new QName((new StringBuilder())
-                .append(MsgBoxNameSpConsts.MSG_BOX.getNamespaceURI()).append("/").toString(), "messages", "msg"));
+    public OMElement takeMessages(String msgBoxAddr) throws Exception {        
         try {
-            LinkedList<String> list = (LinkedList<String>) storage.takeMessagesFromMsgBox(key);
-            int i = 0;
-            if (list != null)
-                while (list.size() > 0) {
-                    messageSet.addChild(MsgBoxUtils.reader2OMElement(new StringReader(list.removeFirst())));
-                    i++;
+            OMElement respEl = factory.createOMElement(MsgBoxCommonConstants.TAKE_MSGBOX_RESP_QNAME);
+            OMElement messageSet = factory.createOMElement(MsgBoxCommonConstants.MSGBOX_MESSAGE_QNAME);
+            
+            List<String> list = storage.takeMessagesFromMsgBox(msgBoxAddr);
+            if (list != null && list.size() != 0) {
+                for (String string : list) {
+                    messageSet.addChild(MsgBoxUtils.reader2OMElement(new StringReader(string)));
                 }
-            else if (logger.isDebugEnabled())
-                logger.info("   no messages..");
+            } else {
+                logger.debug("  no messages..  ");
+            }
+            respEl.addChild(messageSet);
+            respEl.declareNamespace(MsgBoxNameSpConsts.MSG_BOX);
+            return respEl;
         } catch (Exception e) {
-            logger.fatal((new StringBuilder()).append("error taking mesages.. of message box.. - ").append(key)
-                    .toString(), e);
+            logger.error("Error taking mesages of message box: " + msgBoxAddr, e);
+            throw e;
         }
-        respEl.addChild(messageSet);
-        respEl.declareNamespace(MsgBoxNameSpConsts.MSG_BOX);
-        return respEl;
     }
 
-    public OMElement destroyMsgBox(ProcessingContext procCtxt) throws Exception {
-        String addr = "";
-        OMElement statusEl = factory.createOMElement(new QName(ProcessingContext.DESTROY_MSGBOX_RESP_QNAME
-                .getNamespaceURI(), "status"));
-        if (procCtxt.getMsgBoxAddr() != null) {
-            addr = procCtxt.getMsgBoxAddr();
+    public OMElement destroyMsgBox(String msgBoxAddr) throws Exception {
+        OMElement respEl = factory.createOMElement(MsgBoxCommonConstants.DESTROY_MSGBOX_RESP_QNAME);
+        OMElement statusEl = factory.createOMElement(MsgBoxCommonConstants.MSGBOX_STATUS_QNAME);
+        String addr = msgBoxAddr;
+        try{
             storage.destroyMsgBox(addr);
-            statusEl.setText("true");
-        } else {
-            statusEl.setText("false");
+            statusEl.setText(TRUE);
+        }catch(Exception e){
+            logger.error("Error while delete msgbx: " + msgBoxAddr, e);
+            statusEl.setText(FALSE);
+            
+            //FIXME: Should we throw exception?? or client will read false status            
         }
-        OMElement respEl = factory.createOMElement(ProcessingContext.DESTROY_MSGBOX_RESP_QNAME);
         respEl.addChild(statusEl);
         return respEl;
     }
+    
+    class DeleteOldMessageRunnable implements Runnable{
 
-    public void removeAncientMessages() {
-        throw new UnsupportedOperationException("Do not support in memory tmeout of messages");
-    }
-
-    public void setMap2() {
-        logger.debug("storing the map.. method..\n");
-        if (!dbImplenented) {
-            InMemoryImpl mem = (InMemoryImpl) storage;
-            map = mem.getMap();
+        public void run() {
+            while(!stop){
+                try{
+                    
+                    //sleep
+                    Thread.sleep(SLEEP_TIME);
+                    
+                    //try to remove old message
+                    if(storage != null){
+                        storage.removeAncientMessages();
+                    }
+                }catch(Exception e){
+                    logger.error(e.getMessage(), e);
+                }
+            }            
         }
+        
     }
-
-    private static OMFactory factory = OMAbstractFactory.getOMFactory();
-    private static MsgBoxStorage storage;
-    boolean dbImplenented;
-    static Logger logger = Logger.getLogger(MsgBoxServiceSkeleton.class);
-    ConcurrentHashMap map;
-
+    
 }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/MsgBoxStorage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/MsgBoxStorage.java?rev=1173508&r1=1173507&r2=1173508&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/MsgBoxStorage.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/MsgBoxStorage.java Wed Sep 21 06:20:38 2011
@@ -26,7 +26,7 @@ import java.util.List;
 import org.apache.axiom.om.OMElement;
 
 /**
- * Message Box storage backend. This has implemented in two ways inmemory and
+ * Message Box storage backend. This has implemented in two ways in-memory and
  * database.
  */
 public interface MsgBoxStorage {
@@ -34,6 +34,15 @@ public interface MsgBoxStorage {
 
     public void destroyMsgBox(String key) throws Exception;
 
+    /**
+     * IMPORTANT::: List retrieved from this method is sorted by time in
+     * ascending order i.e the newest message will appear as the last item in
+     * the list.
+     * 
+     * @param key
+     * @return
+     * @throws Exception
+     */
     public List<String> takeMessagesFromMsgBox(String key) throws Exception;
 
     public void putMessageIntoMsgBox(String msgBoxID, String messageID, String soapAction, OMElement message)
@@ -43,4 +52,10 @@ public interface MsgBoxStorage {
      * The ancientness is defined in the db.config file.
      */
     public void removeAncientMessages() throws Exception;
+
+    /**
+     * Clean up method
+     */
+    public void dispose();
+
 }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java?rev=1173508&r1=1173507&r2=1173508&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java Wed Sep 21 06:20:38 2011
@@ -22,24 +22,54 @@
 package org.apache.airavata.wsmg.msgbox.Storage.dbpool;
 
 import java.io.IOException;
+import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.List;
 import java.util.UUID;
 
 import javax.xml.stream.XMLStreamException;
 
+import org.apache.airavata.wsmg.commons.storage.DatabaseCreator;
 import org.apache.airavata.wsmg.commons.storage.JdbcStorage;
 import org.apache.airavata.wsmg.msgbox.Storage.MsgBoxStorage;
 import org.apache.axiom.om.OMElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Database message Storage Implementation, if msgBox.properties configured to use database this will set as the storage
- * for MsgBoxSerivceSkeleton
+ * Database message Storage Implementation, if msgBox.properties configured to
+ * use database this will set as the storage for MsgBoxSerivceSkeleton
  */
-public class DatabaseStorageImpl implements MsgBoxStorage {  
+public class DatabaseStorageImpl implements MsgBoxStorage {
 
-    public DatabaseStorageImpl(JdbcStorage db, long timeOfOldMessage) throws SQLException {
-        MessageBoxDB.initialize(db, timeOfOldMessage);
+    private static final Logger logger = LoggerFactory.getLogger(DatabaseStorageImpl.class);
+    
+    private static final String TABLE_NAME_TO_CHECK = "msgbox";
+    
+    private JdbcStorage db;   
+
+    public DatabaseStorageImpl(String jdbcUrl, String jdbcDriver, long timeOfOldMessage) {
+        try {
+            db = new JdbcStorage(10, 50, jdbcUrl, jdbcDriver, true);
+            
+            /*
+             * Check database
+             */
+            Connection conn = db.connect();
+            if (!DatabaseCreator.isDatabaseStructureCreated(TABLE_NAME_TO_CHECK, conn)) {
+                DatabaseCreator.createMsgBoxDatabase(conn);
+                logger.info("New Database created for Message Box");
+            } else {
+                logger.info("Database already created for Message Box!");
+            }
+            db.closeConnection(conn);
+                       
+            MessageBoxDB.initialize(db, timeOfOldMessage);
+            
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new RuntimeException("Database failure");
+        }
     }
 
     public String createMsgBox() throws SQLException, IOException {
@@ -80,4 +110,10 @@ public class DatabaseStorageImpl impleme
         MessageBoxDB.getInstance().removeAncientMessages();
     }
 
+    public void dispose() {
+        if(db != null){
+            db.closeAllConnections();   
+        }
+    }
+
 }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java?rev=1173508&r1=1173507&r2=1173508&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java Wed Sep 21 06:20:38 2011
@@ -32,9 +32,8 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
@@ -42,14 +41,13 @@ import javax.xml.stream.XMLStreamExcepti
 
 import org.apache.airavata.wsmg.commons.storage.JdbcStorage;
 import org.apache.axiom.om.OMElement;
-import org.apache.axis2.AxisFault;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * This is the core class which used by DatabaseStorageImpl to perform all the
  * service operations, DatabaseStorageImpl class simply use this class in its
- * operation methods to perform the actual funcationality.
+ * operation methods to perform the actual functionality.
  */
 public class MessageBoxDB {
 
@@ -72,7 +70,7 @@ public class MessageBoxDB {
             + " (content, msgboxid, messageid,soapaction) VALUES (?,?,?,?)";
 
     public static final String SQL_SELECT_MSGBOX_STATEMENT = "SELECT * FROM " + MSGBOX_TABLENAME
-            + " WHERE msgboxid = ? ORDER BY time ";
+            + " WHERE msgboxid = ? ORDER BY time";
 
     public static final String SQL_DELETE_MSGBOX_STATEMENT = "DELETE FROM " + MSGBOX_TABLENAME + " WHERE msgboxid = ?";
 
@@ -89,7 +87,7 @@ public class MessageBoxDB {
         this.time = time;
     }
 
-    public static MessageBoxDB initialize(JdbcStorage db, long time) throws SQLException {
+    public static synchronized MessageBoxDB initialize(JdbcStorage db, long time) throws SQLException {
         if (instance == null) {
             instance = new MessageBoxDB(db, time);
             setMsgBoxidList(db);
@@ -104,7 +102,7 @@ public class MessageBoxDB {
         return instance;
     }
 
-    public void createMsgBx(String messageBoxId) throws SQLException, IOException {
+    public synchronized void createMsgBx(String messageBoxId) throws SQLException, IOException {
         if (!msgBoxids.contains(messageBoxId)) {
 
             Connection connection = null;
@@ -124,11 +122,11 @@ public class MessageBoxDB {
                 throw sql;
             }
         } else {
-            throw new AxisFault("The message box ID requested already exists");
+            throw new IOException("The message box ID requested already exists");
         }
     }
 
-    public void addMessage(String msgBoxID, String messageID, String soapAction, OMElement message)
+    public synchronized void addMessage(String msgBoxID, String messageID, String soapAction, OMElement message)
             throws SQLException, IOException, XMLStreamException {
         if (msgBoxids.contains(msgBoxID)) {
 
@@ -154,11 +152,11 @@ public class MessageBoxDB {
                 throw sql;
             }
         } else {
-            throw new AxisFault("Currently a messagebox is not available with given message box id :" + msgBoxID);
+            throw new IOException("Currently a messagebox is not available with given message box id :" + msgBoxID);
         }
     }
 
-    public void deleteMessageBox(String msgBoxId) throws SQLException {
+    public synchronized void deleteMessageBox(String msgBoxId) throws SQLException {
 
         if (msgBoxids.contains(msgBoxId)) {
 
@@ -185,9 +183,9 @@ public class MessageBoxDB {
         }
     }
 
-    public List<String> removeAllMessagesforClient(String msgBoxId) throws SQLException, IOException,
+    public synchronized List<String> removeAllMessagesforClient(String msgBoxId) throws SQLException, IOException,
             ClassNotFoundException, XMLStreamException {
-        LinkedList<String> list = new LinkedList<String>();
+        ArrayList<String> list = new ArrayList<String>();
         if (msgBoxids.contains(msgBoxId)) {
 
             Connection connection = null;
@@ -202,7 +200,7 @@ public class MessageBoxDB {
                     ObjectInputStream s = new ObjectInputStream(in);
                     String xmlString = (String) s.readObject();
                     logger.debug(xmlString);
-                    list.addFirst(xmlString);
+                    list.add(xmlString);
                 }
                 resultSet.close();
                 stmt.close();
@@ -241,7 +239,7 @@ public class MessageBoxDB {
         return list;
     }
 
-    public void removeAncientMessages() {
+    public synchronized void removeAncientMessages() {
         Connection connection = null;
         try {
             connection = db.connect();
@@ -258,7 +256,7 @@ public class MessageBoxDB {
     }
 
     private static void setMsgBoxidList(JdbcStorage db) throws SQLException {
-        msgBoxids = Collections.synchronizedSet(new HashSet<String>());
+        msgBoxids = new HashSet<String>();
 
         Connection connection = null;
         PreparedStatement stmt = null;
@@ -269,6 +267,7 @@ public class MessageBoxDB {
             while (resultSet.next()) {
                 msgBoxids.add(resultSet.getString("msgboxid"));
             }
+            db.commit(connection);
         } finally {
             try {
                 if (stmt != null) {

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/memory/InMemoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/memory/InMemoryImpl.java?rev=1173508&r1=1173507&r2=1173508&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/memory/InMemoryImpl.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/Storage/memory/InMemoryImpl.java Wed Sep 21 06:20:38 2011
@@ -21,103 +21,111 @@
 
 package org.apache.airavata.wsmg.msgbox.Storage.memory;
 
-import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.airavata.wsmg.msgbox.Storage.MsgBoxStorage;
 import org.apache.axiom.om.OMElement;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * This is the inmemoery storage implementation for MsgBoxService, this will be initialized if msgBox.properties is
- * configured not to use database implementation.
+ * This is the in memory storage implementation for MsgBoxService, this will be
+ * initialized if msgBox.properties is configured not to use database
+ * implementation.
  */
 public class InMemoryImpl implements MsgBoxStorage {
-    static Logger logger = Logger.getLogger(InMemoryImpl.class);
+    private static final Logger logger = LoggerFactory.getLogger(InMemoryImpl.class);
 
-    ConcurrentHashMap<String, LinkedList<String>> map;
+    private HashMap<String, List<Content>> map = new HashMap<String, List<Content>>();
 
-    public ConcurrentHashMap<String, LinkedList<String>> getMap() {
-        return map;
-    }
+    private long time;
 
-    public void setMap(ConcurrentHashMap<String, LinkedList<String>> map) {
-        this.map = map;
+    public InMemoryImpl(long time) {
+        this.time = time;
     }
 
     public String createMsgBox() throws Exception {
-
-        String clientid = UUID.randomUUID().toString();
-        lookupQueue(clientid); // that will create an empty queue
-        return clientid;
+        synchronized (map) {
+            String clientid = UUID.randomUUID().toString();
+            if(map.containsKey(clientid))
+                throw new Exception("Message Box is existed with key:" + clientid);            
+            map.put(clientid, new ArrayList<Content>());            
+            return clientid;
+        }
     }
 
     public void destroyMsgBox(String key) throws Exception {
-        if (map.containsKey(key))
+        synchronized (map) {
             map.remove(key);
+        }
     }
 
-    public LinkedList<String> takeMessagesFromMsgBox(String key) throws Exception {
-
-        LinkedList<String> list;
-
+    public List<String> takeMessagesFromMsgBox(String key) throws Exception {
         synchronized (map) {
-            list = map.get(key);
-
-            if (list == null)
-                throw new IllegalArgumentException("no message box with key " + key);
+            List<Content> x = map.get(key);
+            ArrayList<String> result = new ArrayList<String>(x.size());
+            for (Content content : x) {
+                result.add(content.getContent());
+            }
+            map.put(key, new ArrayList<Content>());
+            return result;
         }
-        return list;
-
     }
 
     public void putMessageIntoMsgBox(String msgBoxID, String messageID, String soapAction, OMElement message)
             throws Exception {
-
-        // To change body of implemented methods use File | Settings | File Templates.
-        LinkedList<String> list = lookupQueue(msgBoxID);
-        if (list == null) {
-            throw new IllegalArgumentException("no message box with key " + msgBoxID + " to store the msg");
-        }
-        synchronized (list) {
-            list.addLast(message.toStringWithConsume());
-            logger.info("Message Stored in list with key " + msgBoxID);
+        synchronized (map) {
+            if (!map.containsKey(msgBoxID)) {
+                throw new IllegalArgumentException("no message box with key " + msgBoxID + " to store the msg");
+            }
+            List<Content> list = map.get(msgBoxID);
+            list.add(new Content(message.toStringWithConsume(), System.currentTimeMillis()));
+            logger.debug("Message Stored in list with key " + msgBoxID);
         }
     }
 
-    /**
-     * The ancientness is defined in the db.config file.
-     */
     public void removeAncientMessages() {
-        // To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public void closeConnection() throws Exception {
-        // TODOn - store map back
-    }
-
-    private LinkedList<String> lookupQueue(String key) {
-
-        logger.debug("lookupQueue: calling the getMap method...");
-
-        if (key == null)
-            throw new IllegalArgumentException();
+        /*
+         * O(n^2) algorithms. Better performance can be achieved with more Cache.
+         */
         synchronized (map) {
-            LinkedList<String> v = map.get(key);
-            logger.debug(key + " is being searched in map..");
-
-            if (v != null) {
-                logger.info("key found in map.. " + key);
-                return v;
+            long currentTime = System.currentTimeMillis();
+            Iterator<List<Content>> it = map.values().iterator();
+            while(it.hasNext()){
+                Iterator<Content> itToRemove = it.next().iterator();
+                while(itToRemove.hasNext()){
+                    Content content = itToRemove.next();
+                    if(currentTime - this.time > content.getTime()){
+                        itToRemove.remove();
+                    }
+                }                
             }
+        }
+    }   
 
-            logger.info("key not found in map.. " + key);
-            LinkedList<String> list = new LinkedList<String>();
-            map.put(key, list);
-            logger.debug("new list created in map.. calling the setMap method...");
-            // this.setMap(map);
-            return list;
+    public void dispose() {
+        synchronized (map) {
+            map.clear();
         }
     }
+
+    class Content {
+        private String content;
+        private long time;
+        public Content(String content, long time) {
+            this.content = content;
+            this.time = time;
+        }
+        public String getContent() {
+            return content;
+        }
+        public long getTime() {
+            return time;
+        }
+    }
+
 }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/StoreMessageHandler.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/StoreMessageHandler.java?rev=1173508&r1=1173507&r2=1173508&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/StoreMessageHandler.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/StoreMessageHandler.java Wed Sep 21 06:20:38 2011
@@ -31,22 +31,19 @@ import org.apache.axis2.description.Axis
 import org.apache.axis2.dispatchers.AddressingBasedDispatcher;
 import org.apache.axis2.engine.Phase;
 import org.apache.axis2.util.JavaUtils;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This Dispatcher is used to validate the incoming message, this is set to Handler list in MsgBoxServiceLifeCycle.
  */
 public class StoreMessageHandler extends AddressingBasedDispatcher {
+    private static final Logger logger = LoggerFactory.getLogger(StoreMessageHandler.class);
     private static final String WSMG_MSGSTORE_SOAP_ACTION = "http://org.apache.airavata/xgws/msgbox/2004/storeMessages";
     private static final String ADDRESSING_VALIDATE_ACTION = "addressing.validateAction";
-    Logger logger = Logger.getLogger(StoreMessageHandler.class);
-    private Phase addressingPhase = null;
-
-    private AxisOperation messageBoxOperation = null;
-
-    public StoreMessageHandler() {
-
-    }
+    
+    private Phase addressingPhase;
+    private AxisOperation messageBoxOperation;
 
     public org.apache.axis2.engine.Handler.InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
 

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/util/ConfigKeys.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/util/ConfigKeys.java?rev=1173508&r1=1173507&r2=1173508&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/util/ConfigKeys.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/util/ConfigKeys.java Wed Sep 21 06:20:38 2011
@@ -26,7 +26,7 @@ public class ConfigKeys {
     public static final String MSG_PRESV_DAYS = "messagePreservationDays";
     public static final String MSG_PRESV_HRS = "messagePreservationHours";
     public static final String MSG_PRESV_MINS = "messagePreservationMinutes";
-    public static String JDBC_DRIVER = "msgBox.jdbc.driver";
+    public static String MSG_BOX_JDBC_DRIVER = "msgBox.jdbc.driver";
     public static String MSG_BOX_JDBC_URL = "msgBox.jdbc.url";
-    public static String USE_DATABSE_STORAGE = "msgBox.usedatabase";
+    public static String USE_DATABASE_STORAGE = "msgBox.usedatabase";
 }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/util/MsgBoxCommonConstants.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/util/MsgBoxCommonConstants.java?rev=1173508&r1=1173507&r2=1173508&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/util/MsgBoxCommonConstants.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/util/MsgBoxCommonConstants.java Wed Sep 21 06:20:38 2011
@@ -24,13 +24,31 @@ package org.apache.airavata.wsmg.msgbox.
 import javax.xml.namespace.QName;
 
 public class MsgBoxCommonConstants {
-
-    public static final String DB_IMPLEMENTED_TRUE = "database.inited";
     public static final String MSGBOX_STORAGE = "msgbox.storage";
-    public static final String CONF_MANAGER = "conf.manager";
-    public static final String INIT_MSG_BOX_SKELETON_TRUE = "init.msgbox.skeleton";
     public static final String AXIS_MODULE_NAME_ADDRESSING = "addressing";
-    public static final QName MSG_BOXID_QNAME = new QName("http://org.apache.airavata/xgws/msgbox/2004/",
-            "MsgBoxId");
+    public static final QName MSG_BOXID_QNAME = new QName("http://org.apache.airavata/xgws/msgbox/2004/", "msgboxid",
+            "msg");
+
+    public static final QName STOREMSG_QNAME = new QName("http://org.apache.airavata/xgws/msgbox/2004/",
+            "storeMessages", "msg");
+    public static final QName DESTROYMSG_QNAME = new QName("http://org.apache.airavata/xgws/msgbox/2004/",
+            "destroyMsgBox", "msg");
+    public static final QName TAKEMSGS_QNAME = new QName("http://org.apache.airavata/xgws/msgbox/2004/",
+            "takeMessages", "msg");
+    public static final QName CREATEMSG_BOX = new QName("http://org.apache.airavata/xgws/msgbox/2004/", "createMsgBox",
+            "msg");       
+    
+    public static final QName STOREMSG_RESP_QNAME = new QName("http://org.apache.airavata/xgws/msgbox/2004/",
+            "storeMessagesResponse", "msg");    
+    public static final QName DESTROY_MSGBOX_RESP_QNAME = new QName("http://org.apache.airavata/xgws/msgbox/2004/",
+            "destroyMsgBoxResponse", "msg");
+    public static final QName CREATE_MSGBOX_RESP_QNAME = new QName("http://org.apache.airavata/xgws/msgbox/2004/",
+            "createMsgBoxResponse", "msg");
+    public static final QName TAKE_MSGBOX_RESP_QNAME = new QName("http://org.apache.airavata/xgws/msgbox/2004/",
+            "takeMessagesResponse", "msg");
+    
+    
+    public static final QName MSGBOX_STATUS_QNAME = new QName("http://org.apache.airavata/xgws/msgbox/2004/", "status", "msg");
+    public static final QName MSGBOX_MESSAGE_QNAME = new QName("http://org.apache.airavata/xgws/msgbox/2004/", "messages", "msg");
 
 }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/util/MsgBoxUtils.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/util/MsgBoxUtils.java?rev=1173508&r1=1173507&r2=1173508&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/util/MsgBoxUtils.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/org/apache/airavata/wsmg/msgbox/util/MsgBoxUtils.java Wed Sep 21 06:20:38 2011
@@ -49,6 +49,7 @@ public class MsgBoxUtils {
 
         StAXOMBuilder builder = new StAXOMBuilder(inflow);
         OMElement omElement = builder.getDocumentElement();
+        inflow.close();
         return omElement;
     }
 

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/resources/services.xml
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/resources/services.xml?rev=1173508&r1=1173507&r2=1173508&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/resources/services.xml (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/resources/services.xml Wed Sep 21 06:20:38 2011
@@ -1,48 +1,56 @@
 <?xml version="1.0" encoding="UTF-8"?>
-	<!-- This file was auto-generated from WSDL -->
-	<!--
-		by the Apache Axis2 version: 1.4 Built on : Apr 26, 2008 (06:24:30
-		EDT)
-	-->
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
 <serviceGroup>
-	<service name="MsgBoxService" scope="application" class="org.apache.airavata.wsmg.msgbox.MsgBoxServiceLifeCycle">
-		<messageReceivers>
-			<messageReceiver mep="http://www.w3.org/ns/wsdl/in-out"
-				class="org.apache.airavata.wsmg.msgbox.MsgBoxServiceMessageReceiverInOut" />
-		</messageReceivers>
-		<parameter name="ServiceClass">org.apache.airavata.wsmg.msgbox.MsgBoxServiceSkeleton
-		</parameter>
-		<parameter name="useOriginalwsdl">false</parameter>
-		<parameter name="modifyUserWSDLPortAddress">true</parameter>
-		<operation name="storeMessages" mep="http://www.w3.org/ns/wsdl/in-out"
-			namespace="http://org.apache.airavata/xgws/msgbox/2004/">
-			<actionMapping>http://org.apache.airavata/xgws/msgbox/2004/storeMessages
-			</actionMapping>
-			<outputActionMapping>http://org.apache.airavata/xgws/msgbox/2004/MsgBoxPT/storeMessagesResponse
-			</outputActionMapping>
-		</operation>
-		<operation name="destroyMsgBox" mep="http://www.w3.org/ns/wsdl/in-out"
-			namespace="http://org.apache.airavata/xgws/msgbox/2004/">
-			<actionMapping>http://org.apache.airavata/xgws/msgbox/2004/destroyMsgBox
-			</actionMapping>
-			<outputActionMapping>http://org.apache.airavata/xgws/msgbox/2004/MsgBoxPT/destroyMsgBoxResponse
-			</outputActionMapping>
-		</operation>
-		<operation name="takeMessages" mep="http://www.w3.org/ns/wsdl/in-out"
-			namespace="http://org.apache.airavata/xgws/msgbox/2004/">
-			<actionMapping>http://org.apache.airavata/xgws/msgbox/2004/takeMessages
-			</actionMapping>
-			<outputActionMapping>http://org.apache.airavata/xgws/msgbox/2004/MsgBoxPT/takeMessagesResponse
-			</outputActionMapping>
-		</operation>
-		<operation name="createMsgBox" mep="http://www.w3.org/ns/wsdl/in-out"
-			namespace="http://org.apache.airavata/xgws/msgbox/2004/">
-			<actionMapping>http://org.apache.airavata/xgws/msgbox/2004/createMsgBox
-			</actionMapping>
-			<outputActionMapping>http://org.apache.airavata/xgws/msgbox/2004/MsgBoxPT/createMsgBoxResponse
-			</outputActionMapping>
-		</operation>
-		<parameter name="configuration.file.name" locked="false">msgBox.properties</parameter>
-	</service>
+    <service name="MsgBoxService" scope="application" class="org.apache.airavata.wsmg.msgbox.MsgBoxServiceLifeCycle">
+        <messageReceivers>
+            <messageReceiver mep="http://www.w3.org/ns/wsdl/in-out" class="org.apache.airavata.wsmg.msgbox.MsgBoxServiceMessageReceiverInOut" />
+        </messageReceivers>        
+        <parameter name="useOriginalwsdl">false</parameter>
+        <parameter name="modifyUserWSDLPortAddress">true</parameter>
+        <operation name="storeMessages" mep="http://www.w3.org/ns/wsdl/in-out" namespace="http://org.apache.airavata/xgws/msgbox/2004/">
+            <actionMapping>http://org.apache.airavata/xgws/msgbox/2004/storeMessages
+            </actionMapping>
+            <outputActionMapping>http://org.apache.airavata/xgws/msgbox/2004/MsgBoxPT/storeMessagesResponse
+            </outputActionMapping>
+        </operation>
+        <operation name="destroyMsgBox" mep="http://www.w3.org/ns/wsdl/in-out" namespace="http://org.apache.airavata/xgws/msgbox/2004/">
+            <actionMapping>http://org.apache.airavata/xgws/msgbox/2004/destroyMsgBox
+            </actionMapping>
+            <outputActionMapping>http://org.apache.airavata/xgws/msgbox/2004/MsgBoxPT/destroyMsgBoxResponse
+            </outputActionMapping>
+        </operation>
+        <operation name="takeMessages" mep="http://www.w3.org/ns/wsdl/in-out" namespace="http://org.apache.airavata/xgws/msgbox/2004/">
+            <actionMapping>http://org.apache.airavata/xgws/msgbox/2004/takeMessages
+            </actionMapping>
+            <outputActionMapping>http://org.apache.airavata/xgws/msgbox/2004/MsgBoxPT/takeMessagesResponse
+            </outputActionMapping>
+        </operation>
+        <operation name="createMsgBox" mep="http://www.w3.org/ns/wsdl/in-out" namespace="http://org.apache.airavata/xgws/msgbox/2004/">
+            <actionMapping>http://org.apache.airavata/xgws/msgbox/2004/createMsgBox
+            </actionMapping>
+            <outputActionMapping>http://org.apache.airavata/xgws/msgbox/2004/MsgBoxPT/createMsgBoxResponse
+            </outputActionMapping>
+        </operation>
+        <parameter name="configuration.file.name" locked="false">msgBox.properties</parameter>
+    </service>
 
 </serviceGroup>

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java?rev=1173508&r1=1173507&r2=1173508&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java Wed Sep 21 06:20:38 2011
@@ -22,8 +22,6 @@
 package org.apache.airavata.wsmg.broker;
 
 import java.io.File;
-import java.util.Iterator;
-import java.util.List;
 
 import org.apache.airavata.wsmg.broker.handler.PublishedMessageHandler;
 import org.apache.airavata.wsmg.broker.subscription.SubscriptionManager;
@@ -32,6 +30,7 @@ import org.apache.airavata.wsmg.commons.
 import org.apache.airavata.wsmg.commons.storage.WsmgInMemoryStorage;
 import org.apache.airavata.wsmg.commons.storage.WsmgPersistantStorage;
 import org.apache.airavata.wsmg.commons.storage.WsmgStorage;
+import org.apache.airavata.wsmg.commons.util.Axis2Utils;
 import org.apache.airavata.wsmg.config.WSMGParameter;
 import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
 import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
@@ -40,15 +39,13 @@ import org.apache.airavata.wsmg.messenge
 import org.apache.airavata.wsmg.messenger.strategy.impl.ParallelSender;
 import org.apache.airavata.wsmg.messenger.strategy.impl.SerialSender;
 import org.apache.airavata.wsmg.util.RunTimeStatistics;
-import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.description.AxisService;
-import org.apache.axis2.engine.Handler;
-import org.apache.axis2.engine.Phase;
+import org.apache.axis2.engine.ServiceLifeCycle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BrokerServiceLifeCycle implements org.apache.axis2.engine.ServiceLifeCycle {
+public class BrokerServiceLifeCycle implements ServiceLifeCycle {
 
     private static final Logger log = LoggerFactory.getLogger(BrokerServiceLifeCycle.class);
     private SendingStrategy method = null;
@@ -66,7 +63,7 @@ public class BrokerServiceLifeCycle impl
 
         if (inited == null || inited == false) {
             log.info("starting broker");
-            overrideAddressingPhaseHander(configContext);
+            Axis2Utils.overrideAddressingPhaseHander(configContext, new PublishedMessageHandler());
             initConfigurations(configContext, axisService);
 
             WsmgConfigurationContext brokerConext = (WsmgConfigurationContext) configContext
@@ -87,25 +84,25 @@ public class BrokerServiceLifeCycle impl
         WsmgConfigurationContext wsmgConfig = new WsmgConfigurationContext();
         configContext.setProperty(WsmgCommonConstants.BROKER_WSMGCONFIG, wsmgConfig);
 
-        ConfigurationManager configMan = new ConfigurationManager("conf" + File.separator + WsmgCommonConstants.BROKER_CONFIGURATION_FILE_NAME);
+        ConfigurationManager configMan = new ConfigurationManager("conf" + File.separator
+                + WsmgCommonConstants.BROKER_CONFIGURATION_FILE_NAME);
 
         wsmgConfig.setConfigurationManager(configMan);
 
         String type = configMan.getConfig(WsmgCommonConstants.CONFIG_STORAGE_TYPE,
                 WsmgCommonConstants.STORAGE_TYPE_PERSISTANT);
 
+        /*
+         * Determine Storage
+         */
         WsmgStorage storage = null;
 
         if (WsmgCommonConstants.STORAGE_TYPE_IN_MEMORY.equalsIgnoreCase(type)) {
             storage = new WsmgInMemoryStorage();
         } else {
-            try {
-                storage = new WsmgPersistantStorage(WsmgCommonConstants.TABLE_NAME_EXPIRABLE_SUBCRIPTIONS,
-                        WsmgCommonConstants.TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS, configMan);
-
-            } catch (AxisFault e) {
-                throw new RuntimeException("Unable to init Broker persistant storage", e);
-            }
+            String jdbcUrl = configMan.getConfig(WsmgCommonConstants.CONFIG_JDBC_URL);
+            String jdbcDriver = configMan.getConfig(WsmgCommonConstants.CONFIG_JDBC_DRIVER);
+            storage = new WsmgPersistantStorage(jdbcUrl, jdbcDriver);
         }
 
         wsmgConfig.setStorage(storage);
@@ -179,40 +176,4 @@ public class BrokerServiceLifeCycle impl
         method.start();
         log.info(initedmethod + " sending method inited");
     }
-
-    /**
-     * @param configContext
-     */
-    private void overrideAddressingPhaseHander(ConfigurationContext configContext) {
-
-        List<Phase> inflowPhases = configContext.getAxisConfiguration().getPhasesInfo().getINPhases();
-        boolean foundFlag = false;
-
-        for (Phase p : inflowPhases) {
-
-            if (p.getName().equalsIgnoreCase("Addressing")) {
-
-                List<Handler> handlers = p.getHandlers();
-
-                for (Iterator<Handler> ite = handlers.iterator(); ite.hasNext();) {
-                    Handler h = ite.next();
-                    if (h.getClass().isAssignableFrom(PublishedMessageHandler.class)) {
-                        p.removeHandler(h.getHandlerDesc());
-                        break;
-                    }
-                }
-
-                p.addHandler(new PublishedMessageHandler(), 0);
-                foundFlag = true;
-                break;
-            }
-
-        }
-
-        if (!foundFlag) {
-            throw new RuntimeException("unable to find addressing phase - inside inflow phases");
-        }
-
-    }
-
 }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java?rev=1173508&r1=1173507&r2=1173508&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java Wed Sep 21 06:20:38 2011
@@ -34,14 +34,14 @@ class CleanUpThread implements Runnable 
 
     private static final Logger logger = LoggerFactory.getLogger(CleanUpThread.class);
 
-    SubscriptionManager subMan = null;
+    private SubscriptionManager subMan;
 
     public CleanUpThread(SubscriptionManager manager) {
-        subMan = manager;
+        this.subMan = manager;
     }
 
     public void run() {
-        // logger.info("CleanUpThread started");
+        logger.debug("CleanUpThread started");
         String key = null;
         SubscriptionState subscription = null;
         Set<String> keySet = null;

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionManager.java?rev=1173508&r1=1173507&r2=1173508&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionManager.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionManager.java Wed Sep 21 06:20:38 2011
@@ -65,20 +65,21 @@ public class SubscriptionManager {
 
     private static final Logger log = LoggerFactory.getLogger(SubscriptionManager.class);
 
-    private HashMap<String, SubscriptionState> subscriptions = new HashMap<String, SubscriptionState>(); // JDK15
-
-    private WsmgStorage subscriptionDB = null;
+    private HashMap<String, SubscriptionState> subscriptions = new HashMap<String, SubscriptionState>();
 
     private ReentrantReadWriteLock subscriptionLock = new ReentrantReadWriteLock();
 
-    private int counter = 1;
+    private WSEProtocolSupport wseProtocalSupport = new WSEProtocolSupport();
+
+    private WSNTProtocolSupport wsntProtocolSupport = new WSNTProtocolSupport();
+
+    private WsmgStorage subscriptionDB;
 
     private WsmgConfigurationContext wsmgConfig;
 
     private OutGoingQueue outGoingQueue;
-
-    private WSEProtocolSupport wseProtocalSupport = new WSEProtocolSupport();
-    private WSNTProtocolSupport wsntProtocolSupport = new WSNTProtocolSupport();
+    
+    private int counter = 1;
 
     public SubscriptionManager(WsmgConfigurationContext paramters, WsmgStorage storage) {
         init(paramters, storage);
@@ -92,10 +93,8 @@ public class SubscriptionManager {
         outGoingQueue = parameters.getOutgoingQueue();
         if (WSMGParameter.enableAutoCleanSubscriptions) {
             CleanUpThread cleanUpThread = new CleanUpThread(this);
-
             Thread t = new Thread(cleanUpThread);
             t.start();
-
         }
 
         try {
@@ -235,7 +234,8 @@ public class SubscriptionManager {
     }
 
     /**
-     * if find the subscription already exists, return the current subscriptionId else return null;
+     * if find the subscription already exists, return the current
+     * subscriptionId else return null;
      */
 
     public String checkSubscriptionExist(SubscriptionState state) {

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingMsgReceiver.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingMsgReceiver.java?rev=1173508&r1=1173507&r2=1173508&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingMsgReceiver.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingMsgReceiver.java Wed Sep 21 06:20:38 2011
@@ -43,44 +43,35 @@ public class WSEventingMsgReceiver exten
     public MessageContext process(MessageContext inMsg, String operationName) throws AxisFault {
 
         WsEventingOperations msgType = WsEventingOperations.valueFrom(operationName);
-
         ProcessingContext processingContext = builder.build(inMsg, msgType);
-
         MessageContext outputMsg = null;
 
+        log.debug("WS-Eventing: " + msgType);
+        
         switch (msgType) {
         case SUBSCRIBE: {
-
             WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
                     .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
 
             brokerConfigContext.getSubscriptionManager().subscribe(processingContext);
             outputMsg = createOutputMessageContext(inMsg, processingContext);
-        }
             break;
+        }
         case UNSUBSCRIBE: {
-
             WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
                     .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
 
             brokerConfigContext.getSubscriptionManager().unsubscribe(processingContext);
             outputMsg = createOutputMessageContext(inMsg, processingContext);
-        }
             break;
+        }
         case RENEW:
-            // throw AxisFault.makeFault("unsupported operation" +
-            // msgType.toString());
-            break;
         case GET_STATUS:
-            // nothing to do
-            break;
         case SUBSCRIPTION_END:
-            // nothing to do
-            break;
+        default:
+            throw new AxisFault("unsupported operation" + msgType.toString());
 
         }
-
         return outputMsg;
     }
-
-}// end of class
+}

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationMsgReceiver.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationMsgReceiver.java?rev=1173508&r1=1173507&r2=1173508&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationMsgReceiver.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationMsgReceiver.java Wed Sep 21 06:20:38 2011
@@ -52,7 +52,6 @@ public class WSNotificationMsgReceiver e
 
         switch (msgType) {
         case NOTIFY: {
-
             try {
 
                 WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg
@@ -64,18 +63,16 @@ public class WSNotificationMsgReceiver e
                 throw new AxisFault("unable to process message", e);
             }
             outputMsg = createOutputMessageContext(inMsg, processingContext);
-
-        }
             break;
-
+        }
         case SUBSCRIBE: {
 
             WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
                     .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
             brokerConfigContext.getSubscriptionManager().subscribe(processingContext);
             outputMsg = createOutputMessageContext(inMsg, processingContext);
-        }
             break;
+        }
         case UNSUBSCRIBE: {
 
             WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
@@ -83,11 +80,8 @@ public class WSNotificationMsgReceiver e
 
             brokerConfigContext.getSubscriptionManager().unsubscribe(processingContext);
             outputMsg = createOutputMessageContext(inMsg, processingContext);
-
-        }
             break;
-        case GET_CURRENT_MSG:
-            throw new AxisFault("not implemented yet");
+        }
         case RESUME_SUBSCRIPTION: {
 
             WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
@@ -95,18 +89,20 @@ public class WSNotificationMsgReceiver e
 
             brokerConfigContext.getSubscriptionManager().resumeSubscription(processingContext);
             outputMsg = createOutputMessageContext(inMsg, processingContext);
-
-        }
             break;
+        }
         case PAUSE_SUBSCRIPTION: {
             WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
                     .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
 
             brokerConfigContext.getSubscriptionManager().pauseSubscription(processingContext);
             outputMsg = createOutputMessageContext(inMsg, processingContext);
-        }
             break;
         }
+        case GET_CURRENT_MSG:
+        default:
+            throw new AxisFault("not implemented yet");
+        }
 
         return outputMsg;
     }



Mime
View raw message