airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chath...@apache.org
Subject [06/13] airavata git commit: retiring ws-messenger and remove dependency of workflow tracking - AIRAVATA-1556, AIRAVATA-1557
Date Tue, 27 Jan 2015 15:27:00 GMT
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebox/src/test/java/org/apache/airavata/wsmg/msgbox/MsgBoxTest.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebox/src/test/java/org/apache/airavata/wsmg/msgbox/MsgBoxTest.java b/modules/ws-messenger/messagebox/src/test/java/org/apache/airavata/wsmg/msgbox/MsgBoxTest.java
deleted file mode 100644
index b3a3f4c..0000000
--- a/modules/ws-messenger/messagebox/src/test/java/org/apache/airavata/wsmg/msgbox/MsgBoxTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.msgbox;
-
-import java.io.StringReader;
-import java.util.Iterator;
-import java.util.Random;
-
-import junit.framework.TestCase;
-
-import org.apache.airavata.wsmg.msgbox.client.MsgBoxClient;
-import org.apache.airavata.wsmg.msgbox.util.MsgBoxUtils;
-import org.apache.axiom.om.OMElement;
-import org.apache.axis2.addressing.EndpointReference;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class MsgBoxTest extends TestCase {
-
-    private int port = InMemoryMessageBoxServer.TESTING_PORT;
-    private long timeout = 5000L;
-
-    @BeforeClass
-    public static void setUpBeforeClass() throws Exception {
-
-    }
-
-    @AfterClass
-    public static void tearDownAfterClass() throws Exception {
-    }
-
-    @Before
-    public void setUp() throws Exception {
-        InMemoryMessageBoxServer.start(null, null);
-    }
-
-    @After
-    public void tearDown() throws Exception {
-    }
-
-    @Test
-    public void testMessageBox() throws Exception {
-
-        MsgBoxClient user = new MsgBoxClient();
-        StringBuilder builder = new StringBuilder();
-        port = InMemoryMessageBoxServer.TESTING_PORT;
-        EndpointReference msgBoxEpr = user.createMessageBox("http://localhost:" + port
-                + "/axis2/services/MsgBoxService", timeout);
-
-        for (int i = 0; i < 10; i++) {
-
-            builder.delete(0, builder.capacity());
-            Random x = new Random();
-            for (int j = 0; j < x.nextInt(50); j++) {
-                builder.append("123456789");
-            }
-
-            String msg = String.format("<msg><seq>%d</seq><fill>%s</fill></msg>", i, builder.toString());
-
-            user.storeMessage(msgBoxEpr, timeout, MsgBoxUtils.reader2OMElement(new StringReader(msg)));
-
-            Thread.sleep(200L);
-        }
-
-        Iterator<OMElement> iterator = null;
-
-        try {
-            iterator = user.takeMessagesFromMsgBox(msgBoxEpr, timeout);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-
-        if (iterator != null)
-            while (iterator.hasNext()) {
-                System.out.println(iterator.next().toStringWithConsume());
-            }
-
-        System.out.println("Delete message box response :  " + user.deleteMsgBox(msgBoxEpr, 5000L));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/pom.xml
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/pom.xml b/modules/ws-messenger/messagebroker/pom.xml
deleted file mode 100644
index b62bded..0000000
--- a/modules/ws-messenger/messagebroker/pom.xml
+++ /dev/null
@@ -1,112 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file 
-    distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under 
-    the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may 
-    obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to 
-    in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 
-    ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under 
-    the License. -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-    <parent>
-        <groupId>org.apache.airavata</groupId>
-        <artifactId>airavata-ws-messenger</artifactId>
-        <version>0.12-SNAPSHOT</version>
-    </parent>
-
-    <modelVersion>4.0.0</modelVersion>
-    <artifactId>airavata-message-broker</artifactId>
-    <name>Airavata Message Broker</name>
-    <url>http://airavata.apache.org/</url>
-    <packaging>jar</packaging>
-
-    <build>
-        <plugins>
-            <plugin>
-                <artifactId>maven-antrun-plugin</artifactId>
-                <version>${antrun.version}</version>
-                <executions>
-                    <execution>
-                        <id>restore-persistence</id>
-                        <phase>prepare-package</phase>
-                        <configuration>
-                            <tasks>
-                                <copy file="${project.build.outputDirectory}/services.xml" tofile="${project.build.outputDirectory}/META-INF/services.xml" />
-                            </tasks>
-                        </configuration>
-                        <goals>
-                            <goal>run</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-
-    <dependencies>
-
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-messenger-commons</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-common-utils</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-messenger-client</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.ogce</groupId>
-            <artifactId>yfilter</artifactId>
-            <version>${yfilter.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>wsdl4j</groupId>
-            <artifactId>wsdl4j</artifactId>
-            <version>1.5.2</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.axis2</groupId>
-            <artifactId>axis2</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.axis2</groupId>
-            <artifactId>axis2-transport-http</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.axis2</groupId>
-            <artifactId>axis2-transport-local</artifactId>
-        </dependency>
-
-        <!-- Logging -->
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-        </dependency>
-
-        <!-- Test -->
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-	    <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-server-configuration</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    </properties>
-</project>

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AbstractBrokerMsgReceiver.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AbstractBrokerMsgReceiver.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AbstractBrokerMsgReceiver.java
deleted file mode 100644
index 9277dcc..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AbstractBrokerMsgReceiver.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker;
-
-import java.util.List;
-
-import org.apache.airavata.wsmg.broker.context.ProcessingContext;
-import org.apache.airavata.wsmg.commons.NameSpaceConstants;
-import org.apache.axiom.om.OMNamespace;
-import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.engine.AxisEngine;
-import org.apache.axis2.receivers.AbstractMessageReceiver;
-import org.apache.axis2.util.MessageContextBuilder;
-
-public abstract class AbstractBrokerMsgReceiver extends AbstractMessageReceiver {
-
-    protected abstract MessageContext process(MessageContext inMsgContext, String operationName) throws AxisFault;
-
-    @Override
-    protected void invokeBusinessLogic(MessageContext inMsgContext) throws AxisFault {
-
-        String operationName = getOperationName(inMsgContext);
-        MessageContext outMsgContext = process(inMsgContext, operationName);
-
-        if (outMsgContext != null) {
-            outMsgContext.setTo(null);
-            super.replicateState(inMsgContext);
-            AxisEngine.send(outMsgContext);
-
-        }
-
-    }
-
-    protected String getOperationName(MessageContext inMsg) throws AxisFault {
-
-        org.apache.axis2.description.AxisOperation op = inMsg.getOperationContext().getAxisOperation();
-        if (op == null) {
-            throw new AxisFault(
-                    "Operation is not located, if this is doclit style the SOAP-ACTION should specified via the SOAP Action to use the RawXMLProvider");
-        }
-
-        java.lang.String operationName = null;
-        if ((op.getName() == null)
-                || ((operationName = org.apache.axis2.util.JavaUtils.xmlNameToJava(op.getName().getLocalPart())) == null)) {
-            throw new AxisFault("invalid operation found");
-        }
-
-        return operationName;
-    }
-
-    protected MessageContext createOutputMessageContext(MessageContext inMsg, ProcessingContext processingContext)
-            throws AxisFault {
-
-        MessageContext outMsgContext = MessageContextBuilder.createOutMessageContext(inMsg);
-        outMsgContext.getOperationContext().addMessageContext(outMsgContext);
-
-        SOAPEnvelope outputEnvelope = getSOAPFactory(inMsg).getDefaultEnvelope();
-
-        if (processingContext.getRespMessage() != null) {
-
-            outputEnvelope.getBody().addChild(processingContext.getRespMessage());
-
-            if (processingContext.getResponseMsgNamespaces() != null) {
-                declareResponseMsgNamespace(outputEnvelope, processingContext.getResponseMsgNamespaces());
-            }
-        }
-
-        outMsgContext.setEnvelope(outputEnvelope);
-        return outMsgContext;
-    }
-
-    private void declareResponseMsgNamespace(SOAPEnvelope outputEnvelope, List<OMNamespace> namespaces) {
-
-        if (!namespaces.contains(NameSpaceConstants.WSA_NS)) {
-            namespaces.add(NameSpaceConstants.WSA_NS);// declare WSA by default
-        }
-
-        for (OMNamespace ns : namespaces) {
-            outputEnvelope.declareNamespace(ns);
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AdditionalMessageContent.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AdditionalMessageContent.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AdditionalMessageContent.java
deleted file mode 100644
index acf8f0f..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AdditionalMessageContent.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker;
-
-import java.io.Serializable;
-
-public class AdditionalMessageContent implements Serializable {
-    /**
-	 * 
-	 */
-    private static final long serialVersionUID = -5163025283681463108L;
-
-    String action;
-
-    String messageID;
-
-    String topicElement;
-
-    String producerReference;
-    String trackId;
-
-    /**
-     * @param action
-     * @param messageID
-     */
-    public AdditionalMessageContent(String action, String messageID) {
-        super();
-        // TODO Auto-generated constructor stub
-        this.action = action;
-        this.messageID = messageID;
-    }
-
-    /**
-     * @return Returns the action.
-     */
-    public String getAction() {
-        return action;
-    }
-
-    /**
-     * @param action
-     *            The action to set.
-     */
-    public void setAction(String action) {
-        this.action = action;
-    }
-
-    /**
-     * @return Returns the messageID.
-     */
-    public String getMessageID() {
-        return messageID;
-    }
-
-    /**
-     * @param messageID
-     *            The messageID to set.
-     */
-    public void setMessageID(String messageID) {
-        this.messageID = messageID;
-    }
-
-    /**
-     * @return Returns the producerReference.
-     */
-    public String getProducerReference() {
-        return producerReference;
-    }
-
-    /**
-     * @param producerReference
-     *            The producerReference to set.
-     */
-    public void setProducerReference(String producerReference) {
-        this.producerReference = producerReference;
-    }
-
-    /**
-     * @return Returns the topicElement.
-     */
-    public String getTopicElement() {
-        return topicElement;
-    }
-
-    /**
-     * @param topicElement
-     *            The topicElement to set.
-     */
-    public void setTopicElement(String topicElement) {
-        this.topicElement = topicElement;
-    }
-
-    /**
-     * @return Returns the trackId.
-     */
-    public String getTrackId() {
-        return trackId;
-    }
-
-    /**
-     * @param trackId
-     *            The trackId to set.
-     */
-    public void setTrackId(String trackId) {
-        this.trackId = trackId;
-    }
-
-    public String toString() {
-        return String.format("msgId = %s, trackId = %s, topic = %s", messageID, trackId, topicElement);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
deleted file mode 100644
index 4737ef9..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker;
-
-import java.lang.reflect.Constructor;
-import java.net.URI;
-
-import org.apache.airavata.client.AiravataAPIFactory;
-import org.apache.airavata.client.api.AiravataAPI;
-import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
-import org.apache.airavata.client.tools.PeriodicExecutorThread;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ServiceUtils;
-import org.apache.airavata.wsmg.broker.handler.PublishedMessageHandler;
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionManager;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
-import org.apache.airavata.wsmg.commons.storage.WsmgInMemoryStorage;
-import org.apache.airavata.wsmg.commons.storage.WsmgPersistantStorage;
-import org.apache.airavata.wsmg.commons.util.Axis2Utils;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
-import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
-import org.apache.airavata.wsmg.messenger.Deliverable;
-import org.apache.airavata.wsmg.messenger.DeliveryProcessor;
-import org.apache.airavata.wsmg.messenger.SenderUtils;
-import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
-import org.apache.airavata.wsmg.messenger.protocol.impl.Axis2Protocol;
-import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
-import org.apache.airavata.wsmg.messenger.strategy.impl.FixedParallelSender;
-import org.apache.airavata.wsmg.messenger.strategy.impl.ParallelSender;
-import org.apache.airavata.wsmg.messenger.strategy.impl.SerialSender;
-import org.apache.airavata.wsmg.util.RunTimeStatistics;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.description.AxisService;
-import org.apache.axis2.engine.ServiceLifeCycle;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BrokerServiceLifeCycle implements ServiceLifeCycle {
-
-    private static final Logger log = LoggerFactory.getLogger(BrokerServiceLifeCycle.class);
-//    public static final String REPOSITORY_PROPERTIES = "airavata-server.properties";
-    public static final int GFAC_URL_UPDATE_INTERVAL = 1000 * 60 * 60 * 3;
-
-    public static final int JCR_AVAIALABILITY_WAIT_INTERVAL = 1000 * 10;
-    public static final String JCR_CLASS = "jcr.class";
-    public static final String JCR_USER = "jcr.user";
-    public static final String JCR_PASS = "jcr.pass";
-    public static final String ORG_APACHE_JACKRABBIT_REPOSITORY_URI = "org.apache.jackrabbit.repository.uri";
-    private static final String MESSAGE_BROKER_SERVICE_NAME = "EventingService";
-    private static final String SERVICE_URL = "message_broker_service_url";
-    private static final String JCR_REGISTRY = "registry";
-    private Thread thread;
-
-    private static final long DEFAULT_SOCKET_TIME_OUT = 20000l;
-
-    private DeliveryProcessor proc;
-    private ConsumerUrlManager urlManager;
-
-    private static Boolean initialized = false;
-
-    public void shutDown(ConfigurationContext configurationcontext, AxisService service) {
-        log.info("broker shutting down");
-        if (proc != null) {
-            proc.stop();
-            proc = null;
-        }
-        if (urlManager != null) {
-            urlManager.stop();
-            urlManager = null;
-        }
-
-        synchronized (initialized) {
-            if (initialized) {
-                initialized = false;
-                AiravataAPI registry = (AiravataAPI) configurationcontext.getProperty(JCR_REGISTRY);
-                if(registry != null && thread != null){
-                    try {
-                        registry.getAiravataManager().unsetEventingURI();
-                    } catch (AiravataAPIInvocationException e) {
-                        e.printStackTrace();
-                    }
-                    thread.interrupt();
-                try {
-                    thread.join();
-                } catch (InterruptedException e) {
-                    log.info("Message box url update thread is interrupted");
-                }
-                }
-            }
-        }
-        log.info("broker shut down");
-    }
-
-    public void startUp(ConfigurationContext configContext, AxisService axisService) {
-    	AiravataUtils.setExecutionAsServer();
-        Boolean inited = (Boolean) configContext.getProperty(WsmgCommonConstants.BROKER_INITED);
-
-        if (inited == null || inited == false) {
-            log.info("Starting Message Broker...");
-            Axis2Utils.overrideAddressingPhaseHander(configContext, new PublishedMessageHandler());
-            WsmgConfigurationContext brokerConext = initConfigurations(configContext, axisService);
-            initQueue(brokerConext);
-            initDeliveryMethod(brokerConext.getConfigurationManager());
-
-            inited = true;
-            configContext.setProperty(WsmgCommonConstants.BROKER_INITED, inited);
-        } else {
-            log.debug("init was already done by another webservice");
-        }
-
-        final ConfigurationContext context = configContext;
-        synchronized (initialized) {
-            if (!initialized) {
-                initialized = true;
-                new Thread() {
-                    @Override
-                    public void run() {
-//                        Properties properties = new Properties();
-                        try {
-//                            URL url = this.getClass().getClassLoader()
-//                                    .getResource(REPOSITORY_PROPERTIES);
-//                            properties.load(url.openStream());
-//                            Map<String, String> map = new HashMap<String, String>(
-//                                    (Map) properties);
-                            try {
-                                Thread.sleep(JCR_AVAIALABILITY_WAIT_INTERVAL);
-                            } catch (InterruptedException e1) {
-                                e1.printStackTrace();
-                            }
-
-                            String userName = ServerSettings.getSystemUser();
-                            String gateway = ServerSettings.getSystemUserGateway();
-
-                            AiravataAPI airavataAPI = AiravataAPIFactory.getAPI(gateway, userName);
-                            String localAddress = ServiceUtils
-                                    .generateServiceURLFromConfigurationContext(
-                                            context,
-                                            MESSAGE_BROKER_SERVICE_NAME);
-                            log.debug("MESSAGE BOX SERVICE_ADDRESS:"
-                                    + localAddress);
-                            context.setProperty(SERVICE_URL, new URI(
-                                    localAddress));
-                            context.setProperty(JCR_REGISTRY, airavataAPI);
-                            /*
-                                    * Heart beat message to registry
-                                    */
-                            thread = new MsgBrokerURLRegisterThread(airavataAPI, context);
-                            thread.start();
-                        } catch (Exception e) {
-                            log.error(e.getMessage(), e);
-                        }
-                    }
-                }.start();
-            }
-        }
-    }
-
-    private WsmgConfigurationContext initConfigurations(ConfigurationContext configContext, AxisService axisService) {
-
-        WsmgConfigurationContext wsmgConfig = new WsmgConfigurationContext();
-        configContext.setProperty(WsmgCommonConstants.BROKER_WSMGCONFIG, wsmgConfig);
-
-        ConfigurationManager configMan = new ConfigurationManager();
-
-        wsmgConfig.setConfigurationManager(configMan);
-
-        String type = configMan.getConfig(WsmgCommonConstants.CONFIG_STORAGE_TYPE,
-                WsmgCommonConstants.STORAGE_TYPE_PERSISTANT);
-
-        /*
-         * Determine Storage
-         */
-        if (WsmgCommonConstants.STORAGE_TYPE_IN_MEMORY.equalsIgnoreCase(type)) {
-            WsmgInMemoryStorage inmem = new WsmgInMemoryStorage();
-
-            wsmgConfig.setStorage(inmem);
-            wsmgConfig.setQueue(inmem);
-            wsmgConfig.setSubscriptionManager(new SubscriptionManager(wsmgConfig, inmem));
-
-        } else {
-            String jdbcUrl = configMan.getConfig(WsmgCommonConstants.CONFIG_JDBC_URL);
-            String jdbcDriver = configMan.getConfig(WsmgCommonConstants.CONFIG_JDBC_DRIVER);
-            WsmgPersistantStorage persis = new WsmgPersistantStorage(jdbcUrl, jdbcDriver);
-
-            wsmgConfig.setStorage(persis);
-            wsmgConfig.setQueue(persis);
-            wsmgConfig.setSubscriptionManager(new SubscriptionManager(wsmgConfig, persis));
-        }
-
-        NotificationProcessor notificatonProcessor = new NotificationProcessor(wsmgConfig);
-        wsmgConfig.setNotificationProcessor(notificatonProcessor);
-
-        return wsmgConfig;
-    }
-
-    private void initQueue(WsmgConfigurationContext context) {
-
-        log.debug("setting up queue");
-
-        WSMGParameter.OUT_GOING_QUEUE = context.getQueue();
-
-        if (WSMGParameter.cleanQueueonStartUp) {
-            log.debug("cleaning up persistant queue");
-            WSMGParameter.OUT_GOING_QUEUE.cleanup();
-            log.debug("cleaned up persistant queue");
-        }
-
-        RunTimeStatistics.setStartUpTime();
-
-    }
-
-    private void initDeliveryMethod(ConfigurationManager configMan) {
-
-        String shouldStart = configMan.getConfig(WsmgCommonConstants.CONFIG_START_DELIVERY_THREADS);
-
-        if (!Boolean.parseBoolean(shouldStart)) {
-
-            if (configMan.getConfig(WsmgCommonConstants.CONFIG_STORAGE_TYPE,
-                    WsmgCommonConstants.STORAGE_TYPE_PERSISTANT).equalsIgnoreCase(
-                    WsmgCommonConstants.STORAGE_TYPE_IN_MEMORY)) {
-
-                /*
-                 * user has asked to use in memory queue but without starting the delivery thread. this will accumulate
-                 * message in memory.
-                 */
-                log.error("conflicting configuration detected, using in memory queue without starting delivery thread will result memory growth.");
-
-            }
-            return;
-        }
-
-        /*
-         * Create Protocol
-         */
-        DeliveryProtocol protocol;
-        String protocolClass = configMan
-                .getConfig(WsmgCommonConstants.DELIVERY_PROTOCOL, Axis2Protocol.class.getName());
-        try {
-            Class cl = Class.forName(protocolClass);
-            Constructor<DeliveryProtocol> co = cl.getConstructor(null);
-            protocol = co.newInstance((Object[]) null);
-
-        } catch (Exception e) {
-            log.error("Cannot initial protocol sender", e);
-            return;
-        }
-        protocol.setTimeout(configMan.getConfig(WsmgCommonConstants.CONFIG_SOCKET_TIME_OUT, DEFAULT_SOCKET_TIME_OUT));
-
-        /*
-         * Create delivery method
-         */
-        SendingStrategy method = null;
-        String initedmethod = null;
-        String deliveryMethod = configMan.getConfig(WsmgCommonConstants.CONFIG_DELIVERY_METHOD,
-                WsmgCommonConstants.DELIVERY_METHOD_SERIAL);
-        if (WsmgCommonConstants.DELIVERY_METHOD_PARALLEL.equalsIgnoreCase(deliveryMethod)) {
-            method = new ParallelSender();
-            initedmethod = WsmgCommonConstants.DELIVERY_METHOD_PARALLEL;
-
-        } else if (WsmgCommonConstants.DELIVERY_METHOD_THREAD_CREW.equalsIgnoreCase(deliveryMethod)) {
-            int poolsize = configMan.getConfig(WsmgCommonConstants.CONFIG_SENDING_THREAD_POOL_SIZE,
-                    WsmgCommonConstants.DEFAULT_SENDING_THREAD_POOL_SIZE);
-            int batchsize = configMan.getConfig(WsmgCommonConstants.CONFIG_SENDING_BATCH_SIZE,
-                    WsmgCommonConstants.DEFAULT_SENDING_BATCH_SIZE);
-            method = new FixedParallelSender(poolsize, batchsize);
-            initedmethod = WsmgCommonConstants.DELIVERY_METHOD_THREAD_CREW;
-
-        } else {
-            method = new SerialSender();
-            initedmethod = WsmgCommonConstants.DELIVERY_METHOD_SERIAL;
-        }
-
-        /*
-         * Create Deliverable
-         */
-        urlManager = new ConsumerUrlManager(configMan);
-        Deliverable senderUtils = new SenderUtils(urlManager);
-        senderUtils.setProtocol(protocol);
-
-        proc = new DeliveryProcessor(senderUtils, method);
-        proc.start();
-        log.debug(initedmethod + " sending method inited");
-    }
-
-    class MsgBrokerURLRegisterThread extends PeriodicExecutorThread {
-
-        private ConfigurationContext context = null;
-
-        public MsgBrokerURLRegisterThread(AiravataAPI registry, ConfigurationContext context) {
-            super(registry);
-            this.context = context;
-        }
-
-
-        protected void updateRegistry(AiravataAPI registry) {
-            try {
-                URI localAddress = (URI) this.context.getProperty(SERVICE_URL);
-                registry.getAiravataManager().setEventingURI(localAddress);
-            } catch (AiravataAPIInvocationException e) {
-                e.printStackTrace();
-            }
-            log.debug("Updated Eventing service URL in to Repository");
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerInfo.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerInfo.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerInfo.java
deleted file mode 100644
index d087a87..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerInfo.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker;
-
-import java.io.Serializable;
-
-public class ConsumerInfo implements Serializable {
-    static final long serialVersionUID = 2274650724788817903L;
-
-    // TODO : change this to OM, EndpointReference.
-    String consumerEprStr;
-
-    String type; // Either "wsnt" or "wse"
-
-    boolean useNotify;
-
-    boolean paused = false;
-
-    boolean wsrmEnabled;
-
-    /**
-     * @param consumerEprStr
-     * @param type
-     * @param useNotify
-     * @param paused
-     */
-    public ConsumerInfo(String consumerEprStr, String type, boolean useNotify, boolean paused) {
-        super();
-        // TODO Auto-generated constructor stub
-        this.consumerEprStr = consumerEprStr;
-        this.type = type;
-        this.useNotify = useNotify;
-        this.paused = paused;
-    }
-
-    /**
-     * @return Returns the consumerEprStr.
-     */
-    public String getConsumerEprStr() {
-        return consumerEprStr;
-    }
-
-    /**
-     * @param consumerEprStr
-     *            The consumerEprStr to set.
-     */
-    public void setConsumerEprStr(String consumerEprStr) {
-        this.consumerEprStr = consumerEprStr;
-    }
-
-    /**
-     * @return Returns the paused.
-     */
-    public boolean isPaused() {
-        return paused;
-    }
-
-    /**
-     * @param paused
-     *            The paused to set.
-     */
-    public void setPaused(boolean paused) {
-        this.paused = paused;
-    }
-
-    /**
-     * @return Returns the type.
-     */
-    public String getType() {
-        return type;
-    }
-
-    /**
-     * @param type
-     *            The type to set.
-     */
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    /**
-     * @return Returns the useNotify.
-     */
-    public boolean isUseNotify() {
-        return useNotify;
-    }
-
-    /**
-     * @param useNotify
-     *            The useNotify to set.
-     */
-    public void setUseNotify(boolean useNotify) {
-        this.useNotify = useNotify;
-    }
-
-    public boolean isWsrmEnabled() {
-        return wsrmEnabled;
-    }
-
-    public void setWsrmEnabled(boolean wsrmEnabled) {
-        this.wsrmEnabled = wsrmEnabled;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerList.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerList.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerList.java
deleted file mode 100644
index 38d8a3a..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerList.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-public class ConsumerList {
-    Map<String, ConsumerInfo> subId2ConsumerInfo = new HashMap<String, ConsumerInfo>();
-
-    ArrayList<ConsumerInfo> consumerInfoList = new ArrayList<ConsumerInfo>();
-
-    boolean changed = true;
-    int size = 0; // use size instead of consumerInfoList.size() to avoid
-
-    // unnecessary copy of subId2ConsumerInfo in
-    // refreshConsumerInfoList() if just need the size
-
-    public void addConsumer(String subId, ConsumerInfo consumerInfo) {
-        subId2ConsumerInfo.put(subId, consumerInfo);
-        changed = true;
-        size++;
-    }
-
-    public int removeConsumer(String subId) {
-        ConsumerInfo result = subId2ConsumerInfo.remove(subId);
-        if (result == null) {
-            return 0;
-        }
-        changed = true;
-        size--;
-        return 1;
-
-    }
-
-    public ArrayList<ConsumerInfo> getConsumerList() {
-        if (changed) {
-            refreshConsumerInfoList();
-        }
-        return consumerInfoList;
-    }
-
-    public int size() {
-        return size;
-    }
-
-    private void refreshConsumerInfoList() {
-        consumerInfoList.clear();
-        consumerInfoList.addAll(subId2ConsumerInfo.values());
-        changed = false;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerListManager.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerListManager.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerListManager.java
deleted file mode 100644
index b5ca1bb..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerListManager.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ConsumerListManager {
-
-    private static final Logger logger = LoggerFactory.getLogger(ConsumerListManager.class);
-
-    protected Map<String, ConsumerList> token2ConsumerListMap = new HashMap<String, ConsumerList>();
-
-    protected Map<String, String> subId2Token = new HashMap<String, String>();
-
-    // token can be a topic or an XPath String
-    public void addToConsumerList(String token, SubscriptionState subscribeRequest, String subscriptionId) {
-        ConsumerList consumerList = token2ConsumerListMap.get(token);
-        if (consumerList == null) { // new topic
-            consumerList = new ConsumerList();
-            token2ConsumerListMap.put(token, consumerList);
-        }
-        consumerList.addConsumer(subscriptionId, subscribeRequest.getConsumerInfo());
-        subId2Token.put(subscriptionId, token);
-
-    }
-
-    public String getTokenBySubscriptionId(String subscriptionId) {
-        String token = subId2Token.get(subscriptionId);
-        return token;
-    }
-
-    public int removeFromConsumerList(String subscriptionId, String token) {
-        String tokenString = null;
-        if (token == null) {
-            tokenString = subId2Token.get(subscriptionId);
-        } else {
-            tokenString = token;
-        }
-
-        ConsumerList consumerList = token2ConsumerListMap.get(tokenString);
-        if (consumerList == null) {
-            logger.error("*****ERROR:Cannot find the token to delete: " + tokenString);
-            return 0;
-        }
-        int result = consumerList.removeConsumer(subscriptionId);
-        subId2Token.remove(subscriptionId);
-        return result;
-    }
-
-    public ConsumerList getConsumerListByToken(String token) {
-        ConsumerList consumerList = token2ConsumerListMap.get(token);
-        return consumerList;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java
deleted file mode 100644
index 10ae03b..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker;
-
-import java.util.*;
-
-import javax.xml.namespace.QName;
-import javax.xml.stream.XMLStreamException;
-
-import org.apache.airavata.wsmg.broker.amqp.AMQPNotificationProcessor;
-import org.apache.airavata.wsmg.broker.context.ContextParameters;
-import org.apache.airavata.wsmg.broker.context.ProcessingContext;
-import org.apache.airavata.wsmg.commons.NameSpaceConstants;
-import org.apache.airavata.wsmg.commons.OutGoingMessage;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
-import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
-import org.apache.airavata.wsmg.messenger.OutGoingQueue;
-import org.apache.airavata.wsmg.util.BrokerUtil;
-import org.apache.airavata.wsmg.util.RunTimeStatistics;
-import org.apache.axiom.om.OMAbstractFactory;
-import org.apache.axiom.om.OMAttribute;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMException;
-import org.apache.axiom.om.OMFactory;
-import org.apache.axiom.om.OMNamespace;
-import org.apache.axiom.om.xpath.AXIOMXPath;
-import org.apache.axis2.AxisFault;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class NotificationProcessor {
-
-    private static final Logger logger = LoggerFactory.getLogger(NotificationProcessor.class);
-
-    private WsmgConfigurationContext wsmgConfigContext;
-
-    protected long messageCounter = 0;
-    protected long messageId = 0;
-
-    OMFactory factory = OMAbstractFactory.getOMFactory();
-
-    private OutGoingQueue outgoingQueue;
-
-    private AMQPNotificationProcessor amqpNotificationProcessor = new AMQPNotificationProcessor();
-
-    public NotificationProcessor(WsmgConfigurationContext config) {
-        init(config);
-        amqpNotificationProcessor.init();
-    }
-
-    private void init(WsmgConfigurationContext config) {
-        this.wsmgConfigContext = config;
-        outgoingQueue = config.getOutgoingQueue();
-    }
-
-    private synchronized long getNextTrackId() {
-        messageCounter++;
-        return messageCounter;
-    }
-
-    private synchronized long getNextMsgId() {
-        messageId++;
-        return messageId;
-    }
-
-    public void processMsg(ProcessingContext ctx, OMNamespace protocolNs) throws OMException, AxisFault {
-
-        String trackId = "trackId_A_" + getNextTrackId();
-        if (WSMGParameter.showTrackId) {
-            logger.debug(trackId + ": received.");
-        }
-
-        AdditionalMessageContent additionalMessageContent = new AdditionalMessageContent(ctx.getMessageContext()
-                .getSoapAction(), ctx.getMessageContext().getMessageID());
-        additionalMessageContent.setTrackId(trackId);
-
-        handleExtendedNotifications(ctx, protocolNs);
-
-        if (NameSpaceConstants.WSNT_NS.equals(protocolNs)) {
-
-            onWSNTMsg(ctx, additionalMessageContent);
-            setResponseMsg(ctx, trackId, protocolNs);
-        } else { // WSE Notifications No specific namespace
-
-            onWSEMsg(ctx, trackId, additionalMessageContent);
-           setResponseMsg(ctx, trackId, protocolNs);
-        }
-    }
-
-    /**
-     * @param ctx
-     * @param topicElString
-     * @param trackId
-     * @param additionalMessageContent
-     * @throws OMException
-     * @throws XMLStreamException
-     */
-    private void onWSEMsg(ProcessingContext ctx, String trackId, AdditionalMessageContent additionalMessageContent)
-            throws OMException, AxisFault {
-
-        String topicElString = null;
-        String topicLocalString = null;
-
-        QName qName = new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(), "Topic");
-
-        OMElement topicEl = ctx.getMessageContext().getEnvelope().getHeader().getFirstChildWithName(qName);
-
-        if (topicEl == null) {
-
-            topicLocalString = ctx.getContextParameter(ContextParameters.TOPIC_FROM_URL);
-
-            if (topicLocalString != null) {
-
-                topicElString = "<wsnt:Topic "
-                        + "Dialect=\"http://www.ibm.com/xmlns/stdwip/web-services/WS-Topics/TopicExpression/simple\" "
-                        + "xmlns:ns2=\"http://tutorial.globus.org/auction\" "
-                        + "xmlns:wsnt=\"http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification\">" + "ns2:"
-                        + topicLocalString + "</wsnt:Topic>";
-                // / }
-                additionalMessageContent.setTopicElement(topicElString);
-            } else {
-
-                topicLocalString = "wseTopic";
-                topicElString = "<wsnt:Topic "
-                        + "Dialect=\"http://www.ibm.com/xmlns/stdwip/web-services/WS-Topics/TopicExpression/simple\" "
-                        + "xmlns:ns2=\"http://tutorial.globus.org/auction\" "
-                        + "xmlns:wsnt=\"http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification\">"
-                        + "ns2:wseTopic</wsnt:Topic>";
-                // / }
-                additionalMessageContent.setTopicElement(topicElString);
-            }
-        } else {
-
-            topicLocalString = BrokerUtil.getTopicLocalString(topicEl.getText());
-            try {
-                topicElString = topicEl.toStringWithConsume();
-            } catch (XMLStreamException e) {
-                logger.error("exceptions occured at WSE eventing notification creating", e);
-            }
-            additionalMessageContent.setTopicElement(topicElString);
-        }
-
-        OMElement messageEl = ctx.getSoapBody().getFirstElement();
-        if (messageEl == null) {
-            throw new AxisFault("no message found");
-        }
-
-        String message = null;
-        try {
-            message = messageEl.toStringWithConsume();
-        } catch (XMLStreamException e) {
-            logger.error("unable to serialize the message", e);
-            throw new AxisFault("unable to serialize the message", e);
-        }
-
-        matchAndSave(message, topicLocalString, additionalMessageContent);
-    }
-
-    /**
-     * @param ctx
-     * @param trackId
-     * @throws OMException
-     */
-    private void setResponseMsg(ProcessingContext ctx, String trackId, OMNamespace responseNS) throws OMException {
-        // set response message
-
-        ctx.addResponseMsgNameSpaces(responseNS);
-
-        OMAttribute trackIdAttribute = factory.createOMAttribute("trackId", null, trackId);
-        OMElement messageElement = ctx.getMessageContext().getEnvelope().getBody().getFirstElement();
-        OMElement responseMsgElement = factory.createOMElement(messageElement.getLocalName() + "Response", responseNS);
-        responseMsgElement.addAttribute(trackIdAttribute);
-        ctx.setRespMessage(responseMsgElement);
-
-    }
-
-    /**
-     * @param ctx
-     * @param topicLocalString
-     * @param topicElString
-     * @param producerReferenceElString
-     * @param additionalMessageContent
-     * @throws OMException
-     * @throws XMLStreamException
-     * @throws AxisFault
-     */
-    private void onWSNTMsg(ProcessingContext ctx, AdditionalMessageContent additionalMessageContent)
-            throws OMException, AxisFault {
-
-        String producerReferenceElString = null;
-        String topicElString = null;
-
-        boolean noElements = true;
-
-        // TODO: set nicely with a processing context
-        OMElement notifyEl = ctx.getSoapBody().getFirstElement();
-        for (Iterator<OMElement> iter = notifyEl.getChildrenWithLocalName("NotificationMessage"); iter.hasNext();) {
-            noElements = false;
-            OMElement wrappedMessageEl = iter.next();
-
-            String topicLocalString = null;
-
-            OMElement topicEl = wrappedMessageEl.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
-                    .getNamespaceURI(), "Topic"));
-            if (topicEl != null) {
-
-                topicLocalString = BrokerUtil.getTopicLocalString(topicEl.getText()); // get what ever inside this
-                                                                                      // element
-
-                try {
-                    topicElString = topicEl.toStringWithConsume();
-                } catch (XMLStreamException e) {
-                    logger.error("exception occured while creating NotificationConsumer", e);
-                }
-                additionalMessageContent.setTopicElement(topicElString);
-            }
-            OMElement producerReferenceEl = wrappedMessageEl.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
-                    .getNamespaceURI(), "ProducerReference"));
-
-            if (producerReferenceEl != null) {
-                try {
-                    producerReferenceElString = producerReferenceEl.toStringWithConsume();
-                } catch (XMLStreamException e) {
-                    logger.error("exception occured while creating notification consumer", e);
-
-                }
-                additionalMessageContent.setProducerReference(producerReferenceElString);
-            }
-
-            OMElement notificationMessageEl = wrappedMessageEl.getFirstChildWithName(
-                    new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(), "Message")).getFirstElement();
-
-            String message = null;
-            try {
-                message = notificationMessageEl.toStringWithConsume();
-            } catch (XMLStreamException e) {
-                logger.error("exception occured while creating notification consumer", e);
-                throw new AxisFault("unable to serialize the message", e);
-            }
-
-            matchAndSave(message, topicLocalString, additionalMessageContent);
-
-        }
-        if (noElements) {
-            throw new AxisFault("at least one element is required");
-        }
-    }
-
-    private void matchAndSave(String notificationMessage, String topicLocalString,
-            AdditionalMessageContent additionalMessageContent) {
-
-        List<ConsumerInfo> matchedConsumers = new LinkedList<ConsumerInfo>();
-
-        // not use incoming queue
-        // This is a fix for the bug seen in yfilter.
-        try {
-
-            for (AbstractMessageMatcher matcher : wsmgConfigContext.getMessageMatchers()) {
-                matcher.populateMatches(null, additionalMessageContent, notificationMessage, topicLocalString,
-                        matchedConsumers);
-            }
-
-            save(matchedConsumers, notificationMessage, additionalMessageContent);
-
-        } catch (RuntimeException e) {
-            logger.error("Caught RuntimeException", e);
-        }
-
-    }
-
-    public void save(List<ConsumerInfo> consumerInfoList, String message,
-            AdditionalMessageContent additionalMessageContent) {
-
-        if (consumerInfoList.size() == 0) // No subscription
-            return;
-
-        RunTimeStatistics.addNewNotificationMessageSize(message.length());
-        OutGoingMessage outGoingMessage = new OutGoingMessage();
-        outGoingMessage.setTextMessage(message);
-        outGoingMessage.setConsumerInfoList(consumerInfoList);
-        outGoingMessage.setAdditionalMessageContent(additionalMessageContent);
-
-        outgoingQueue.storeNotification(outGoingMessage, getNextMsgId());
-
-        if (WSMGParameter.showTrackId)
-            logger.info(additionalMessageContent.getTrackId() + ": putIn Outgoing queue.");
-    }
-
-    private void handleExtendedNotifications(ProcessingContext ctx, OMNamespace protocolNs) throws OMException {
-        // AMQP
-        amqpNotificationProcessor.notify(ctx, protocolNs);
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java
deleted file mode 100644
index 39970ec..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.amqp;
-
-import org.apache.airavata.common.utils.ApplicationSettings;
-import org.apache.airavata.wsmg.client.amqp.*;
-import org.apache.airavata.wsmg.commons.NameSpaceConstants;
-import org.apache.airavata.wsmg.broker.context.ProcessingContext;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMException;
-import org.apache.axiom.om.OMNamespace;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.w3c.dom.Element;
-
-import javax.xml.namespace.QName;
-
-/**
- * AMQPNotificationProcessor handles AMQP-specific notification processing.
- */
-public class AMQPNotificationProcessor {
-
-    private static final Logger logger = LoggerFactory.getLogger(AMQPNotificationProcessor.class);
-    
-    private boolean amqpEnabled = false;
-    private AMQPSender amqpSender = null;
-    private AMQPTopicSender amqpTopicSender = null;
-    private AMQPBroadcastSender amqpBroadcastSender = null;
-
-    public void init() {
-        String amqpEnabledAppSetting = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_ENABLE, "");
-        if (!amqpEnabledAppSetting.isEmpty() && (1 == Integer.parseInt(amqpEnabledAppSetting))) {
-            try {
-                String host = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, "localhost");
-                String port = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, "5672");
-                String username = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, "guest");
-                String password = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, "guest");
-
-                Properties properties = new Properties();
-                properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, host);
-                properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, port);
-                properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, username);
-                properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, password);
-
-                String className = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_SENDER, "");
-                Class clazz = Class.forName(className);
-                amqpSender = (AMQPSender)clazz.getDeclaredConstructor(Properties.class).newInstance(properties);
-
-                className = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_TOPIC_SENDER, "");
-                clazz = Class.forName(className);
-                amqpTopicSender = (AMQPTopicSender)clazz.getDeclaredConstructor(Properties.class).newInstance(properties);
-
-                className = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_BROADCAST_SENDER, "");
-                clazz = Class.forName(className);
-                amqpBroadcastSender = (AMQPBroadcastSender)clazz.getDeclaredConstructor(Properties.class).newInstance(properties);
-
-                Element routingKeys = AMQPUtil.loadRoutingKeys();
-                if (routingKeys != null) {
-                    ((AMQPRoutingAwareClient)amqpSender).init(routingKeys);
-                    ((AMQPRoutingAwareClient)amqpTopicSender).init(routingKeys);
-                    ((AMQPRoutingAwareClient)amqpBroadcastSender).init(routingKeys);
-                }
-
-                amqpEnabled = true;
-            } catch (Exception ex) {
-                logger.error(ex.getMessage());
-            }
-        }
-    }
-
-    public void notify(ProcessingContext ctx, OMNamespace protocolNs) throws OMException {
-        if (amqpEnabled) {
-            // Extract messages
-            List<OMElement> messages = new ArrayList<OMElement>();
-            if (NameSpaceConstants.WSNT_NS.equals(protocolNs)) {
-                // WSNT
-                OMElement messageElements = ctx.getSoapBody().getFirstElement();
-                for (Iterator<OMElement> ite = messageElements.getChildrenWithLocalName("NotificationMessage"); ite.hasNext(); ) {
-                    try {
-                        OMElement messageElement = ite.next();
-                        OMElement message = messageElement.getFirstChildWithName(
-                                new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(), "Message")).getFirstElement();
-                        messages.add(message);
-                    } catch (NullPointerException e) {
-                        throw new OMException(e);
-                    }
-                }
-            } else {
-                // WSE
-                OMElement message = ctx.getSoapBody().getFirstElement();
-                if (message != null) {
-                    messages.add(message);
-                }
-            }
-
-            // Dispatch messages
-            try {
-                for (OMElement message : messages) {
-                    amqpBroadcastSender.Send(message);
-                    amqpTopicSender.Send(message);
-                    amqpSender.Send(message);
-                }
-            } catch (AMQPException e) {
-                logger.warn("Failed to send AMQP notification.[Reason=" + e.getMessage() + "]");
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameterInfo.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameterInfo.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameterInfo.java
deleted file mode 100644
index 8c097d4..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameterInfo.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.context;
-
-import org.apache.axiom.om.OMAbstractFactory;
-import org.apache.axiom.om.OMElement;
-
-public class ContextParameterInfo<T> {
-
-    private Class<T> parameterType;
-    private String parameterName;
-
-    public ContextParameterInfo(Class<T> type, String name) {
-        parameterType = type;
-        parameterName = name;
-
-    }
-
-    public Class<T> getParameterType() {
-        return parameterType;
-    }
-
-    public String getParameterName() {
-        return parameterName;
-    }
-
-    public T cast(Object obj) {
-
-        return parameterType.cast(obj);
-    }
-
-    public static void main(String[] a) {
-
-        new ContextParameterInfo<OMElement>(OMElement.class, "test");
-        OMAbstractFactory.getOMFactory().createOMElement("testtest", null);
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameters.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameters.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameters.java
deleted file mode 100644
index 61624d8..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameters.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.context;
-
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.apache.axiom.om.OMElement;
-import org.apache.axis2.addressing.EndpointReference;
-
-/**
- * Catalogue of the typed parameter descriptors used by the broker. Every constant is a distinct
- * {@link ContextParameterInfo} object created once when this class is initialised.
- */
-public class ContextParameters {
-
-    /**
-     * Builds a descriptor for a parameter called {@code name} whose values are of class {@code c}.
-     */
-    private static <V> ContextParameterInfo<V> createParam(Class<V> c, String name) {
-        return new ContextParameterInfo<V>(c, name);
-    }
-
-    // Was "public static" without final, which allowed any caller to swap the shared descriptor.
-    public static final ContextParameterInfo<String> RESOURCE_ID = createParam(String.class, "resourceID");
-
-    public static final ContextParameterInfo<String> SUB_ID = createParam(String.class, "subID");
-
-    public static final ContextParameterInfo<String> TOPIC_FROM_URL = createParam(String.class, "topicFromUrl");
-
-    public static final ContextParameterInfo<String> SOAP_ACTION = createParam(String.class, "soapAction");
-
-    public static final ContextParameterInfo<SubscriptionState> SUBSCRIPTION = createParam(SubscriptionState.class,
-            "subscription");
-
-    public static final ContextParameterInfo<String> SUBSCRIBER_EXPIRES = createParam(String.class, "subscriberExpires");
-
-    // Was an instance field: it could not be referenced as ContextParameters.USE_NOTIFY_TEXT and
-    // every ContextParameters instance produced a different descriptor object. Now a shared
-    // constant like its siblings; access through an instance reference still compiles.
-    public static final ContextParameterInfo<String> USE_NOTIFY_TEXT = createParam(String.class, "useNotifyText");
-
-    public static final ContextParameterInfo<OMElement> USE_NOTIFY_ELEMENT = createParam(OMElement.class, "useNotifyEl");
-
-    public static final ContextParameterInfo<OMElement> NOTIFY_TO_ELEMENT = createParam(OMElement.class, "NotifyTo");
-
-    public static final ContextParameterInfo<EndpointReference> NOTIFY_TO_EPR = createParam(EndpointReference.class,
-            "NotifyToEPR");
-
-    public static final ContextParameterInfo<OMElement> SUB_POLICY = createParam(OMElement.class, "subPolicy");
-
-    public static final ContextParameterInfo<OMElement> FILTER_ELEMENT = createParam(OMElement.class, "filterElement");
-
-    public static final ContextParameterInfo<OMElement> TOPIC_EXPRESSION_ELEMENT = createParam(OMElement.class,
-            "topicExpressionEl");
-
-    public static final ContextParameterInfo<OMElement> XPATH_ELEMENT = createParam(OMElement.class, "xpathEl");
-
-    public static final ContextParameterInfo<OMElement> SUBSCRIBE_ELEMENT = createParam(OMElement.class, "subscribeElement");
-
-    // NOTE(review): carries the same parameter name ("subscribeElement") as SUBSCRIBE_ELEMENT but a
-    // different value type - confirm whether a distinct name was intended.
-    public static final ContextParameterInfo<EndpointReference> SUBSCRIBE_ELEMENT_EPR = createParam(EndpointReference.class,
-            "subscribeElement");
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContext.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContext.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContext.java
deleted file mode 100644
index 06a50ca..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContext.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.context;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMNamespace;
-import org.apache.axiom.soap.SOAPBody;
-import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axis2.context.MessageContext;
-
-/**
- * Mutable holder for the state gathered while one message is processed: the SOAP envelope, the
- * Axis2 message context, the response element, the subscription and a bag of typed parameters.
- */
-public class ProcessingContext {
-
-    /** Typed parameter bag, keyed by the descriptor object itself. */
-    private Map<ContextParameterInfo<? extends Object>, Object> contextInfo = new HashMap<ContextParameterInfo<? extends Object>, Object>();
-
-    /** Created on the first call to {@link #addResponseMsgNameSpaces(OMNamespace)}; null until then. */
-    private List<OMNamespace> responseMsgNameSpaces;
-
-    private MessageContext messageContext;
-
-    /** Used for WSe notification messages, whose topics are in the header. */
-    private SOAPEnvelope envelope;
-
-    private OMElement respMessage;
-
-    private SubscriptionState subscription;
-
-    /** @return the envelope set via {@link #setEnvelope(SOAPEnvelope)}, or null if none was set */
-    public SOAPEnvelope getEnvelope() {
-        return this.envelope;
-    }
-
-    public void setEnvelope(SOAPEnvelope envelope) {
-        this.envelope = envelope;
-    }
-
-    /**
-     * @return the body of the current envelope
-     * @throws NullPointerException if no envelope has been set
-     */
-    public SOAPBody getSoapBody() {
-        return this.envelope.getBody();
-    }
-
-    public OMElement getRespMessage() {
-        return this.respMessage;
-    }
-
-    public void setRespMessage(OMElement respMessage) {
-        this.respMessage = respMessage;
-    }
-
-    public SubscriptionState getSubscription() {
-        return this.subscription;
-    }
-
-    public void setSubscription(SubscriptionState subscription) {
-        this.subscription = subscription;
-    }
-
-    /** Stores the message context. (The method name's spelling is part of the public interface.) */
-    public void setMessageConext(MessageContext msgContext) {
-        this.messageContext = msgContext;
-    }
-
-    public MessageContext getMessageContext() {
-        return this.messageContext;
-    }
-
-    /**
-     * Records {@code ns} as a response namespace unless an equal one is already recorded.
-     */
-    public void addResponseMsgNameSpaces(OMNamespace ns) {
-        if (this.responseMsgNameSpaces == null) {
-            // A brand-new list cannot contain ns yet, so it is added straight away.
-            this.responseMsgNameSpaces = new ArrayList<OMNamespace>();
-            this.responseMsgNameSpaces.add(ns);
-            return;
-        }
-
-        boolean alreadyRecorded = this.responseMsgNameSpaces.contains(ns);
-        if (!alreadyRecorded) {
-            this.responseMsgNameSpaces.add(ns);
-        }
-    }
-
-    /** @return the recorded namespaces, or null when none has been added yet */
-    public List<OMNamespace> getResponseMsgNamespaces() {
-        return this.responseMsgNameSpaces;
-    }
-
-    /** Stores {@code value} under the descriptor {@code name}, replacing any previous value. */
-    public void setContextParameter(ContextParameterInfo<?> name, Object value) {
-        this.contextInfo.put(name, value);
-    }
-
-    /**
-     * Looks up the value stored under {@code name} and casts it with the descriptor's own
-     * {@code cast} method.
-     */
-    public <T> T getContextParameter(ContextParameterInfo<T> name) {
-        return name.cast(this.contextInfo.get(name));
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContextBuilder.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContextBuilder.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContextBuilder.java
deleted file mode 100644
index 9d4d4ea..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContextBuilder.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.context;
-
-import org.apache.axiom.om.OMElement;
-
-/**
- * Base type for builders that turn an {@link OMElement} into a {@link ProcessingContext}.
- */
-public abstract class ProcessingContextBuilder {
-
-    /**
-     * Builds a processing context from the given element.
-     *
-     * @param elem element to build the context from
-     * @return the resulting processing context
-     */
-    public abstract ProcessingContext build(OMElement elem);
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/handler/PublishedMessageHandler.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/handler/PublishedMessageHandler.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/handler/PublishedMessageHandler.java
deleted file mode 100644
index ca23506..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/handler/PublishedMessageHandler.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.handler;
-
-import java.util.List;
-
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.AddressingFaultsHelper;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.description.AxisService;
-import org.apache.axis2.dispatchers.AddressingBasedDispatcher;
-import org.apache.axis2.engine.Phase;
-import org.apache.axis2.util.JavaUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Addressing-based dispatcher that, for messages addressed to the service named
- * "EventingService", falls back to the publish operation when the inherited dispatch did not
- * resolve an operation.
- */
-public class PublishedMessageHandler extends AddressingBasedDispatcher {
-
-    private static final Logger logger = LoggerFactory.getLogger(PublishedMessageHandler.class);
-
-    private static final String ADDRESSING_VALIDATE_ACTION = "addressing.validateAction";
-
-    /** Publish operation, looked up on first use and then reused. */
-    private AxisOperation publishOperation = null;
-
-    /** "Addressing" in-flow phase, looked up on first use and then reused. */
-    private Phase addressingPhase = null;
-
-    /**
-     * Dispatches the message only when its service or operation is still unresolved. While the
-     * inherited dispatch runs, the "addressing.validateAction" property is forced to false; the
-     * action check is then performed here (if it was enabled) and the property is put back.
-     *
-     * @param msgContext message being dispatched
-     * @return the inherited dispatcher's response, or CONTINUE when nothing had to be dispatched
-     * @throws AxisFault if the inherited dispatch or the action check raises a fault
-     */
-    public InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
-
-        InvocationResponse response = InvocationResponse.CONTINUE;
-
-        if (msgContext.getAxisService() == null || msgContext.getAxisOperation() == null) {
-            boolean validateAction = JavaUtils.isTrue(msgContext.getProperty(ADDRESSING_VALIDATE_ACTION), true);
-            msgContext.setProperty(ADDRESSING_VALIDATE_ACTION, Boolean.valueOf(false));
-            response = super.invoke(msgContext);
-            if (isForBrokerEventingService(msgContext)) {
-                validateBrokerWSEventingOperation(msgContext);
-            }
-            if (validateAction) {
-                checkAction(msgContext);
-            }
-            msgContext.setProperty(ADDRESSING_VALIDATE_ACTION, Boolean.valueOf(validateAction));
-
-        }
-
-        return response;
-    }
-
-    /** Assigns the publish operation to the message when no operation has been resolved. */
-    private void validateBrokerWSEventingOperation(MessageContext msgContext) {
-        if (msgContext.getAxisOperation() == null) {
-            AxisService service = msgContext.getAxisService();
-            AxisOperation pubOperation = getPublishOperation(service);
-            msgContext.setAxisOperation(pubOperation);
-        }
-    }
-
-    /** @return true when the message's service is resolved and is named "EventingService" */
-    private boolean isForBrokerEventingService(MessageContext msgContext) {
-        return msgContext.getAxisService() != null && msgContext.getAxisService().getName().equals("EventingService");
-    }
-
-    /** Returns the publish operation, resolving it by SOAP action on the first call. */
-    private AxisOperation getPublishOperation(AxisService publisherService) {
-        if (publishOperation == null) {
-            publishOperation = publisherService.getOperationBySOAPAction(WsmgCommonConstants.WSMG_PUBLISH_SOAP_ACTION);
-        }
-        return publishOperation;
-    }
-
-    /**
-     * Returns the in-flow phase named "Addressing" (case-insensitive), searching the configured
-     * phases on the first call.
-     *
-     * @return the phase, or null when no in-flow phase has that name
-     */
-    private Phase getAddressingPhase(MessageContext context) {
-
-        if (addressingPhase == null) {
-
-            List<Phase> inFlowPhases = context.getConfigurationContext().getAxisConfiguration().getPhasesInfo()
-                    .getINPhases();
-
-            for (Phase p : inFlowPhases) {
-                if (p.getName().equalsIgnoreCase("Addressing")) {
-                    addressingPhase = p;
-                }
-            }
-
-        }
-
-        return addressingPhase;
-
-    }
-
-    /**
-     * Triggers an "action not supported" fault when the message's current phase index plus one
-     * equals the addressing phase's handler count and the service or operation is still unresolved.
-     *
-     * @throws AxisFault the fault raised for an unsupported action
-     */
-    private void checkAction(MessageContext msgContext) throws AxisFault {
-
-        if (msgContext == null) {
-            return;
-        }
-
-        Phase addPhase = getAddressingPhase(msgContext);
-
-        if (addPhase == null) {
-            // Previously execution continued after this log line and dereferenced the null
-            // phase, ending in a NullPointerException. Without the phase the check cannot run.
-            logger.error("unable to locate addressing phase object");
-            return;
-        }
-
-        if (msgContext.getCurrentPhaseIndex() + 1 == addPhase.getHandlerCount()) {
-            if (msgContext.getAxisService() == null || msgContext.getAxisOperation() == null) {
-                AddressingFaultsHelper.triggerActionNotSupportedFault(msgContext, msgContext.getWSAAction());
-            }
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java
deleted file mode 100644
index bcd4ec1..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.subscription;
-
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.airavata.wsmg.commons.CommonRoutines;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.axis2.AxisFault;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Background task that repeatedly sweeps the subscriptions held by a {@link SubscriptionManager}
- * and removes those that have expired or whose consumer was found unavailable too many times in
- * a row. Sweeps are five minutes apart; the task ends when its thread is interrupted.
- */
-class CleanUpThread implements Runnable {
-
-    private static final Logger logger = LoggerFactory.getLogger(CleanUpThread.class);
-
-    /** A consumer is probed again once its last successful check is older than this (10 minutes). */
-    private static final long SKIP_CHECK_INTERVAL = 1000 * 60 * 10;
-
-    /** Pause between two sweeps (5 minutes). */
-    private static final long CHECKUP_INTERVAL = 1000 * 60 * 5;
-
-    /** A subscription is removed once its unavailable counter exceeds this value. */
-    private static final int MAX_TRY = 3;
-
-    private final SubscriptionManager subMan;
-
-    public CleanUpThread(SubscriptionManager manager) {
-        this.subMan = manager;
-    }
-
-    /**
-     * Runs sweeps until the executing thread is interrupted. On interruption the interrupt flag
-     * is restored and the method returns, so that the task can actually be shut down.
-     */
-    public void run() {
-        logger.debug("CleanUpThread started");
-        final long expirationTime = WSMGParameter.expirationTime;
-        logger.info("Starting Subscription Cleaning up Thread.");
-        while (true) {
-            sweep(System.currentTimeMillis(), expirationTime);
-            try {
-                Thread.sleep(CHECKUP_INTERVAL);
-            } catch (InterruptedException e) {
-                logger.error("thread was interrupped", e);
-                // Previously the interruption was swallowed and the loop went on forever.
-                Thread.currentThread().interrupt();
-                return;
-            }
-        }
-    }
-
-    /** Performs one pass over a snapshot of the subscription keys. */
-    private void sweep(long currentTime, long expirationTime) {
-        // Subscriptions created before this instant count as expired (only when renewal is required).
-        long expiredStartTime = 0;
-        if (WSMGParameter.requireSubscriptionRenew) {
-            expiredStartTime = currentTime - expirationTime;
-        }
-        // Consumers last seen available before this instant are due for another probe.
-        long availabilityCheckTime = currentTime - SKIP_CHECK_INTERVAL;
-
-        Set<String> keySet = subMan.getShallowSubscriptionsCopy().keySet();
-        for (Iterator<String> iterator = keySet.iterator(); iterator.hasNext();) {
-            String key = iterator.next();
-            // Looked up in a fresh copy, so the entry may have disappeared since keySet was taken.
-            SubscriptionState subscription = subMan.getShallowSubscriptionsCopy().get(key);
-            if (subscription == null || subscription.isNeverExpire()) {
-                continue;
-            }
-            if (WSMGParameter.requireSubscriptionRenew && subscription.getCreationTime() < expiredStartTime) {
-                removeSubscription(key);
-                logger.info("*****Deleted (expiration)" + key + "-->" + subscription.getConsumerIPAddressStr()
-                        + "##" + subscription.getLocalTopic());
-                continue;
-            }
-            if (subscription.getLastAvailableTime() < availabilityCheckTime) {
-                checkAvailability(iterator, key, subscription, currentTime);
-            }
-        }
-    }
-
-    /**
-     * Probes the consumer of {@code subscription}. A reachable consumer gets its last-available
-     * time refreshed and its failure counter reset; an unreachable one has the counter increased
-     * and the subscription is removed once the counter exceeds {@link #MAX_TRY}.
-     */
-    private void checkAvailability(Iterator<String> iterator, String key, SubscriptionState subscription,
-            long currentTime) {
-        if (CommonRoutines.isAvailable(subscription.getConsumerAddressURI())) {
-            subscription.setLastAvailableTime(currentTime);
-            if (subscription.getUnAvailableCounter() > 0) {
-                // Failed in a previous try; start counting from scratch again.
-                subscription.resetUnAvailableCounter();
-            }
-            return;
-        }
-
-        int counter = subscription.addUnAvailableCounter();
-        if (counter > MAX_TRY) {
-            removeSubscription(key);
-            // Kept from the original: also drops the key from the key set being iterated.
-            // NOTE(review): presumably that set belongs to a throw-away copy - confirm.
-            iterator.remove();
-            logger.info("*****Deleted (unavailable)" + key + "-->"
-                    + subscription.getConsumerIPAddressStr() + "##" + subscription.getLocalTopic());
-        }
-    }
-
-    /** Removes the subscription from the manager; a fault is logged, not propagated. */
-    private void removeSubscription(String key) {
-        try {
-            subMan.removeSubscription(key);
-        } catch (AxisFault e) {
-            logger.error(e.getMessage(), e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionEntry.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionEntry.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionEntry.java
deleted file mode 100644
index a9339fd..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionEntry.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.subscription;
-
-/**
- * Simple bean pairing a subscription identifier with the XML of the subscribe request.
- * Both properties are null until set.
- */
-public class SubscriptionEntry {
-
-    private String subscriptionId;
-    private String subscribeXml;
-
-    /** Creates an entry with both properties unset. */
-    public SubscriptionEntry() {
-    }
-
-    /** @return the subscription identifier, or null if not set */
-    public String getSubscriptionId() {
-        return this.subscriptionId;
-    }
-
-    /** @param subscriptionId the subscription identifier to store */
-    public void setSubscriptionId(String subscriptionId) {
-        this.subscriptionId = subscriptionId;
-    }
-
-    /** @return the subscribe XML, or null if not set */
-    public String getSubscribeXml() {
-        return this.subscribeXml;
-    }
-
-    /** @param subscribeXml the subscribe XML to store */
-    public void setSubscribeXml(String subscribeXml) {
-        this.subscribeXml = subscribeXml;
-    }
-
-}


Mime
View raw message