cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject svn commit: r1183043 - in /cxf/trunk/rt/core/src/main/java/org/apache/cxf: bus/osgi/ workqueue/
Date Thu, 13 Oct 2011 19:20:36 GMT
Author: dkulp
Date: Thu Oct 13 19:20:36 2011
New Revision: 1183043

URL: http://svn.apache.org/viewvc?rev=1183043&view=rev
Log:
More updates to the OSGi shared workqueues
Don't register the MBean on every bus, use a "Shared" key for those
Wire the JMX property changes into the Config so they are persisted

Modified:
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/OSGiAutomaticWorkQueue.java
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/OSGiExtensionLocator.java
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueImplMBeanWrapper.java
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/OSGiAutomaticWorkQueue.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/OSGiAutomaticWorkQueue.java?rev=1183043&r1=1183042&r2=1183043&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/OSGiAutomaticWorkQueue.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/OSGiAutomaticWorkQueue.java Thu Oct 13 19:20:36 2011
@@ -19,18 +19,120 @@
 
 package org.apache.cxf.bus.osgi;
 
+import java.io.IOException;
 import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
 
 import org.apache.cxf.workqueue.AutomaticWorkQueueImpl;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.cm.Configuration;
 import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedService;
 
 /**
  * 
  */
 public class OSGiAutomaticWorkQueue extends AutomaticWorkQueueImpl {
+    static class WorkQueueList implements ManagedService {
+        Map<String, OSGiAutomaticWorkQueue> queues 
+            = new ConcurrentHashMap<String, OSGiAutomaticWorkQueue>();
+        ServiceRegistration registration;
+        Configuration config;
+        Properties current = new Properties();
+
+        
+        public void register(BundleContext ctx, Configuration c) {
+            Properties props = new Properties();
+            props.put(Constants.SERVICE_PID, "org.apache.cxf.workqueues");  
+
+            registration = ctx.registerService(ManagedService.class.getName(),  
+                                               this, props);
+            
+            this.config = c;
+        }
+        
+        public void updateProperty(String key, String val) {
+            if (val != null) {
+                current.put(key, val);
+            } else {
+                current.remove(key);
+            }
+            try {
+                config.update(current);
+            } catch (IOException e) {
+                //ignore
+            }
+        }
+        public void updated(Dictionary d) throws ConfigurationException {
+            current.clear();
+            if (d == null) {
+                return;
+            }
+            Enumeration e = d.keys();
+            while (e.hasMoreElements()) {
+                String k = (String)e.nextElement();
+                current.put(k, d.get(k));
+            }
+            String s = (String)d.get("org.apache.cxf.workqueue.names");
+            if (s != null) {
+                String s2[] = s.split(",");
+                for (String name : s2) {
+                    name = name.trim();
+                    if (queues.containsKey(name)) {
+                        queues.get(name).updated(d);
+                    } else {
+                        OSGiAutomaticWorkQueue wq = new OSGiAutomaticWorkQueue(name, this);
+                        wq.updated(d);
+                        wq.setShared(true);
+                        queues.put(name, wq);
+                    }
+                }
+            }
+            registration.setProperties(d);
+        }
+    };
+    final WorkQueueList qlist;
     
-    public OSGiAutomaticWorkQueue(String name) {
+    public OSGiAutomaticWorkQueue(String name, WorkQueueList ql) {
         super(name);
+        qlist = ql;
+    }
+
+    
+    public void setHighWaterMark(int hwm) {
+        super.setHighWaterMark(hwm);
+        qlist.updateProperty("org.apache.cxf.workqueue." + getName() + ".highWaterMark",
+                             Integer.toString(getHighWaterMark()));
+    }
+
+    public void setLowWaterMark(int lwm) {
+        super.setLowWaterMark(lwm);
+        qlist.updateProperty("org.apache.cxf.workqueue." + getName() + ".lowWaterMark",
+                             Integer.toString(getLowWaterMark()));
+    }
+
+    public void setInitialSize(int initialSize) {
+        super.setInitialSize(initialSize);
+        qlist.updateProperty("org.apache.cxf.workqueue." + getName() + ".initialSize",
+                             Integer.toString(initialSize));
+    }
+
+    public void setQueueSize(int size) {
+        super.setQueueSize(size);
+        qlist.updateProperty("org.apache.cxf.workqueue." + getName() + ".queueSize",
+                             Integer.toString(size));
+    }
+
+    public void setDequeueTimeout(long l) {
+        super.setDequeueTimeout(l);
+        qlist.updateProperty("org.apache.cxf.workqueue." + getName() + ".dequeueTimeout",
+                             Long.toString(l));
     }
 
     /** {@inheritDoc}*/
@@ -38,23 +140,23 @@ public class OSGiAutomaticWorkQueue exte
         String name = getName();
         String s = (String)d.get("org.apache.cxf.workqueue." + name + ".highWaterMark");
         if (s != null) {
-            setHighWaterMark(Integer.parseInt(s));
+            super.setHighWaterMark(Integer.parseInt(s));
         }
         s = (String)d.get("org.apache.cxf.workqueue." + name + ".lowWaterMark");
         if (s != null) {
-            setLowWaterMark(Integer.parseInt(s));
+            super.setLowWaterMark(Integer.parseInt(s));
         }
         s = (String)d.get("org.apache.cxf.workqueue." + name + ".initialSize");
         if (s != null) {
-            setInitialSize(Integer.parseInt(s));
+            super.setInitialSize(Integer.parseInt(s));
         }
         s = (String)d.get("org.apache.cxf.workqueue." + name + ".dequeueTimeout");
         if (s != null) {
-            setDequeueTimeout(Long.parseLong(s));
+            super.setDequeueTimeout(Long.parseLong(s));
         }
         s = (String)d.get("org.apache.cxf.workqueue." + name + ".queueSize");
         if (s != null) {
-            setQueueSize(Integer.parseInt(s));
+            super.setQueueSize(Integer.parseInt(s));
         } 
 
     }

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/OSGiExtensionLocator.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/OSGiExtensionLocator.java?rev=1183043&r1=1183042&r2=1183043&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/OSGiExtensionLocator.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/OSGiExtensionLocator.java Thu Oct 13 19:20:36 2011
@@ -26,7 +26,6 @@ import java.util.Collections;
 import java.util.Dictionary;
 import java.util.Enumeration;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -37,6 +36,7 @@ import org.apache.cxf.Bus;
 import org.apache.cxf.bus.extension.Extension;
 import org.apache.cxf.bus.extension.ExtensionFragmentParser;
 import org.apache.cxf.bus.extension.ExtensionRegistry;
+import org.apache.cxf.bus.osgi.OSGiAutomaticWorkQueue.WorkQueueList;
 import org.apache.cxf.buslifecycle.BusLifeCycleListener;
 import org.apache.cxf.buslifecycle.BusLifeCycleManager;
 import org.apache.cxf.common.logging.LogUtils;
@@ -53,8 +53,6 @@ import org.osgi.framework.SynchronousBun
 import org.osgi.framework.Version;
 import org.osgi.service.cm.Configuration;
 import org.osgi.service.cm.ConfigurationAdmin;
-import org.osgi.service.cm.ConfigurationException;
-import org.osgi.service.cm.ManagedService;
 
 /**
  * 
@@ -66,31 +64,6 @@ public class OSGiExtensionLocator implem
     private long id;
     private Extension listener;
 
-    static class WorkQueueList implements ManagedService {
-        Map<String, OSGiAutomaticWorkQueue> queues 
-            = new ConcurrentHashMap<String, OSGiAutomaticWorkQueue>();
-
-        public void updated(Dictionary d) throws ConfigurationException {
-            if (d == null) {
-                return;
-            }
-            String s = (String)d.get("org.apache.cxf.workqueue.names");
-            if (s != null) {
-                String s2[] = s.split(",");
-                for (String name : s2) {
-                    name = name.trim();
-                    if (queues.containsKey(name)) {
-                        queues.get(name).updated(d);
-                    } else {
-                        OSGiAutomaticWorkQueue wq = new OSGiAutomaticWorkQueue(name);
-                        wq.updated(d);
-                        wq.setShared(true);
-                        queues.put(name, wq);
-                    }
-                }
-            }
-        }
-    };
     private WorkQueueList workQueues = new WorkQueueList();
     
     /** {@inheritDoc}*/
@@ -127,16 +100,12 @@ public class OSGiExtensionLocator implem
         if (configAdminServiceRef != null) {  
             ConfigurationAdmin configAdmin = (ConfigurationAdmin)  
                     context.getService(configAdminServiceRef);  
-              
             Configuration config = configAdmin.getConfiguration("org.apache.cxf.workqueues");
-            Dictionary d = config.getProperties();
             
-            Properties props = new Properties();
-            props.put(Constants.SERVICE_PID, "org.apache.cxf.workqueues");  
-            context.registerService(ManagedService.class.getName(),  
-                                    workQueues, props); 
-
+            workQueues.register(context, config);
             
+            Dictionary d = config.getProperties();
+                        
             if (d != null) {
                 workQueues.updated(d);
             }

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java?rev=1183043&r1=1183042&r2=1183043&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java Thu Oct 13 19:20:36 2011
@@ -66,6 +66,7 @@ public class AutomaticWorkQueueImpl impl
     WatchDog watchDog;
     
     boolean shared;
+    int sharedCount;
     
     public AutomaticWorkQueueImpl() {
         this(DEFAULT_MAX_QUEUE_SIZE);
@@ -108,6 +109,18 @@ public class AutomaticWorkQueueImpl impl
     public void setShared(boolean b) {
         shared = b;
     }
+    public boolean isShared() {
+        return shared;
+    }
+    public void addSharedUser() {
+        sharedCount++;
+    }
+    public void removeSharedUser() {
+        sharedCount--;
+    }
+    public int getShareCount() {
+        return sharedCount;
+    }
     
     protected synchronized ThreadPoolExecutor getExecutor() {
         if (executor == null) {
@@ -418,7 +431,7 @@ public class AutomaticWorkQueueImpl impl
     // AutomaticWorkQueue interface
     
     public void shutdown(boolean processRemainingWorkItems) {
-        if (executor != null && !shared) {
+        if (executor != null) {
             if (!processRemainingWorkItems) {
                 executor.getQueue().clear();
             }

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueImplMBeanWrapper.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueImplMBeanWrapper.java?rev=1183043&r1=1183042&r2=1183043&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueImplMBeanWrapper.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueImplMBeanWrapper.java Thu Oct 13 19:20:36 2011
@@ -101,15 +101,20 @@ public class WorkQueueImplMBeanWrapper i
     }
 
     public ObjectName getObjectName() throws JMException {
-        String busId = "cxf";
-        if (manager instanceof WorkQueueManagerImpl) {
-            busId = ((WorkQueueManagerImpl)manager).getBus().getId();
-        }
         StringBuilder buffer = new StringBuilder();
         buffer.append(ManagementConstants.DEFAULT_DOMAIN_NAME + ":");
-        buffer.append(ManagementConstants.BUS_ID_PROP + "=" + busId + ",");
-        buffer.append(WorkQueueManagerImplMBeanWrapper.TYPE_VALUE + "=");
-        buffer.append(WorkQueueManagerImplMBeanWrapper.NAME_VALUE + ",");
+        if (!aWorkQueue.isShared()) {
+            String busId = "cxf";
+            if (manager instanceof WorkQueueManagerImpl) {
+                busId = ((WorkQueueManagerImpl)manager).getBus().getId();
+            }
+            buffer.append(ManagementConstants.BUS_ID_PROP + "=" + busId + ",");
+            buffer.append(WorkQueueManagerImplMBeanWrapper.TYPE_VALUE + "=");
+            buffer.append(WorkQueueManagerImplMBeanWrapper.NAME_VALUE + ",");
+        } else {
+            buffer.append(ManagementConstants.BUS_ID_PROP + "=Shared,");
+            //buffer.append(WorkQueueManagerImplMBeanWrapper.TYPE_VALUE + "=Shared,");
+        }
         buffer.append(ManagementConstants.TYPE_PROP + "=" + TYPE_VALUE + ",");
         buffer.append(ManagementConstants.NAME_PROP + "=" + aWorkQueue.getName());
        

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java?rev=1183043&r1=1183042&r2=1183043&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java Thu Oct 13 19:20:36 2011
@@ -29,6 +29,8 @@ import javax.annotation.Resource;
 import javax.management.JMException;
 
 import org.apache.cxf.Bus;
+import org.apache.cxf.buslifecycle.BusLifeCycleListener;
+import org.apache.cxf.buslifecycle.BusLifeCycleManager;
 import org.apache.cxf.common.injection.NoJSR250Annotations;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.configuration.ConfiguredBeanLocator;
@@ -87,6 +89,9 @@ public class WorkQueueManagerImpl implem
                     addNamedWorkQueue("default", defaultQueue);
                 }
             }
+            
+            bus.getExtension(BusLifeCycleManager.class)
+                .registerLifeCycleListener(new WQLifecycleListener());
         }
     }
 
@@ -101,7 +106,20 @@ public class WorkQueueManagerImpl implem
     public synchronized void shutdown(boolean processRemainingTasks) {
         inShutdown = true;
         for (AutomaticWorkQueue q : namedQueues.values()) {
-            q.shutdown(processRemainingTasks);
+            if (q instanceof AutomaticWorkQueueImpl) {
+                AutomaticWorkQueueImpl impl = (AutomaticWorkQueueImpl)q;
+                if (impl.isShared() && imanager != null 
+                    && imanager.getMBeanServer() != null) {
+                    synchronized (impl) {
+                        impl.removeSharedUser();
+                    }
+                }
+                if (!impl.isShared()) {
+                    q.shutdown(processRemainingTasks);
+                }
+            } else {
+                q.shutdown(processRemainingTasks);
+            }
         }
 
         synchronized (this) {
@@ -139,11 +157,26 @@ public class WorkQueueManagerImpl implem
     }
     public final void addNamedWorkQueue(String name, AutomaticWorkQueue q) {
         namedQueues.put(name, q);
-        if (imanager != null && q instanceof AutomaticWorkQueueImpl) {
-            try {
-                imanager.register(new WorkQueueImplMBeanWrapper((AutomaticWorkQueueImpl)q, this));
-            } catch (JMException jmex) {
-                LOG.log(Level.WARNING , jmex.getMessage(), jmex);
+        if (q instanceof AutomaticWorkQueueImpl) {
+            AutomaticWorkQueueImpl impl = (AutomaticWorkQueueImpl)q;
+            if (impl.isShared()) {
+                synchronized (impl) {
+                    if (impl.getShareCount() == 0 && imanager != null 
+                        && imanager.getMBeanServer() != null) {
+                        try {
+                            imanager.register(new WorkQueueImplMBeanWrapper((AutomaticWorkQueueImpl)q, this));
+                        } catch (JMException jmex) {
+                            LOG.log(Level.WARNING , jmex.getMessage(), jmex);
+                        }
+                        impl.addSharedUser();
+                    }
+                }
+            } else if (imanager != null) {
+                try {
+                    imanager.register(new WorkQueueImplMBeanWrapper((AutomaticWorkQueueImpl)q, this));
+                } catch (JMException jmex) {
+                    LOG.log(Level.WARNING , jmex.getMessage(), jmex);
+                }
             }
         }
     }
@@ -153,5 +186,16 @@ public class WorkQueueManagerImpl implem
         addNamedWorkQueue("default", q);
         return q;
     }
-
+    
+    
+    class WQLifecycleListener implements BusLifeCycleListener {
+        public void initComplete() {
+            
+        }
+        public void preShutdown() {
+            shutdown(true);
+        }
+        public void postShutdown() {
+        }
+    }
 }



Mime
View raw message