cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject svn commit: r1222713 - in /cxf/trunk/rt/core/src/main/java/org/apache/cxf: bus/osgi/ManagedWorkQueueList.java bus/osgi/OSGiAutomaticWorkQueue.java bus/osgi/OSGiExtensionLocator.java workqueue/AutomaticWorkQueueImpl.java
Date Fri, 23 Dec 2011 16:07:23 GMT
Author: cschneider
Date: Fri Dec 23 16:07:23 2011
New Revision: 1222713

URL: http://svn.apache.org/viewvc?rev=1222713&view=rev
Log:
CXF-3983 Use ManagedServiceFactory instead of ManagedService

Added:
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/ManagedWorkQueueList.java
Removed:
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/OSGiAutomaticWorkQueue.java
Modified:
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/OSGiExtensionLocator.java
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java

Added: cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/ManagedWorkQueueList.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/ManagedWorkQueueList.java?rev=1222713&view=auto
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/ManagedWorkQueueList.java (added)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/ManagedWorkQueueList.java Fri
Dec 23 16:07:23 2011
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.bus.osgi;
+
+import java.beans.PropertyChangeEvent;
+import java.beans.PropertyChangeListener;
+import java.util.Dictionary;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.service.factory.AbstractServiceFactoryBean;
+import org.apache.cxf.workqueue.AutomaticWorkQueueImpl;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedServiceFactory;
+import org.osgi.util.tracker.ServiceTracker;
+
+/**
+ * List of work queues that can be managed using the OSGi config admin service
+ */
+public class ManagedWorkQueueList implements ManagedServiceFactory, PropertyChangeListener
{
+    public static final String FACTORY_PID = "org.apache.cxf.workqueues";    
+    private static final Logger LOG = LogUtils.getL7dLogger(AbstractServiceFactoryBean.class);
+    
+    Map<String, AutomaticWorkQueueImpl> queues = new ConcurrentHashMap<String, AutomaticWorkQueueImpl>();
+    private ServiceTracker configAdminTracker;
+    
+    public String getName() {
+        return FACTORY_PID;
+    }
+
+    public void updated(String pid, Dictionary properties) throws ConfigurationException
{
+        if (pid == null) {
+            return;
+        }
+        String queueName = (String)properties.get(AutomaticWorkQueueImpl.PROPERTY_NAME);
+        if (queues.containsKey(queueName)) {
+            queues.get(queueName).update(properties);
+        } else {
+            AutomaticWorkQueueImpl wq = new AutomaticWorkQueueImpl(queueName);
+            wq.setShared(true);
+            wq.update(properties);
+            wq.addChangeListener(this);
+            queues.put(pid, wq);
+        }
+    }
+
+    public void deleted(String pid) {
+        queues.remove(pid);
+    }
+
+    public void propertyChange(PropertyChangeEvent evt) {
+        try {
+            AutomaticWorkQueueImpl queue = (AutomaticWorkQueueImpl)evt.getSource();
+            ConfigurationAdmin configurationAdmin = (ConfigurationAdmin)configAdminTracker.getService();
+            if (configurationAdmin != null) {
+                Configuration selectedConfig = findConfigForQueueName(queue, configurationAdmin);
+                if (selectedConfig != null) {
+                    Dictionary properties = queue.getProperties();
+                    selectedConfig.update(properties);
+                }
+            }
+        } catch (Exception e) {
+            LOG.log(Level.WARNING, e.getMessage(), e);
+        }
+    }
+
+    private Configuration findConfigForQueueName(AutomaticWorkQueueImpl queue,
+                                                 ConfigurationAdmin configurationAdmin) throws
Exception {
+        Configuration selectedConfig = null;
+        String filter = "(service.factoryPid=" + ManagedWorkQueueList.FACTORY_PID + ")";
+        Configuration[] configs = configurationAdmin.listConfigurations(filter);
+        for (Configuration configuration : configs) {
+            Dictionary props = configuration.getProperties();
+            String name = (String)props.get(AutomaticWorkQueueImpl.PROPERTY_NAME);
+            if (queue.getName().equals(name)) {
+                selectedConfig = configuration;
+            }
+        }
+        return selectedConfig;
+    }
+
+    public void setConfigAdminTracker(ServiceTracker configAdminTracker) {
+        this.configAdminTracker = configAdminTracker;
+    }
+}
\ No newline at end of file

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/OSGiExtensionLocator.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/OSGiExtensionLocator.java?rev=1222713&r1=1222712&r2=1222713&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/OSGiExtensionLocator.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/bus/osgi/OSGiExtensionLocator.java Fri
Dec 23 16:07:23 2011
@@ -39,13 +39,11 @@ import org.apache.cxf.bus.extension.Exte
 import org.apache.cxf.bus.extension.ExtensionFragmentParser;
 import org.apache.cxf.bus.extension.ExtensionManagerImpl;
 import org.apache.cxf.bus.extension.ExtensionRegistry;
