karaf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ioca...@apache.org
Subject svn commit: r1164077 - in /karaf/cellar/trunk: core/src/main/java/org/apache/karaf/cellar/core/event/ dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/ hazelcast/src/main/resources/META-INF/sp...
Date Thu, 01 Sep 2011 13:37:56 GMT
Author: iocanel
Date: Thu Sep  1 13:37:55 2011
New Revision: 1164077

URL: http://svn.apache.org/viewvc?rev=1164077&view=rev
Log:
[KARAF-843] ImportServiceListener, ExportServiceListener and GroupManager now properly cache
consumers & producers and properly remove them when no longer needed"

Modified:
    karaf/cellar/trunk/core/src/main/java/org/apache/karaf/cellar/core/event/EventConsumer.java
    karaf/cellar/trunk/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ExportServiceListener.java
    karaf/cellar/trunk/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java
    karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastEventTransportFactory.java
    karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java
    karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumeTask.java
    karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java
    karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/TopicConsumer.java
    karaf/cellar/trunk/hazelcast/src/main/resources/META-INF/spring/beans.xml

Modified: karaf/cellar/trunk/core/src/main/java/org/apache/karaf/cellar/core/event/EventConsumer.java
URL: http://svn.apache.org/viewvc/karaf/cellar/trunk/core/src/main/java/org/apache/karaf/cellar/core/event/EventConsumer.java?rev=1164077&r1=1164076&r2=1164077&view=diff
==============================================================================
--- karaf/cellar/trunk/core/src/main/java/org/apache/karaf/cellar/core/event/EventConsumer.java
(original)
+++ karaf/cellar/trunk/core/src/main/java/org/apache/karaf/cellar/core/event/EventConsumer.java
Thu Sep  1 13:37:55 2011
@@ -37,4 +37,9 @@ public interface EventConsumer<E extends
      */
     public void stop();
 
+    /**
+     * Returns true if Consumer is consuming
+     */
+    public Boolean isConsuming();
+
 }

Modified: karaf/cellar/trunk/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ExportServiceListener.java
URL: http://svn.apache.org/viewvc/karaf/cellar/trunk/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ExportServiceListener.java?rev=1164077&r1=1164076&r2=1164077&view=diff
==============================================================================
--- karaf/cellar/trunk/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ExportServiceListener.java
(original)
+++ karaf/cellar/trunk/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ExportServiceListener.java
Thu Sep  1 13:37:55 2011
@@ -71,6 +71,11 @@ public class ExportServiceListener imple
 
     public void destroy() {
         bundleContext.removeServiceListener(this);
+            for(Map.Entry<String,EventConsumer> consumerEntry:consumers.entrySet())
{
+                EventConsumer consumer = consumerEntry.getValue();
+                consumer.stop();
+            }
+            consumers.clear();
     }
 
 
@@ -127,9 +132,13 @@ public class ExportServiceListener imple
                     remoteEndpoints.put(endpointId, endpoint);
 
                     //Register the endpoint consumer
-                    EventConsumer consumer = eventTransportFactory.getEventConsumer(Constants.INTERFACE_PREFIX
+ Constants.SEPARATOR + endpointId, false);
-                    consumers.put(endpointId, consumer);
-
+                    EventConsumer consumer = consumers.get(endpointId);
+                    if(consumer == null) {
+                        consumer = eventTransportFactory.getEventConsumer(Constants.INTERFACE_PREFIX
+ Constants.SEPARATOR + endpointId, false);
+                        consumers.put(endpointId, consumer);
+                    } else if(!consumer.isConsuming()) {
+                        consumer.start();
+                    }
                 }
             }
         } finally {

Modified: karaf/cellar/trunk/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java
URL: http://svn.apache.org/viewvc/karaf/cellar/trunk/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java?rev=1164077&r1=1164076&r2=1164077&view=diff
==============================================================================
--- karaf/cellar/trunk/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java
(original)
+++ karaf/cellar/trunk/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java
Thu Sep  1 13:37:55 2011
@@ -62,6 +62,13 @@ public class ImportServiceListener imple
             ServiceRegistration registration = entry.getValue();
             registration.unregister();
         }
+        for (Map.Entry<String, EventConsumer> consumerEntry : consumers.entrySet())
{
+            EventConsumer consumer = consumerEntry.getValue();
+            consumer.stop();
+        }
+        consumers.clear();
+        producers.clear();
+
     }
 
     @Override