-import org.apache.cxf.bus.osgi.OSGiAutomaticWorkQueue.WorkQueueList;
 import org.apache.cxf.buslifecycle.BusCreationListener;
 import org.apache.cxf.buslifecycle.BusLifeCycleListener;
 import org.apache.cxf.buslifecycle.BusLifeCycleManager;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.configuration.ConfiguredBeanLocator;
-import org.apache.cxf.configuration.ConfiguredBeanLocator.BeanLoaderListener;
 import org.apache.cxf.endpoint.ClientLifeCycleListener;
 import org.apache.cxf.endpoint.ClientLifeCycleManager;
 import org.apache.cxf.endpoint.ServerLifeCycleListener;
@@ -62,8 +60,9 @@ import org.osgi.framework.ServiceReferen
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.framework.SynchronousBundleListener;
 import org.osgi.framework.Version;
-import org.osgi.service.cm.Configuration;
 import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.service.cm.ManagedServiceFactory;
+import org.osgi.util.tracker.ServiceTracker;
 
 /**
  * 
@@ -75,7 +74,8 @@ public class OSGiExtensionLocator implem
     private long id;
     private Extension listener;
 
-    private WorkQueueList workQueues = new WorkQueueList();
+    private ManagedWorkQueueList workQueues = new ManagedWorkQueueList();
+    private ServiceTracker configAdminTracker;
     
     /** {@inheritDoc}*/
     public void bundleChanged(BundleEvent event) {
@@ -104,32 +104,26 @@ public class OSGiExtensionLocator implem
                 register(bundle);
             }
         }
-        ServiceReference configAdminServiceRef =  
-            context.getServiceReference(ConfigurationAdmin.class.getName());
         
-              
-        if (configAdminServiceRef != null) {  
-            ConfigurationAdmin configAdmin = (ConfigurationAdmin)  
-                    context.getService(configAdminServiceRef);  
-            Configuration config = configAdmin.getConfiguration("org.apache.cxf.workqueues");
-            
-            workQueues.register(context, config);
-            
-            Dictionary d = config.getProperties();
-                        
-            if (d != null) {
-                workQueues.updated(d);
-            }
-            Extension ext = new Extension(WorkQueueList.class) {
-                public Object getLoadedObject() {
-                    return workQueues;
-                }
-                public Extension cloneNoObject() {
-                    return this;
-                }
-            };
-            ExtensionRegistry.addExtensions(Collections.singletonList(ext));
-        }
+        configAdminTracker = new ServiceTracker(context, ConfigurationAdmin.class.getName(),
null);
+        configAdminTracker.open();
+        workQueues.setConfigAdminTracker(configAdminTracker);
+
+        Properties props = new Properties();
+        props.put(Constants.SERVICE_PID, workQueues.getName());  
+        context.registerService(ManagedServiceFactory.class.getName(), workQueues, props);
+
+        Extension ext = new Extension(ManagedWorkQueueList.class) {
+            public Object getLoadedObject() {
+                return workQueues;
+            }
+
+            public Extension cloneNoObject() {
+                return this;
+            }
+        };
+        ExtensionRegistry.addExtensions(Collections.singletonList(ext));
+
     }
 
     /** {@inheritDoc}*/