@@ -134,8 +141,19 @@ public class ImportServiceListener imple
     private void importService(EndpointDescription endpoint, ListenerInfo listenerInfo) {
         LOGGER.info("CELLAR DOSGI EVENT: Importing remote service.");
 
-        EventProducer requestProducer = eventTransportFactory.getEventProducer(Constants.INTERFACE_PREFIX
+ Constants.SEPARATOR + endpoint.getId(),Boolean.FALSE);
-        EventConsumer resultConsumer = eventTransportFactory.getEventConsumer(Constants.RESULT_PREFIX
+ Constants.SEPARATOR + clusterManager.getNode().getId() + endpoint.getId(), Boolean.FALSE);
+        EventProducer requestProducer = producers.get(endpoint.getId());
+        if(requestProducer == null) {
+            requestProducer = eventTransportFactory.getEventProducer(Constants.INTERFACE_PREFIX
+ Constants.SEPARATOR + endpoint.getId(),Boolean.FALSE);
+            producers.put(endpoint.getId(),requestProducer);
+        }
+
+        EventConsumer resultConsumer = consumers.get(endpoint.getId());
+        if(resultConsumer == null) {
+            resultConsumer = eventTransportFactory.getEventConsumer(Constants.RESULT_PREFIX
+ Constants.SEPARATOR + clusterManager.getNode().getId() + endpoint.getId(), Boolean.FALSE);
+            consumers.put(endpoint.getId(),resultConsumer);
+        } else if(!resultConsumer.isConsuming()) {
+            resultConsumer.start();
+        }
 
         producers.put(endpoint.getId(),requestProducer);
         consumers.put(endpoint.getId(),resultConsumer);

Modified: karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastEventTransportFactory.java
URL: http://svn.apache.org/viewvc/karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastEventTransportFactory.java?rev=1164077&r1=1164076&r2=1164077&view=diff
==============================================================================
--- karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastEventTransportFactory.java
(original)
+++ karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastEventTransportFactory.java
Thu Sep  1 13:37:55 2011
@@ -23,8 +23,6 @@ import org.apache.karaf.cellar.core.even
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 /**
  * @author: iocanel
@@ -33,76 +31,45 @@ public class HazelcastEventTransportFact
 
     private Dispatcher dispatcher;
 
-    private Map<String, QueueConsumer> queueConsumerMap = new HashMap<String, QueueConsumer>();
-    private Map<String, QueueProducer> queueProducerMap = new HashMap<String, QueueProducer>();
-
-    private Map<String, TopicConsumer> topicConsumerMap = new HashMap<String, TopicConsumer>();
-    private Map<String, TopicProducer> topicProducerMap = new HashMap<String, TopicProducer>();
-
 
     @Override
     public EventProducer getEventProducer(String name, Boolean pubsub) {
         if (pubsub) {
-            TopicProducer producer = topicProducerMap.get(name);
-            if (producer != null) {
-                return producer;
-            } else {
                 ITopic topic = instance.getTopic(Constants.TOPIC + Constants.SEPARATOR +
name);
-                producer = new TopicProducer();
+                TopicProducer producer = new TopicProducer();
                 producer.setTopic(topic);
                 producer.setNode(getNode());
                 producer.init();
-                topicProducerMap.put(name, producer);
                 return producer;
-            }
         } else {
-            QueueProducer producer = queueProducerMap.get(name);
-            if (producer != null) {
-                return producer;
-            } else {
                 IQueue queue = instance.getQueue(Constants.QUEUE + Constants.SEPARATOR +
name);
-                producer = new QueueProducer();
+                QueueProducer producer = new QueueProducer();
                 producer.setQueue(queue);
                 producer.setNode(getNode());
                 producer.init();
-                queueProducerMap.put(name, producer);
                 return producer;
-            }
         }
     }
 
     @Override
     public EventConsumer getEventConsumer(String name, Boolean pubsub) {
         if (pubsub) {
-            TopicConsumer consumer = topicConsumerMap.get(name);
-            if (consumer != null) {
-                return consumer;
-            } else {
-
                 ITopic topic = instance.getTopic(Constants.TOPIC + Constants.SEPARATOR +
name);
-                consumer = new TopicConsumer();
+                TopicConsumer consumer = new TopicConsumer();
                 consumer.setTopic(topic);
                 consumer.setNode(getNode());
                 consumer.setDispatcher(dispatcher);
                 consumer.init();
-                topicConsumerMap.put(name, consumer);
                 return consumer;
-            }
         } else {
-            QueueConsumer consumer = queueConsumerMap.get(name);
-            if (consumer != null) {
-                return consumer;
-            } else {
+
                 IQueue queue = instance.getQueue(Constants.QUEUE + Constants.SEPARATOR +
name);
-                consumer = new QueueConsumer();
+                QueueConsumer consumer = new QueueConsumer();
                 consumer.setQueue(queue);
                 consumer.setNode(getNode());
                 consumer.setDispatcher(dispatcher);
                 consumer.init();
-                consumer.start();
-                queueConsumerMap.put(name, consumer);
                 return consumer;
-            }
         }
     }
 

Modified: karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java
URL: http://svn.apache.org/viewvc/karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java?rev=1164077&r1=1164076&r2=1164077&view=diff
==============================================================================
--- karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java
(original)
+++ karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java
Thu Sep  1 13:37:55 2011
@@ -59,6 +59,9 @@ public class HazelcastGroupManager imple
     private Map<String, ServiceRegistration> producerRegistrations = new HashMap<String,
ServiceRegistration>();
     private Map<String, ServiceRegistration> consumerRegistrations = new HashMap<String,
ServiceRegistration>();
 
+    private Map<String, EventProducer> groupProducers = new HashMap<String, EventProducer>();
+    private Map<String, EventConsumer> groupConsumer = new HashMap<String, EventConsumer>();
+
     private BundleContext bundleContext;
 
     private HazelcastInstance instance;
@@ -89,6 +92,15 @@ public class HazelcastGroupManager imple
         registerGroup(DEFAULT_GROUP);
     }
 
+    public void destroy() {
+        for(Map.Entry<String,EventConsumer> consumerEntry:groupConsumer.entrySet())
{
+            EventConsumer consumer = consumerEntry.getValue();
+            consumer.stop();
+        }
+        groupConsumer.clear();
+        groupProducers.clear();
+    }
+
     @Override
     public Node getNode() {
         Node node = null;
@@ -228,23 +240,30 @@ public class HazelcastGroupManager imple
     public void registerGroup(Group group) {
         String groupName = group.getName();
         createGroup(groupName);
-        ITopic topic = instance.getTopic(Constants.TOPIC + "." + groupName);
 
         Properties serviceProperties = new Properties();
         serviceProperties.put("type", "group");
         serviceProperties.put("name", groupName);
 
         if (!producerRegistrations.containsKey(groupName)) {
-            TopicProducer producer = new TopicProducer();
-            producer.setTopic(topic);
-            producer.setNode(getNode());
+            EventProducer producer = groupProducers.get(groupName);
+            if(producer == null) {
+                producer = eventTransportFactory.getEventProducer(groupName,Boolean.TRUE);
+                groupProducers.put(groupName,producer);
+            }
 
             ServiceRegistration producerRegistration = bundleContext.registerService(EventProducer.class.getCanonicalName(),
producer, serviceProperties);
             producerRegistrations.put(groupName, producerRegistration);
         }
 
         if (!consumerRegistrations.containsKey(groupName)) {
-            EventConsumer consumer = eventTransportFactory.getEventConsumer(groupName,true);
+            EventConsumer consumer = groupConsumer.get(groupName);
+            if(consumer == null) {
+                consumer = eventTransportFactory.getEventConsumer(groupName,true);
+                groupConsumer.put(groupName,consumer);
+            } else if(!consumer.isConsuming()) {
+                consumer.start();
+            }
             ServiceRegistration consumerRegistration = bundleContext.registerService(EventConsumer.class.getCanonicalName(),
consumer, serviceProperties);
             consumerRegistrations.put(groupName, consumerRegistration);
         }
@@ -336,6 +355,15 @@ public class HazelcastGroupManager imple
             }
         }
 
+        //4. Remove Consumers & Producers
+        groupProducers.remove(groupName);
+        EventConsumer consumer = groupConsumer.remove(groupName);
+        if(consumer != null) {
+            consumer.stop();
+        }
+
+
+
         //Remove group from configuration
         try {
             Configuration configuration = configurationAdmin.getConfiguration(Configurations.NODE);

Modified: karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java
URL: http://svn.apache.org/viewvc/karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java?rev=1164077&r1=1164076&r2=1164077&view=diff
==============================================================================
--- karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java
(original)
+++ karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java
Thu Sep  1 13:37:55 2011
@@ -15,9 +15,7 @@ package org.apache.karaf.cellar.hazelcas
 
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.core.IQueue;
-import com.hazelcast.core.ITopic;
 import com.hazelcast.core.ItemListener;
-import com.hazelcast.core.MessageListener;
 import org.apache.karaf.cellar.core.Dispatcher;
 import org.apache.karaf.cellar.core.Node;
 import org.apache.karaf.cellar.core.control.BasicSwitch;
@@ -25,27 +23,35 @@ import org.apache.karaf.cellar.core.cont
 import org.apache.karaf.cellar.core.control.SwitchStatus;
 import org.apache.karaf.cellar.core.event.Event;
 import org.apache.karaf.cellar.core.event.EventConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Consumes messages from the distributed {@code ITopic} and calls the {@code EventDispatcher}.
  */
-public class QueueConsumer<E extends Event> implements EventConsumer<E>, ItemListener<E>
{
+public class QueueConsumer<E extends Event> implements EventConsumer<E>, ItemListener<E>,
Runnable {
+
+    private static final transient Logger LOGGER = LoggerFactory.getLogger(QueueConsumer.class);
 
     public static final String SWITCH_ID = "org.apache.karaf.cellar.queue.consumer";
 
     private final Switch eventSwitch = new BasicSwitch(SWITCH_ID);
+    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+    private Boolean isConsuming = Boolean.TRUE;
 
     private HazelcastInstance instance;
     private IQueue queue;
     private Dispatcher dispatcher;
     private Node node;
 
-    private QueueConsumeTask queueConsumeTask = new QueueConsumeTask(this);
-    private ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+    public QueueConsumer() {
+    }
 
 
     /**
@@ -58,7 +64,7 @@ public class QueueConsumer<E extends Eve
             queue = instance.getQueue(Constants.QUEUE);
             queue.addItemListener(this,true);
         }
-
+        executorService.execute(this);
     }
 
     /**
@@ -68,6 +74,31 @@ public class QueueConsumer<E extends Eve
         if (queue != null) {
             queue.removeItemListener(this);
         }
+        executorService.shutdown();
+    }
+
+    @Override
+    public void run() {
+        ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            while (isConsuming) {
+                Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+                E e = null;
+                try {
+                        e = getQueue().poll(10, TimeUnit.SECONDS);
+                } catch (InterruptedException e1) {
+                    LOGGER.warn("Consume task interrupted");
+                }
+                if (e != null) {
+                    consume(e);
+                }
+            }
+        } catch (Exception ex) {
+            LOGGER.error("Error while consuming from queue",ex);
+        }
+        finally {
+            Thread.currentThread().setContextClassLoader(originalClassLoader);
+        }
     }
 
     /**
@@ -84,14 +115,18 @@ public class QueueConsumer<E extends Eve
     @Override
     public void start()
     {
-        queueConsumeTask.activate();
-        executorService.execute(queueConsumeTask);
+        isConsuming = true;
+        executorService.execute(this);
     }
 
 
     @Override
     public void stop() {
-      queueConsumeTask.deactivate();
+      isConsuming = false;
+    }
+
+    public Boolean isConsuming() {
+        return isConsuming;
     }
 
     @Override
@@ -104,14 +139,6 @@ public class QueueConsumer<E extends Eve
 
     }
 
-    public QueueConsumeTask getQueueConsumeTask() {
-        return queueConsumeTask;
-    }
-
-    public void setQueueConsumeTask(QueueConsumeTask queueConsumeTask) {
-        this.queueConsumeTask = queueConsumeTask;
-    }
-
     public Dispatcher getDispatcher() {
         return dispatcher;
     }

Modified: karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/TopicConsumer.java
URL: http://svn.apache.org/viewvc/karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/TopicConsumer.java?rev=1164077&r1=1164076&r2=1164077&view=diff
==============================================================================
--- karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/TopicConsumer.java
(original)
+++ karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/TopicConsumer.java
Thu Sep  1 13:37:55 2011
@@ -38,6 +38,8 @@ public class TopicConsumer<E extends Eve
     private Dispatcher dispatcher;
     private Node node;
 
+    private boolean isConsuming;
+
     /**
      * Initialization method.
      */
@@ -66,6 +68,7 @@ public class TopicConsumer<E extends Eve
 
     @Override
     public void start() {
+        isConsuming = true;
         if (topic != null) {
             topic.addMessageListener(this);
         } else {
@@ -77,11 +80,17 @@ public class TopicConsumer<E extends Eve
 
     @Override
     public void stop() {
+        isConsuming = false;
         if (topic != null) {
             topic.removeMessageListener(this);
         }
     }
 
+    @Override
+    public Boolean isConsuming() {
+        return isConsuming;
+    }
+
     public void onMessage(E message) {
         consume(message);
     }

Modified: karaf/cellar/trunk/hazelcast/src/main/resources/META-INF/spring/beans.xml
URL: http://svn.apache.org/viewvc/karaf/cellar/trunk/hazelcast/src/main/resources/META-INF/spring/beans.xml?rev=1164077&r1=1164076&r2=1164077&view=diff
==============================================================================
--- karaf/cellar/trunk/hazelcast/src/main/resources/META-INF/spring/beans.xml (original)
+++ karaf/cellar/trunk/hazelcast/src/main/resources/META-INF/spring/beans.xml Thu Sep  1 13:37:55
2011
@@ -54,7 +54,7 @@
     </bean>
 
     <!-- Group Manager -->
-    <bean id="groupManager" class="org.apache.karaf.cellar.hazelcast.HazelcastGroupManager"
init-method="init">
+    <bean id="groupManager" class="org.apache.karaf.cellar.hazelcast.HazelcastGroupManager"
init-method="init" destroy-method="destroy">
         <property name="instance" ref="hazelcast"/>
         <property name="configurationAdmin" ref="configurationAdmin"/>
         <property name="eventTransportFactory" ref="eventTransportFactory"/>



Mime
View raw message