@@ -144,6 +138,7 @@ public class OSGiExtensionLocator implem
             wq.shutdown(true);
         }
         workQueues.queues.clear();
+        configAdminTracker.close();
     }
     private void registerBusListener(final BundleContext context) {
         listener = new Extension(OSGIBusListener.class);
@@ -285,7 +280,7 @@ public class OSGiExtensionLocator implem
  
         public void initComplete() {
             WorkQueueManager m = bus.getExtension(WorkQueueManager.class);
-            WorkQueueList l = bus.getExtension(WorkQueueList.class);
+            ManagedWorkQueueList l = bus.getExtension(ManagedWorkQueueList.class);
             if (l != null && m != null) {
                 for (AutomaticWorkQueueImpl wq : l.queues.values()) {
                     if (m.getNamedWorkQueue(wq.getName()) == null) {

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java?rev=1222713&r1=1222712&r2=1222713&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java Fri
Dec 23 16:07:23 2011
@@ -19,10 +19,17 @@
 
 package org.apache.cxf.workqueue;
 
+import java.beans.PropertyChangeEvent;
+import java.beans.PropertyChangeListener;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -44,7 +51,7 @@ import org.apache.cxf.common.util.Reflec
 
 @NoJSR250Annotations
 public class AutomaticWorkQueueImpl implements AutomaticWorkQueue {
-
+    public static final String PROPERTY_NAME = "name";
     static final int DEFAULT_MAX_QUEUE_SIZE = 256;
     private static final Logger LOG =
         LogUtils.getL7dLogger(AutomaticWorkQueueImpl.class);
@@ -68,6 +75,8 @@ public class AutomaticWorkQueueImpl impl
     boolean shared;
     int sharedCount;
     
+    private List<PropertyChangeListener> changeListenerList;
+    
     public AutomaticWorkQueueImpl() {
         this(DEFAULT_MAX_QUEUE_SIZE);
     }    
@@ -104,10 +113,25 @@ public class AutomaticWorkQueueImpl impl
         this.lowWaterMark = -1 == lowWaterMark ? Integer.MAX_VALUE : lowWaterMark;
         this.dequeueTimeout = dequeueTimeout;
         this.name = name;
+        this.changeListenerList = new ArrayList<PropertyChangeListener>();
+    }
+    
+    /**
+     * Appends a listener to the list used by {@code notifyChangeListeners}.
+     *
+     * @param listener the listener to add
+     */
+    public void addChangeListener(PropertyChangeListener listener) {
+        changeListenerList.add(listener);
+    }
+    
+    /**
+     * Removes a listener from the list used by {@code notifyChangeListeners}.
+     *
+     * @param listener the listener to remove
+     */
+    public void removeChangeListener(PropertyChangeListener listener) {
+        changeListenerList.remove(listener);
     }
     
-    public void setShared(boolean b) {
-        shared = b;
+    /**
+     * Passes the given event to every listener in the change listener list, in list order.
+     *
+     * @param event the change event to deliver
+     */
+    public void notifyChangeListeners(PropertyChangeEvent event) {
+        for (PropertyChangeListener changeListener : changeListenerList) {
+            changeListener.propertyChange(event);
+        }
+    }
+    
+    /**
+     * Sets the shared flag of this queue.
+     *
+     * @param shared the new value of the flag
+     */
+    public void setShared(boolean shared) {
+        this.shared = shared;
     }
     public boolean isShared() {
         return shared;
@@ -478,6 +502,8 @@ public class AutomaticWorkQueueImpl impl
     public void setHighWaterMark(int hwm) {
         highWaterMark = hwm < 0 ? Integer.MAX_VALUE : hwm;
         if (executor != null) {
+            notifyChangeListeners(new PropertyChangeEvent(this, "highWaterMark", 
+                                                          this.executor.getMaximumPoolSize(),
hwm));
             executor.setMaximumPoolSize(highWaterMark);
         }
     }
@@ -485,19 +511,24 @@ public class AutomaticWorkQueueImpl impl
     public void setLowWaterMark(int lwm) {
         lowWaterMark = lwm < 0 ? 0 : lwm;
         if (executor != null) {
+            notifyChangeListeners(new PropertyChangeEvent(this, "lowWaterMark",
+                                                          this.executor.getCorePoolSize(),
lwm)); 
             executor.setCorePoolSize(lowWaterMark);
         }
     }
 
     public void setInitialSize(int initialSize) {
+        notifyChangeListeners(new PropertyChangeEvent(this, "initialSize", this.initialThreads,
initialSize));
         this.initialThreads = initialSize;
     }
     
     public void setQueueSize(int size) {
+        notifyChangeListeners(new PropertyChangeEvent(this, "queueSize", this.maxQueueSize,
size));
         this.maxQueueSize = size;
     }
     
     public void setDequeueTimeout(long l) {
+        notifyChangeListeners(new PropertyChangeEvent(this, "dequeueTimeout", this.dequeueTimeout,
l));
         this.dequeueTimeout = l;
     }
     
@@ -525,4 +556,37 @@ public class AutomaticWorkQueueImpl impl
         }
         return executor.getActiveCount();
     }
+    /**
+     * Copies the settings present in the given configuration into this queue's fields.
+     * Keys that are absent leave the corresponding field untouched. The fields are
+     * assigned directly, without going through the setters.
+     *
+     * @param config dictionary with optional string values for "highWaterMark",
+     *               "lowWaterMark", "initialSize", "dequeueTimeout" and "queueSize"
+     * @throws NumberFormatException if a present value cannot be parsed as a number
+     */
+    public void update(Dictionary config) {
+        String highWaterMarkValue = (String)config.get("highWaterMark");
+        if (highWaterMarkValue != null) {
+            highWaterMark = Integer.parseInt(highWaterMarkValue);
+        }
+
+        String lowWaterMarkValue = (String)config.get("lowWaterMark");
+        if (lowWaterMarkValue != null) {
+            lowWaterMark = Integer.parseInt(lowWaterMarkValue);
+        }
+
+        String initialSizeValue = (String)config.get("initialSize");
+        if (initialSizeValue != null) {
+            initialThreads = Integer.parseInt(initialSizeValue);
+        }
+
+        String dequeueTimeoutValue = (String)config.get("dequeueTimeout");
+        if (dequeueTimeoutValue != null) {
+            dequeueTimeout = Long.parseLong(dequeueTimeoutValue);
+        }
+
+        String queueSizeValue = (String)config.get("queueSize");
+        if (queueSizeValue != null) {
+            maxQueueSize = Integer.parseInt(queueSizeValue);
+        }
+    }
+    /**
+     * Returns the current settings of this queue, using the same keys that
+     * {@code update(Dictionary)} reads.
+     *
+     * @return a new dictionary with the queue name (omitted when null) and the numeric
+     *         settings as plain decimal strings
+     */
+    public Dictionary getProperties() {
+        Dictionary<String, String> properties = new Hashtable<String, String>();
+        // The name is text and must not go through a number formatter. Hashtable rejects
+        // null values, so a queue without a name simply omits the entry.
+        String queueName = getName();
+        if (queueName != null) {
+            properties.put(PROPERTY_NAME, queueName);
+        }
+        // String.valueOf avoids the locale-specific grouping separators of NumberFormat,
+        // which Integer.parseInt and Long.parseLong in update() cannot read back.
+        properties.put("highWaterMark", String.valueOf(getHighWaterMark()));
+        properties.put("lowWaterMark", String.valueOf(getLowWaterMark()));
+        properties.put("initialSize", String.valueOf(initialThreads));
+        properties.put("dequeueTimeout", String.valueOf(dequeueTimeout));
+        properties.put("queueSize", String.valueOf(maxQueueSize));
+        return properties;
+    }
 }



Mime
View raw message