cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject svn commit: r442516 - in /incubator/cxf/trunk: api/src/main/java/org/apache/cxf/workqueue/ common/src/main/java/org/apache/cxf/event/ rt/core/src/main/java/org/apache/cxf/workqueue/ rt/core/src/main/resources/META-INF/ rt/core/src/test/java/org/apache/...
Date Tue, 12 Sep 2006 08:17:12 GMT
Author: ningjiang
Date: Tue Sep 12 01:17:11 2006
New Revision: 442516

URL: http://svn.apache.org/viewvc?view=rev&rev=442516
Log:
CXF-70 Made the management can work with the common event framework \n Added WorkQueue to rt-core \n Added WorkQueueInstrumentation for WorkQueue management

Added:
    incubator/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/
    incubator/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueue.java   (with props)
    incubator/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/WorkQueue.java   (with props)
    incubator/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/WorkQueueManager.java   (with props)
    incubator/cxf/trunk/common/src/main/java/org/apache/cxf/event/ComponentEventFilter.java   (with props)
    incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/
    incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java   (with props)
    incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/Messages.properties   (with props)
    incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueInstrumentation.java   (with props)
    incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java   (with props)
    incubator/cxf/trunk/rt/core/src/test/java/org/apache/cxf/workqueue/
    incubator/cxf/trunk/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java   (with props)
    incubator/cxf/trunk/rt/management/src/test/java/org/apache/cxf/management/InstrumentationManagerTest.java   (with props)
Modified:
    incubator/cxf/trunk/common/src/main/java/org/apache/cxf/event/Event.java
    incubator/cxf/trunk/common/src/main/java/org/apache/cxf/event/EventListener.java
    incubator/cxf/trunk/rt/core/src/main/resources/META-INF/bus-extensions.xml
    incubator/cxf/trunk/rt/management/pom.xml
    incubator/cxf/trunk/rt/management/src/main/java/org/apache/cxf/management/InstrumentationManagerImpl.java
    incubator/cxf/trunk/rt/management/src/main/java/org/apache/cxf/management/jmx/JMXManagedComponentManager.java
    incubator/cxf/trunk/rt/management/src/main/java/org/apache/cxf/management/jmx/JMXUtils.java
    incubator/cxf/trunk/rt/management/src/main/resources/META-INF/bus-extensions.xml
    incubator/cxf/trunk/rt/management/src/main/resources/config-metadata/instrumentation-config.xml
    incubator/cxf/trunk/rt/management/src/test/java/org/apache/cxf/management/jmx/JMXManagedComponentManagerTest.java
    incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
    incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java

Added: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueue.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueue.java?view=auto&rev=442516
==============================================================================
--- incubator/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueue.java (added)
+++ incubator/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueue.java Tue Sep 12 01:17:11 2006
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.workqueue;
+
+public interface AutomaticWorkQueue extends WorkQueue {
+    /**
+     * Initiates an orderly shutdown. 
+     * If <code>processRemainingWorkItems</code>
+     * is true, waits for all active items to finish execution before returning, otherwise returns 
+     * immediately after removing all non active items from the queue.
+     * 
+     * @param processRemainingWorkItems
+     */
+    void shutdown(boolean processRemainingWorkItems);
+    
+    /**
+     * Returns true if this object has been shut down.
+     * @return true if this object has been shut down.
+     */
+    boolean isShutdown();
+}

Propchange: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/WorkQueue.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/WorkQueue.java?view=auto&rev=442516
==============================================================================
--- incubator/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/WorkQueue.java (added)
+++ incubator/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/WorkQueue.java Tue Sep 12 01:17:11 2006
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.workqueue;
+
+import java.util.concurrent.Executor;
+
+public interface WorkQueue extends Executor {
+    /**
+     * Submits a work item for execution at some time in the future, waiting for up to a 
+     * specified amount of time for the item to be accepted.
+     * 
+     * @param work the workitem to submit for execution.
+     * @param timeout the maximum amount of time (in milliseconds) to wait for it to be accepted.
+     *
+     * @throws <code>RejectedExecutionException</code> if this work item cannot be accepted for execution.
+     * @throws <code>NullPointerException</code> if work item is null.
+     */
+    void execute(Runnable work, long timeout);
+    
+    /**
+     * Schedules a work item for execution at some time in the future.
+     * 
+     * @param work the task to submit for execution.
+     * @param delay the delay before the task is executed
+     *
+     * @throws <code>RejectedExecutionException</code> if this task cannot be accepted for execution.
+     * @throws <code>NullPointerException</code> if task is null.
+     */
+    void schedule(Runnable work, long delay);
+}

Propchange: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/WorkQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/WorkQueueManager.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/WorkQueueManager.java?view=auto&rev=442516
==============================================================================
--- incubator/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/WorkQueueManager.java (added)
+++ incubator/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/WorkQueueManager.java Tue Sep 12 01:17:11 2006
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.workqueue;
+
+public interface WorkQueueManager {
+
+    enum ThreadingModel {
+        SINGLE_THREADED, MULTI_THREADED
+    };
+
+    /**
+     * Get the manager's work queue.
+     * @return AutomaticWorkQueue
+     */
+    AutomaticWorkQueue getAutomaticWorkQueue();
+
+    /**
+     * Get the threading model.
+     * @return ThreadingModel - either <code>SINGLE_THREADED</code>
+     * or <code>MULTI_THREADED</code>.
+     */
+    ThreadingModel getThreadingModel();
+
+    /**
+     * Set the threading model.
+     * @param model either <code>SINGLE_THREADED</code>
+     * or <code>MULTI_THREADED</code>.
+     */
+    void setThreadingModel(ThreadingModel model);
+    
+    /**
+     * Shuts down the manager's work queue. If
+     * <code>processRemainingTasks</code> is true, waits for the work queue to
+     * shutdown before returning.
+     * @param processRemainingTasks - whether or not to wait for completion
+     */
+    void shutdown(boolean processRemainingTasks);
+    
+    /**
+     * Only returns after workqueue has been shutdown.
+     *
+     */
+    void run();
+}

Propchange: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/workqueue/WorkQueueManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/cxf/trunk/common/src/main/java/org/apache/cxf/event/ComponentEventFilter.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/common/src/main/java/org/apache/cxf/event/ComponentEventFilter.java?view=auto&rev=442516
==============================================================================
--- incubator/cxf/trunk/common/src/main/java/org/apache/cxf/event/ComponentEventFilter.java (added)
+++ incubator/cxf/trunk/common/src/main/java/org/apache/cxf/event/ComponentEventFilter.java Tue Sep 12 01:17:11 2006
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.event;
+
+public class ComponentEventFilter implements EventFilter {
+    public static final String COMPONENT_CREATED_EVENT = "cxf.component.created.event";
+    public static final String COMPONENT_REMOVED_EVENT = "cxf.component.removed.event";
+
+    public boolean isEventEnabled(Event e) {        
+        if (e.getID().getLocalPart().compareTo(COMPONENT_CREATED_EVENT) == 0) {
+            return true;
+        } else if (e.getID().getLocalPart().compareTo(COMPONENT_REMOVED_EVENT) == 0) {
+            return true;
+        }
+        return false;
+    }
+
+}

Propchange: incubator/cxf/trunk/common/src/main/java/org/apache/cxf/event/ComponentEventFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/cxf/trunk/common/src/main/java/org/apache/cxf/event/Event.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/common/src/main/java/org/apache/cxf/event/Event.java?view=diff&rev=442516&r1=442515&r2=442516
==============================================================================
--- incubator/cxf/trunk/common/src/main/java/org/apache/cxf/event/Event.java (original)
+++ incubator/cxf/trunk/common/src/main/java/org/apache/cxf/event/Event.java Tue Sep 12 01:17:11 2006
@@ -29,11 +29,11 @@
  */
 public class Event extends EventObject {
 
-    /*
-    public static final String BUS_EVENT = "org.apache.cxf.bus.event";
+    
+    /*public static final String BUS_EVENT = "org.apache.cxf.bus.event";
     public static final String COMPONENT_CREATED_EVENT = "COMPONENT_CREATED_EVENT";
-    public static final String COMPONENT_REMOVED_EVENT = "COMPONENT_REMOVED_EVENT";
-    */
+    public static final String COMPONENT_REMOVED_EVENT = "COMPONENT_REMOVED_EVENT";*/
+    
     
     private QName eventId;
 

Modified: incubator/cxf/trunk/common/src/main/java/org/apache/cxf/event/EventListener.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/common/src/main/java/org/apache/cxf/event/EventListener.java?view=diff&rev=442516&r1=442515&r2=442516
==============================================================================
--- incubator/cxf/trunk/common/src/main/java/org/apache/cxf/event/EventListener.java (original)
+++ incubator/cxf/trunk/common/src/main/java/org/apache/cxf/event/EventListener.java Tue Sep 12 01:17:11 2006
@@ -24,7 +24,7 @@
 /**
  * Should be implemented by an object that wants to receive events.
  */
-interface EventListener extends java.util.EventListener {
+public interface EventListener extends java.util.EventListener {
     /**
      * Invoked when an event occurs.
      * The implementation of this method should return as soon as possible,

Added: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java?view=auto&rev=442516
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java (added)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java Tue Sep 12 01:17:11 2006
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.workqueue;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.cxf.common.logging.LogUtils;
+
+public class AutomaticWorkQueueImpl extends ThreadPoolExecutor implements AutomaticWorkQueue {
+
+    static final int DEFAULT_MAX_QUEUE_SIZE = 128;
+    private static final Logger LOG =
+        LogUtils.getL7dLogger(AutomaticWorkQueueImpl.class);
+    
+    int maxQueueSize;
+
+    AutomaticWorkQueueImpl(int mqs, int initialThreads, int highWaterMark, int lowWaterMark,
+                           long dequeueTimeout) {
+        
+        super(-1 == lowWaterMark ? Integer.MAX_VALUE : lowWaterMark, 
+            -1 == highWaterMark ? Integer.MAX_VALUE : highWaterMark,
+                TimeUnit.MILLISECONDS.toMillis(dequeueTimeout), TimeUnit.MILLISECONDS, 
+                mqs == -1 ? new ArrayBlockingQueue<Runnable>(DEFAULT_MAX_QUEUE_SIZE)
+                    : new ArrayBlockingQueue<Runnable>(mqs));
+        
+        maxQueueSize = mqs == -1 ? DEFAULT_MAX_QUEUE_SIZE : mqs;
+        lowWaterMark = -1 == lowWaterMark ? Integer.MAX_VALUE : lowWaterMark;
+        highWaterMark = -1 == highWaterMark ? Integer.MAX_VALUE : highWaterMark;
+                
+        StringBuffer buf = new StringBuffer();
+        buf.append("Constructing automatic work queue with:\n");
+        buf.append("max queue size: " + maxQueueSize + "\n");
+        buf.append("initialThreads: " + initialThreads + "\n");
+        buf.append("lowWaterMark: " + lowWaterMark + "\n");
+        buf.append("highWaterMark: " + highWaterMark + "\n");
+        LOG.fine(buf.toString());
+
+        if (initialThreads > highWaterMark) {
+            initialThreads = highWaterMark;
+        }
+
+        // as we cannot prestart more core than corePoolSize initial threads, we temporarily
+        // change the corePoolSize to the number of initial threads
+        // this is important as otherwise these threads will be created only when the queue has filled up, 
+        // potentially causing problems with starting up under heavy load
+        if (initialThreads < Integer.MAX_VALUE && initialThreads > 0) {
+            setCorePoolSize(initialThreads);
+            int started = prestartAllCoreThreads();
+            if (started < initialThreads) {
+                LOG.log(Level.WARNING, "THREAD_START_FAILURE_MSG", new Object[] {started, initialThreads});
+            }
+            setCorePoolSize(lowWaterMark);
+        }
+    }
+
+    public String toString() {
+        StringBuffer buf = new StringBuffer();
+        buf.append(super.toString());
+        buf.append(" [queue size: ");
+        buf.append(getSize());
+        buf.append(", max size: ");
+        buf.append(maxQueueSize);
+        buf.append(", threads: ");
+        buf.append(getPoolSize());
+        buf.append(", active threads: ");
+        buf.append(getActiveCount());
+        buf.append(", low water mark: ");
+        buf.append(getLowWaterMark());
+        buf.append(", high water mark: ");
+        buf.append(getHighWaterMark());
+        buf.append("]");
+        return buf.toString();
+    }
+    
+    // WorkQueue interface
+     
+    /* (non-Javadoc)
+     * @see org.objectweb.celtix.workqueue.WorkQueue#execute(java.lang.Runnable, long)
+     */
+    public void execute(Runnable work, long timeout) {
+        try {
+            execute(work);
+        } catch (RejectedExecutionException ree) {
+            try {
+                getQueue().offer(work, timeout, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException ie) {
+                throw new RejectedExecutionException(ie);
+            }
+        }    
+    }
+
+    /* (non-Javadoc)
+     * @see org.objectweb.celtix.workqueue.WorkQueue#schedule(java.lang.Runnable, long)
+     */
+    public void schedule(final Runnable work, final long delay) {
+        // temporary implementation, replace with shared long-lived scheduler
+        // task
+        execute(new Runnable() {
+            public void run() {
+                try {
+                    Thread.sleep(delay);
+                } catch (InterruptedException ie) {
+                    // ignore
+                }
+                work.run();
+            }
+        });
+    }
+    
+    // AutomaticWorkQueue interface
+    
+    public void shutdown(boolean processRemainingWorkItems) {
+        if (!processRemainingWorkItems) {
+            getQueue().clear();
+        }
+        shutdown();    
+    }
+
+    /**
+     * Gets the maximum size (capacity) of the backing queue.
+     * @return the maximum size (capacity) of the backing queue.
+     */
+    long getMaxSize() {
+        return maxQueueSize;
+    }
+
+    /**
+     * Gets the current size of the backing queue.
+     * @return the current size of the backing queue.
+     */
+    public long getSize() {
+        return getQueue().size();
+    }
+
+
+    public boolean isEmpty() {
+        return getQueue().size() == 0;
+    }
+
+    boolean isFull() {
+        return getQueue().remainingCapacity() == 0;
+    }
+
+    int getHighWaterMark() {
+        int hwm = getMaximumPoolSize();
+        return hwm == Integer.MAX_VALUE ? -1 : hwm;
+    }
+
+    int getLowWaterMark() {
+        int lwm = getCorePoolSize();
+        return lwm == Integer.MAX_VALUE ? -1 : lwm;
+    }
+
+    void setHighWaterMark(int hwm) {
+        setMaximumPoolSize(hwm < 0 ? Integer.MAX_VALUE : hwm);
+    }
+
+    void setLowWaterMark(int lwm) {
+        setCorePoolSize(lwm < 0 ? 0 : lwm);
+    }
+}

Propchange: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/Messages.properties
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/Messages.properties?view=auto&rev=442516
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/Messages.properties (added)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/Messages.properties Tue Sep 12 01:17:11 2006
@@ -0,0 +1 @@
+THREAD_START_FAILURE_MSG = could not start required number of initial threads (only started {0} out of {1})

Propchange: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/Messages.properties
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/Messages.properties
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueInstrumentation.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueInstrumentation.java?view=auto&rev=442516
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueInstrumentation.java (added)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueInstrumentation.java Tue Sep 12 01:17:11 2006
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.workqueue;
+
+
+import org.apache.cxf.management.Instrumentation;
+import org.apache.cxf.management.annotation.ManagedAttribute;
+import org.apache.cxf.management.annotation.ManagedOperation;
+import org.apache.cxf.management.annotation.ManagedResource;
+import org.apache.cxf.workqueue.WorkQueueManager.ThreadingModel;
+
+@ManagedResource(componentName = "WorkQueue", 
+                 description = "The CXF internal thread pool for manangement ", 
+                 currencyTimeLimit = 15, persistPolicy = "OnUpdate", persistPeriod = 200)
+                 
+public class WorkQueueInstrumentation implements Instrumentation {    
+    private static final String INSTRUMENTED_NAME = "Bus.WorkQueue";
+    
+    private String objectName;
+    private WorkQueueManagerImpl wqManager;
+    private AutomaticWorkQueueImpl aWorkQueue;
+    
+    public WorkQueueInstrumentation(WorkQueueManagerImpl wq) {
+        wqManager = wq;        
+        objectName = "WorkQueue";
+        if (wqManager.autoQueue != null 
+            && AutomaticWorkQueueImpl.class.isAssignableFrom(wqManager.autoQueue.getClass())) {
+            aWorkQueue = (AutomaticWorkQueueImpl)wqManager.autoQueue;
+        }
+    }
+   
+    
+    @ManagedOperation(currencyTimeLimit = 30)
+    public void shutdown(boolean processRemainingWorkItems) {
+        wqManager.shutdown(processRemainingWorkItems); 
+    }
+    
+    @ManagedAttribute(description = "The thread pool work model",                      
+                      defaultValue = "SINGLE_THREADED",
+                      persistPolicy = "OnUpdate")
+                      
+    public String getThreadingModel() {        
+        return wqManager.getThreadingModel().toString();
+    }
+
+    public void setThreadingModel(String model) {
+        if (model.compareTo("SINGLE_THREADED") == 0) {
+            wqManager.setThreadingModel(ThreadingModel.SINGLE_THREADED);
+        }
+        if (model.compareTo("MULTI_THREADED") == 0) {
+            wqManager.setThreadingModel(ThreadingModel.MULTI_THREADED);
+        }             
+    }
+   
+    @ManagedAttribute(description = "The WorkQueueMaxSize",
+                      persistPolicy = "OnUpdate")
+    public long getWorkQueueMaxSize() {
+        return aWorkQueue.getMaxSize();
+    }
+   
+    @ManagedAttribute(description = "The WorkQueue Current size",
+                      persistPolicy = "OnUpdate")
+    public long getWorkQueueSize() {
+        return aWorkQueue.getSize();
+    }
+
+    @ManagedAttribute(description = "The WorkQueue has nothing to do",
+                      persistPolicy = "OnUpdate")
+    public boolean isEmpty() {
+        return aWorkQueue.isEmpty();
+    }
+
+    @ManagedAttribute(description = "The WorkQueue is very busy")
+    public boolean isFull() {
+        return aWorkQueue.isFull();
+    }
+
+    @ManagedAttribute(description = "The WorkQueue HighWaterMark",
+                      persistPolicy = "OnUpdate")
+    public int getHighWaterMark() {
+        return aWorkQueue.getHighWaterMark();
+    }
+    public void setHighWaterMark(int hwm) {
+        aWorkQueue.setHighWaterMark(hwm);
+    }
+
+    @ManagedAttribute(description = "The WorkQueue LowWaterMark",
+                      persistPolicy = "OnUpdate")
+    public int getLowWaterMark() {
+        return aWorkQueue.getLowWaterMark();
+    }
+
+    public void setLowWaterMark(int lwm) {
+        aWorkQueue.setLowWaterMark(lwm);
+    }
+    
+    
+    public Object getComponent() {        
+        return wqManager;
+    }
+
+    public String getInstrumentationName() {        
+        return INSTRUMENTED_NAME;
+    }
+
+    public String getUniqueInstrumentationName() {       
+        return objectName;
+    }   
+
+}

Propchange: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueInstrumentation.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java?view=auto&rev=442516
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java (added)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java Tue Sep 12 01:17:11 2006
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.workqueue;
+
+import java.util.logging.Logger;
+
+import javax.annotation.Resource;
+import javax.xml.namespace.QName;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.event.ComponentEventFilter;
+import org.apache.cxf.event.Event;
+import org.apache.cxf.event.EventProcessor;
+import org.apache.cxf.management.Instrumentation;
+import org.apache.cxf.management.InstrumentationFactory;
+
+
+public class WorkQueueManagerImpl implements WorkQueueManager, InstrumentationFactory {
+
+    private static final Logger LOG =
+        Logger.getLogger(WorkQueueManagerImpl.class.getName());
+
+    ThreadingModel threadingModel = ThreadingModel.MULTI_THREADED;
+    AutomaticWorkQueue autoQueue;
+    boolean inShutdown;
+    Bus bus;  
+
+    /*public WorkQueueManagerImpl(Bus b) {
+        bus = b;
+    }*/
+    
+    public Bus getBus() {
+        return bus;
+    }
+    
+    @Resource
+    public void setBus(Bus bus) {        
+        this.bus = bus;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.objectweb.celtix.workqueue.WorkQueueManager#getAutomaticWorkQueue()
+     */
+    public synchronized AutomaticWorkQueue getAutomaticWorkQueue() {
+        if (autoQueue == null) {
+            autoQueue = createAutomaticWorkQueue();
+            EventProcessor ep = bus.getExtension(EventProcessor.class);
+            //setup the QName
+            QName eventID = new QName(ComponentEventFilter.COMPONENT_CREATED_EVENT);
+            if (null != ep) {                
+                ep.sendEvent(new Event(this, eventID));
+            }
+        }
+        return autoQueue;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.objectweb.celtix.workqueue.WorkQueueManager#getThreadingModel()
+     */
+    public ThreadingModel getThreadingModel() {
+        return threadingModel;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.objectweb.celtix.workqueue.WorkQueueManager#setThreadingModel(
+     *      org.objectweb.celtix.workqueue.WorkQueueManager.ThreadingModel)
+     */
+    public void setThreadingModel(ThreadingModel model) {
+        threadingModel = model;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.objectweb.celtix.workqueue.WorkQueueManager#shutdown(boolean)
+     */
+    public synchronized void shutdown(boolean processRemainingTasks) {
+        inShutdown = true;
+        if (autoQueue != null) {
+            autoQueue.shutdown(processRemainingTasks);
+        }
+
+        //sent out remove event.
+        EventProcessor ep = bus.getExtension(EventProcessor.class);
+        QName eventID = new QName(ComponentEventFilter.COMPONENT_REMOVED_EVENT);
+        if (null != ep) {
+            ep.sendEvent(new Event(this, eventID));        
+        }
+        synchronized (this) {
+            notifyAll();
+        }
+    }
+
+    public void run() {
+        synchronized (this) {
+            while (!inShutdown) {
+                try {            
+                    wait();
+                } catch (InterruptedException ex) {
+                    // ignore
+                }
+            }
+            while (autoQueue != null && !autoQueue.isShutdown()) {
+                try {            
+                    Thread.sleep(100);
+                } catch (InterruptedException ex) {
+                    // ignore
+                }
+            }
+        }
+        for (java.util.logging.Handler h : LOG.getHandlers())  {
+            h.flush();
+        }
+
+        //sent out creation event.        
+        
+        
+    }
+
+    private AutomaticWorkQueue createAutomaticWorkQueue() {        
+      
+        // Configuration configuration = bus.getConfiguration();
+
+        // configuration.getInteger("threadpool:initial_threads");
+        int initialThreads = 1;
+
+        // int lwm = configuration.getInteger("threadpool:low_water_mark");
+        int lwm = 5;
+
+        // int hwm = configuration.getInteger("threadpool:high_water_mark");
+        int hwm = 25;
+
+        // configuration.getInteger("threadpool:max_queue_size");
+        int maxQueueSize = 10 * hwm;
+
+        // configuration.getInteger("threadpool:dequeue_timeout");
+        long dequeueTimeout = 2 * 60 * 1000L;
+
+        return new AutomaticWorkQueueImpl(maxQueueSize, initialThreads, hwm, lwm, dequeueTimeout);
+               
+    }
+    
+    public Instrumentation createInstrumentation() {
+        return  new WorkQueueInstrumentation(this);
+    }
+}

Propchange: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/WorkQueueManagerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/cxf/trunk/rt/core/src/main/resources/META-INF/bus-extensions.xml
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/resources/META-INF/bus-extensions.xml?view=diff&rev=442516&r1=442515&r2=442516
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/resources/META-INF/bus-extensions.xml (original)
+++ incubator/cxf/trunk/rt/core/src/main/resources/META-INF/bus-extensions.xml Tue Sep 12 01:17:11 2006
@@ -29,5 +29,7 @@
            interface="org.apache.cxf.wsdl.WSDLManager"/>
     <extension class="org.apache.cxf.phase.PhaseManagerImpl"
            interface="org.apache.cxf.phase.PhaseManager"/>
+    <extension class="org.apache.cxf.workqueue.WorkQueueManagerImpl"
+       	   interface="org.apache.cxf.workqueue.WorkQueueManager"/>
     
 </extensions>

Added: incubator/cxf/trunk/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java?view=auto&rev=442516
==============================================================================
--- incubator/cxf/trunk/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java (added)
+++ incubator/cxf/trunk/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java Tue Sep 12 01:17:11 2006
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.workqueue;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import junit.framework.TestCase;
+
+public class AutomaticWorkQueueTest extends TestCase {
+
+    public static final int UNBOUNDED_MAX_QUEUE_SIZE = -1;
+    public static final int UNBOUNDED_HIGH_WATER_MARK = -1;
+    public static final int UNBOUNDED_LOW_WATER_MARK = -1;
+
+    public static final int INITIAL_SIZE = 2;
+    public static final int DEFAULT_MAX_QUEUE_SIZE = 10;
+    public static final int DEFAULT_HIGH_WATER_MARK = 10;
+    public static final int DEFAULT_LOW_WATER_MARK = 1;
+    public static final long DEFAULT_DEQUEUE_TIMEOUT = 2 * 60 * 1000L;
+
+    public static final int TIMEOUT = 100;
+
+    AutomaticWorkQueueImpl workqueue;
+    public void tearDown() throws Exception {
+        if (workqueue != null) {
+            workqueue.shutdown(true);
+            workqueue = null;
+        }
+    }
+    
+    public void testUnboundedConstructor() {
+        workqueue = new AutomaticWorkQueueImpl(UNBOUNDED_MAX_QUEUE_SIZE, INITIAL_SIZE,
+                                               UNBOUNDED_HIGH_WATER_MARK,
+                                               UNBOUNDED_LOW_WATER_MARK,
+                                               DEFAULT_DEQUEUE_TIMEOUT);
+        assertNotNull(workqueue);
+        assertEquals(AutomaticWorkQueueImpl.DEFAULT_MAX_QUEUE_SIZE, workqueue.getMaxSize());
+        assertEquals(UNBOUNDED_HIGH_WATER_MARK, workqueue.getHighWaterMark());
+        assertEquals(UNBOUNDED_LOW_WATER_MARK, workqueue.getLowWaterMark());
+    }
+
+    public void testConstructor() {
+        workqueue = new AutomaticWorkQueueImpl(DEFAULT_MAX_QUEUE_SIZE, INITIAL_SIZE,
+                                               DEFAULT_HIGH_WATER_MARK,
+                                               DEFAULT_LOW_WATER_MARK,
+                                               DEFAULT_DEQUEUE_TIMEOUT);
+        assertNotNull(workqueue);
+        assertEquals(DEFAULT_MAX_QUEUE_SIZE, workqueue.getMaxSize());
+        assertEquals(DEFAULT_HIGH_WATER_MARK, workqueue.getHighWaterMark());
+        assertEquals(DEFAULT_LOW_WATER_MARK, workqueue.getLowWaterMark());
+    }
+
+    public void testEnqueue() {
+        workqueue = new AutomaticWorkQueueImpl(DEFAULT_MAX_QUEUE_SIZE, INITIAL_SIZE,
+                                               DEFAULT_HIGH_WATER_MARK,
+                                               DEFAULT_LOW_WATER_MARK,
+                                               DEFAULT_DEQUEUE_TIMEOUT);
+
+        try {
+            Thread.sleep(100);
+        } catch (Exception e) {
+            // ignore
+        }
+
+        // We haven't enqueued anything yet, so should be zero
+        assertEquals(0, workqueue.getSize());
+        assertEquals(INITIAL_SIZE, workqueue.getPoolSize());
+
+        // Check that no threads are working yet, as we haven't enqueued
+        // anything yet.
+        assertEquals(0, workqueue.getActiveCount());
+
+        workqueue.execute(new TestWorkItem(), TIMEOUT);
+
+        // Give threads a chance to dequeue (5sec max)
+        int i = 0;
+        while (workqueue.getSize() != 0 && i++ < 50) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException ie) {
+                // ignore
+            }
+        }
+        assertEquals(0, workqueue.getSize());
+    }
+
+    public void testEnqueueImmediate() {
+        workqueue = new AutomaticWorkQueueImpl(DEFAULT_MAX_QUEUE_SIZE, INITIAL_SIZE,
+                                               DEFAULT_HIGH_WATER_MARK,
+                                               DEFAULT_LOW_WATER_MARK,
+                                               DEFAULT_DEQUEUE_TIMEOUT);
+
+        try {
+            Thread.sleep(100);
+        } catch (Exception e) {
+            // ignore
+        }
+
+        // We haven't enqueued anything yet, so should there shouldn't be
+        // any items on the queue, the thread pool should still be the
+        // initial size and no threads should be working
+        //
+        assertEquals(0, workqueue.getSize());
+        assertEquals(INITIAL_SIZE, workqueue.getPoolSize());
+        assertEquals(0, workqueue.getActiveCount());
+
+        BlockingWorkItem[] workItems = new BlockingWorkItem[DEFAULT_HIGH_WATER_MARK];
+        BlockingWorkItem[] fillers = new BlockingWorkItem[DEFAULT_MAX_QUEUE_SIZE];
+
+        try {
+            // fill up the queue, then exhaust the thread pool
+            //
+            for (int i = 0; i < DEFAULT_HIGH_WATER_MARK; i++) {
+                workItems[i] = new BlockingWorkItem();
+                try {
+                    workqueue.execute(workItems[i]);
+                } catch (RejectedExecutionException ex) {
+                    fail("failed on item[" + i + "] with: " + ex);
+                }
+            }
+
+            while (workqueue.getActiveCount() < INITIAL_SIZE) {
+                try {
+                    Thread.sleep(250);
+                } catch (InterruptedException ex) {
+                    // ignore
+                }
+            }
+
+            for (int i = 0; i < DEFAULT_MAX_QUEUE_SIZE; i++) {
+                fillers[i] = new BlockingWorkItem();
+                try {
+                    workqueue.execute(fillers[i]);
+                } catch (RejectedExecutionException ex) {
+                    fail("failed on filler[" + i + "] with: " + ex);
+                }
+            }
+
+            // give threads a chance to start executing the work items
+            try {
+                Thread.sleep(250);
+            } catch (InterruptedException ex) {
+                // ignore
+            }
+
+            assertTrue(workqueue.toString(), workqueue.isFull());
+            assertEquals(workqueue.toString(), DEFAULT_HIGH_WATER_MARK, workqueue.getPoolSize());
+            assertEquals(workqueue.toString(), DEFAULT_HIGH_WATER_MARK, workqueue.getActiveCount());
+
+            try {
+                workqueue.execute(new BlockingWorkItem());
+                fail("workitem should not have been accepted.");
+            } catch (RejectedExecutionException ex) {
+                // ignore
+            }
+
+            // unblock one work item and allow thread to dequeue next item
+
+            workItems[0].unblock();
+            boolean accepted = false;
+            workItems[0] = new BlockingWorkItem();
+
+            for (int i = 0; i < 20 && !accepted; i++) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException ex) {
+                    // ignore
+                }
+                try {
+                    workqueue.execute(workItems[0]);
+                    accepted = true;
+                } catch (RejectedExecutionException ex) {
+                    // ignore
+                }
+            }
+            assertTrue(accepted);
+        } finally {
+            for (int i = 0; i < DEFAULT_HIGH_WATER_MARK; i++) {
+                if (workItems[i] != null) {
+                    workItems[i].unblock();
+                }
+            }
+            for (int i = 0; i < DEFAULT_MAX_QUEUE_SIZE; i++) {
+                if (fillers[i] != null) {
+                    fillers[i].unblock();
+                }
+            }
+        }
+    }
+
+    public void testDeadLockEnqueueLoads() {
+        workqueue = new AutomaticWorkQueueImpl(500, 1, 2, 2,
+                                               DEFAULT_DEQUEUE_TIMEOUT);
+        DeadLockThread dead = new DeadLockThread(workqueue, 200,
+                                                 10L);
+
+        assertTrue(checkDeadLock(dead));
+    }
+
+    public void testNonDeadLockEnqueueLoads() {
+        workqueue = new AutomaticWorkQueueImpl(UNBOUNDED_MAX_QUEUE_SIZE,
+                                               INITIAL_SIZE,
+                                               UNBOUNDED_HIGH_WATER_MARK,
+                                               UNBOUNDED_LOW_WATER_MARK,
+                                               DEFAULT_DEQUEUE_TIMEOUT);
+        DeadLockThread dead = new DeadLockThread(workqueue, 200);
+
+        assertTrue(checkDeadLock(dead));
+    }
+    
+    public void testSchedule() throws Exception {
+        workqueue = new AutomaticWorkQueueImpl(UNBOUNDED_MAX_QUEUE_SIZE, INITIAL_SIZE,
+                                               UNBOUNDED_HIGH_WATER_MARK,
+                                               UNBOUNDED_LOW_WATER_MARK,
+                                               DEFAULT_DEQUEUE_TIMEOUT);
+        final Lock runLock = new ReentrantLock();
+        final Condition runCondition = runLock.newCondition();
+        long start = System.currentTimeMillis();
+        Runnable doNothing = new Runnable() {
+            public void run() {
+                runLock.lock();
+                try {
+                    runCondition.signal();
+                } finally {
+                    runLock.unlock();
+                }
+            }
+        };
+        
+        workqueue.schedule(doNothing, 5000);
+        
+        runLock.lock();
+        try {
+            runCondition.await();
+        } finally {
+            runLock.unlock();
+        }
+        
+        assertTrue("expected delay",
+                   System.currentTimeMillis() - start >= 4950);
+    }
+
+    public void testThreadPoolShrink() {
+        workqueue = new AutomaticWorkQueueImpl(UNBOUNDED_MAX_QUEUE_SIZE, 20, 20, 10, 100L);
+
+        DeadLockThread dead = new DeadLockThread(workqueue, 1000, 5L);
+
+        assertTrue("Should be finished, probably deadlocked", checkDeadLock(dead));
+
+        // Give threads a chance to dequeue (5sec max)
+        int i = 0;
+        while (workqueue.getPoolSize() != 10 && i++ < 50) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException ie) {
+                // ignore
+            }
+        }
+        assertEquals(workqueue.getLowWaterMark(), workqueue.getPoolSize());
+    }
+
+    public void testThreadPoolShrinkUnbounded() {
+        workqueue = new AutomaticWorkQueueImpl(UNBOUNDED_MAX_QUEUE_SIZE, INITIAL_SIZE,
+                                               UNBOUNDED_HIGH_WATER_MARK,
+                                               DEFAULT_LOW_WATER_MARK, 100L);
+
+        DeadLockThread dead = new DeadLockThread(workqueue, 1000, 5L);
+        assertTrue("Should be finished, probably deadlocked", checkDeadLock(dead));
+
+        // Give threads a chance to dequeue (5sec max)
+        int i = 0;
+        int last = workqueue.getPoolSize();
+        while (workqueue.getPoolSize() != DEFAULT_LOW_WATER_MARK && i++ < 50) {
+            if (last != workqueue.getPoolSize()) {
+                last = workqueue.getPoolSize();
+                i = 0;
+            }
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException ie) {
+                // ignore
+            }
+        }
+        assertTrue("threads_total()", workqueue.getPoolSize() <= DEFAULT_LOW_WATER_MARK);
+    }
+
+    public void testShutdown() {
+        workqueue = new AutomaticWorkQueueImpl(DEFAULT_MAX_QUEUE_SIZE, INITIAL_SIZE,
+                                               INITIAL_SIZE, INITIAL_SIZE, 250);
+
+        assertEquals(0, workqueue.getSize());
+        DeadLockThread dead = new DeadLockThread(workqueue, 100, 5L);
+        dead.start();
+        assertTrue(checkCompleted(dead));
+
+        workqueue.shutdown(true);
+
+        // Give threads a chance to shutdown (1 sec max)
+        for (int i = 0; i < 20 && (workqueue.getSize() > 0 || workqueue.getPoolSize() > 0); i++) {
+            try {
+                Thread.sleep(250);
+            } catch (InterruptedException ie) {
+                // ignore
+            }
+        }
+        assertEquals(0, workqueue.getSize());
+        assertEquals(0, workqueue.getPoolSize());
+        
+        //already shutdown
+        workqueue = null;
+    }
+
+    private boolean checkCompleted(DeadLockThread dead) {
+        int oldCompleted = 0;
+        int newCompleted = 0;
+        int noProgressCount = 0;
+        while (!dead.isFinished()) {
+            newCompleted = dead.getWorkItemCompletedCount();
+            if (newCompleted > oldCompleted) {
+                oldCompleted = newCompleted;
+                noProgressCount = 0;
+            } else {
+                // No reduction in the completion count so it may be deadlocked,
+                // allow thread to make no progress for 5 time-slices before
+                // assuming a deadlock has occurred
+                //  
+                if (oldCompleted != 0
+                    && ++noProgressCount > 5) {
+                    return false;
+                }
+            }
+            try {
+                Thread.sleep(250);
+            } catch (InterruptedException ie) {
+                // ignore
+            }
+        }
+        return true;
+    }
+
+    private boolean checkDeadLock(DeadLockThread dead) {
+        dead.start();
+        return checkCompleted(dead);
+    }
+
+    public class TestWorkItem implements Runnable {
+        String name;
+        long worktime;
+        Callback callback;
+
+        public TestWorkItem() {
+            this("WI");
+        }
+
+        public TestWorkItem(String n) {
+            this(n, DeadLockThread.DEFAULT_WORK_TIME);
+        }
+
+        public TestWorkItem(String n, long wt) {
+            this(n, wt, null);
+        }
+
+        public TestWorkItem(String n, long wt, Callback c) {
+            name = n;
+            worktime = wt;
+            callback = c;
+        }
+
+        public void run() {
+            try {
+                try {
+                    Thread.sleep(worktime);
+                } catch (InterruptedException ie) {
+                    // ignore
+                    return;
+                }
+            } finally {
+                if (callback != null) {
+                    callback.workItemCompleted(name);
+                }
+            }
+        }
+
+        public String toString() {
+            return "[TestWorkItem:name=" + name + "]";
+        }
+    }
+
+    public class BlockingWorkItem implements Runnable {
+        private boolean unblocked;
+
+        public void run() {
+            synchronized (this) {
+                while (!unblocked) {
+                    try {
+                        wait();
+                    } catch (InterruptedException ie) {
+                        // ignore
+                    }
+                }
+            }
+        }
+
+        void unblock() {
+            synchronized (this) {
+                unblocked = true;
+                notify();
+            }
+        }
+    }
+
+    public interface Callback {
+        void workItemCompleted(String name);
+    }
+
+    public class DeadLockThread extends Thread implements Callback {
+        public static final long DEFAULT_WORK_TIME = 10L;
+        public static final int DEFAULT_WORK_ITEMS = 200;
+
+        AutomaticWorkQueueImpl workqueue;
+        int nWorkItems;
+        int nWorkItemsCompleted;
+        long worktime;
+        long finishTime;
+        long startTime;
+
+        public DeadLockThread(AutomaticWorkQueueImpl wq) {
+            this(wq, DEFAULT_WORK_ITEMS, DEFAULT_WORK_TIME);
+        }
+
+        public DeadLockThread(AutomaticWorkQueueImpl wq, int nwi) {
+            this(wq, nwi, DEFAULT_WORK_TIME);
+        }
+
+        public DeadLockThread(AutomaticWorkQueueImpl wq, int nwi, long wt) {
+            workqueue = wq;
+            nWorkItems = nwi;
+            worktime = wt;
+        }
+
+        public synchronized boolean isFinished() {
+            return nWorkItemsCompleted == nWorkItems;
+        }
+
+        public synchronized void workItemCompleted(String name) {
+            nWorkItemsCompleted++;
+            if (isFinished()) {
+                finishTime = System.currentTimeMillis();
+            }
+        }
+
+        public int getWorkItemCount() {
+            return nWorkItems;
+        }
+
+        public long worktime() {
+            return worktime;
+        }
+
+        public synchronized int getWorkItemCompletedCount() {
+            return nWorkItemsCompleted;
+        }
+
+        public long finishTime() {
+            return finishTime;
+        }
+
+        public long duration() {
+            return finishTime - startTime;
+        }
+
+        public void run() {
+            startTime = System.currentTimeMillis();
+
+            for (int i = 0; i < nWorkItems; i++) {
+                try {
+                    workqueue.execute(new TestWorkItem(String.valueOf(i), worktime, this), TIMEOUT);
+                } catch (RejectedExecutionException ex) {
+                    // ignore
+                }
+            }
+            while (!isFinished()) {
+                try {
+                    Thread.sleep(worktime);
+                } catch (InterruptedException ie) {
+                    // ignore
+                }
+            }
+        }
+    }
+
+}

Propchange: incubator/cxf/trunk/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/cxf/trunk/rt/management/pom.xml
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/management/pom.xml?view=diff&rev=442516&r1=442515&r2=442516
==============================================================================
--- incubator/cxf/trunk/rt/management/pom.xml (original)
+++ incubator/cxf/trunk/rt/management/pom.xml Tue Sep 12 01:17:11 2006
@@ -52,6 +52,11 @@
             <artifactId>cxf-api</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.cxf</groupId>

Modified: incubator/cxf/trunk/rt/management/src/main/java/org/apache/cxf/management/InstrumentationManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/management/src/main/java/org/apache/cxf/management/InstrumentationManagerImpl.java?view=diff&rev=442516&r1=442515&r2=442516
==============================================================================
--- incubator/cxf/trunk/rt/management/src/main/java/org/apache/cxf/management/InstrumentationManagerImpl.java (original)
+++ incubator/cxf/trunk/rt/management/src/main/java/org/apache/cxf/management/InstrumentationManagerImpl.java Tue Sep 12 01:17:11 2006
@@ -25,11 +25,17 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import javax.annotation.Resource;
 import javax.management.MBeanServer;
 
+import org.apache.cxf.Bus;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.configuration.instrumentation.types.InstrumentationPolicyType;
 import org.apache.cxf.configuration.instrumentation.types.MBServerPolicyType;
+import org.apache.cxf.event.ComponentEventFilter;
+import org.apache.cxf.event.Event;
+import org.apache.cxf.event.EventListener;
+import org.apache.cxf.event.EventProcessor;
 import org.apache.cxf.management.jmx.JMXManagedComponentManager;
 import org.apache.cxf.oldcfg.CompoundName;
 import org.apache.cxf.oldcfg.Configuration;
@@ -39,7 +45,7 @@
 
 
 
-public class InstrumentationManagerImpl implements InstrumentationManager {    
+public class InstrumentationManagerImpl implements InstrumentationManager, EventListener {
     static final Logger LOG = LogUtils.getL7dLogger(InstrumentationManagerImpl.class);
     static final String INSTRUMENTATION_CONFIGURATION_URI = 
         "http://cxf.apache.org/configuration/instrumentation";
@@ -48,25 +54,46 @@
 
     static final String INSTRUMENTATION_CONFIGURATION_ID = 
         "instrumentation";
+    Bus bus;
     
     private Configuration configuration;
     private List <Instrumentation> instrumentations;
     private JMXManagedComponentManager jmxManagedComponentManager;
-
+    
     
     public InstrumentationManagerImpl() {
         this(new ConfigurationBuilderImpl());
     }
 
     public InstrumentationManagerImpl(ConfigurationBuilder builder) {
-        LOG.info("Setting up InstrumentationManager");
         
         configuration = getConfiguration(builder);
+        
+    }
+    
+    public Bus getBus() {
+        return bus;
+    }
+    
+    @Resource
+    public void setBus(Bus bus) {        
+        this.bus = bus;
+        initInstrumentationManagerImpl();
+    }
+    
+    public void initInstrumentationManagerImpl() {
+        LOG.info("Setting up InstrumentationManager");
         InstrumentationPolicyType ip = 
             configuration.getObject(InstrumentationPolicyType.class, "InstrumentationControl");   
         
         if (ip.isInstrumentationEnabled()) {
             instrumentations = new LinkedList<Instrumentation>();
+            //regist to the event process
+            ComponentEventFilter componentEventFilter = new ComponentEventFilter();
+            EventProcessor ep = bus.getExtension(EventProcessor.class);
+            if (null != ep) {                
+                ep.addEventListener((EventListener)this, componentEventFilter);
+            }    
         }
             
         if (ip.isJMXEnabled()) {           
@@ -74,8 +101,6 @@
             MBServerPolicyType mbsp = configuration.getObject(MBServerPolicyType.class, "MBServer");
             jmxManagedComponentManager.init(mbsp);
         }
-        
-        
     }
     
     private Configuration getConfiguration(ConfigurationBuilder cb) {
@@ -126,6 +151,24 @@
 
     public MBeanServer getMBeanServer() {        
         return jmxManagedComponentManager.getMBeanServer();
+    }
+
+    public void processEvent(Event e) {
+        Instrumentation it;
+        if (e.getID().getLocalPart().equals(ComponentEventFilter.COMPONENT_CREATED_EVENT)) {
+            it = ((InstrumentationFactory)(e.getSource())).createInstrumentation();
+            if (LOG.isLoggable(Level.INFO)) {
+                LOG.info("Instrumentation register " + e.getSource().getClass().getName());
+            }   
+            register(it);          
+            
+        } else if (e.getID().getLocalPart().equals(ComponentEventFilter.COMPONENT_REMOVED_EVENT)) {           
+            if (LOG.isLoggable(Level.INFO)) {
+                LOG.info("Instrumentation unregister " + e.getSource().getClass().getName());
+            }    
+            unregister(e.getSource());
+        }
+        
     }
       
 

Modified: incubator/cxf/trunk/rt/management/src/main/java/org/apache/cxf/management/jmx/JMXManagedComponentManager.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/management/src/main/java/org/apache/cxf/management/jmx/JMXManagedComponentManager.java?view=diff&rev=442516&r1=442515&r2=442516
==============================================================================
--- incubator/cxf/trunk/rt/management/src/main/java/org/apache/cxf/management/jmx/JMXManagedComponentManager.java (original)
+++ incubator/cxf/trunk/rt/management/src/main/java/org/apache/cxf/management/jmx/JMXManagedComponentManager.java Tue Sep 12 01:17:11 2006
@@ -181,7 +181,8 @@
                                + instrumentation.getUniqueInstrumentationName());
         }   
     }
-    
+
+   
 
 }
 

Modified: incubator/cxf/trunk/rt/management/src/main/java/org/apache/cxf/management/jmx/JMXUtils.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/management/src/main/java/org/apache/cxf/management/jmx/JMXUtils.java?view=diff&rev=442516&r1=442515&r2=442516
==============================================================================
--- incubator/cxf/trunk/rt/management/src/main/java/org/apache/cxf/management/jmx/JMXUtils.java (original)
+++ incubator/cxf/trunk/rt/management/src/main/java/org/apache/cxf/management/jmx/JMXUtils.java Tue Sep 12 01:17:11 2006
@@ -65,8 +65,7 @@
      */
  
     public static ObjectName getObjectName(String type, String name) {        
-        String objectName = ":type=" + type + ",name=" + name;
-        
+        String objectName = ":type=" + type + ",name=" + name;        
         try {
             return new ObjectName(DOMAIN_STRING + objectName);
         } catch (Exception ex) {

Modified: incubator/cxf/trunk/rt/management/src/main/resources/META-INF/bus-extensions.xml
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/management/src/main/resources/META-INF/bus-extensions.xml?view=diff&rev=442516&r1=442515&r2=442516
==============================================================================
--- incubator/cxf/trunk/rt/management/src/main/resources/META-INF/bus-extensions.xml (original)
+++ incubator/cxf/trunk/rt/management/src/main/resources/META-INF/bus-extensions.xml Tue Sep 12 01:17:11 2006
@@ -18,10 +18,10 @@
   under the License.
 -->
 <extensions xmlns="http://cxf.apache.org/bus/extension">
-
-    <extension class="org.apache.cxf.management.InstrumentationManagerImpl"
-           interface="org.apache.cxf.management.InstrumentationManager"/>
+   
     <extension class="org.apache.cxf.event.EventProcessorImpl"
            interface="org.apache.cxf.event.EventProcessor"/>
+    <extension class="org.apache.cxf.management.InstrumentationManagerImpl"
+           interface="org.apache.cxf.management.InstrumentationManager"/>       
     
 </extensions>

Modified: incubator/cxf/trunk/rt/management/src/main/resources/config-metadata/instrumentation-config.xml
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/management/src/main/resources/config-metadata/instrumentation-config.xml?view=diff&rev=442516&r1=442515&r2=442516
==============================================================================
--- incubator/cxf/trunk/rt/management/src/main/resources/config-metadata/instrumentation-config.xml (original)
+++ incubator/cxf/trunk/rt/management/src/main/resources/config-metadata/instrumentation-config.xml Tue Sep 12 01:17:11 2006
@@ -36,7 +36,7 @@
         <!-- attribute values are defaulted -->
         <im:instrumentation>
         <im:InstrumentationEnabled>true</im:InstrumentationEnabled>
-        <im:JMXEnabled>false</im:JMXEnabled>
+        <im:JMXEnabled>true</im:JMXEnabled>
         </im:instrumentation>
     </cm:configItem>
     

Added: incubator/cxf/trunk/rt/management/src/test/java/org/apache/cxf/management/InstrumentationManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/management/src/test/java/org/apache/cxf/management/InstrumentationManagerTest.java?view=auto&rev=442516
==============================================================================
--- incubator/cxf/trunk/rt/management/src/test/java/org/apache/cxf/management/InstrumentationManagerTest.java (added)
+++ incubator/cxf/trunk/rt/management/src/test/java/org/apache/cxf/management/InstrumentationManagerTest.java Tue Sep 12 01:17:11 2006
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.management;
+
+import java.util.List;
+
+import javax.xml.namespace.QName;
+
+import junit.framework.TestCase;
+
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.bus.CXFBusFactory;
+import org.apache.cxf.event.ComponentEventFilter;
+import org.apache.cxf.event.Event;
+import org.apache.cxf.event.EventProcessor;
+import org.apache.cxf.workqueue.WorkQueueInstrumentation;
+import org.apache.cxf.workqueue.WorkQueueManagerImpl;
+
+public class InstrumentationManagerTest extends TestCase {
+    InstrumentationManager im;
+    Bus bus;
+    
+    public void setUp() throws Exception {
+        CXFBusFactory bf = new CXFBusFactory();
+        bus = new CXFBusFactory().createBus();
+        bf.setDefaultBus(bus);
+        im = bus.getExtension(InstrumentationManager.class);
+    }
+    
+    public void tearDown() throws Exception {
+        //test case had done the bus.shutdown
+        bus.shutdown(true);
+    }
+    
+    // try to get WorkQueue information
+    public void testWorkQueueInstrumentation() throws Exception {
+        assertTrue("Instrumentation Manager should not be null", im != null);
+        //im.getAllInstrumentation();
+        WorkQueueManagerImpl wqm = new WorkQueueManagerImpl();
+        wqm.setBus(bus);
+        EventProcessor ep = bus.getExtension(EventProcessor.class);
+        QName eventID = new QName(ComponentEventFilter.COMPONENT_CREATED_EVENT);
+        if (null != ep) {
+            System.out.println("send automaticWorkQueue created event");
+            ep.sendEvent(new Event(wqm, eventID));
+        }        
+        
+        //NOTE: now the bus WorkQueueManager is lazy load , if WorkQueueManager
+        //create with bus , this test could be failed.
+        List<Instrumentation> list = im.getAllInstrumentation();
+        //NOTE: change for the BindingManager and TransportFactoryManager instrumentation
+        // create with the bus.
+        assertEquals("Too many instrumented items", 1, list.size());
+        Instrumentation it1 = list.get(0);
+        //Instrumentation it2 = list.get(3);
+        assertTrue("Item 1 not a WorkQueueInstrumentation",
+                   WorkQueueInstrumentation.class.isAssignableFrom(it1.getClass()));
+        
+        // not check for the instrumentation unique name
+        // sleep for the MBServer connector thread startup
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException e) {
+            // do nothing
+        }
+        eventID = new QName(ComponentEventFilter.COMPONENT_REMOVED_EVENT);
+        if (null != ep) {
+            ep.sendEvent(new Event(wqm, eventID));
+        }    
+        assertEquals("Instrumented stuff not removed from list", 0, list.size());
+        bus.shutdown(true);
+        assertEquals("Instrumented stuff not removed from list", 0, list.size());
+    }
+
+
+
+
+}

Propchange: incubator/cxf/trunk/rt/management/src/test/java/org/apache/cxf/management/InstrumentationManagerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/cxf/trunk/rt/management/src/test/java/org/apache/cxf/management/jmx/JMXManagedComponentManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/management/src/test/java/org/apache/cxf/management/jmx/JMXManagedComponentManagerTest.java?view=diff&rev=442516&r1=442515&r2=442516
==============================================================================
--- incubator/cxf/trunk/rt/management/src/test/java/org/apache/cxf/management/jmx/JMXManagedComponentManagerTest.java (original)
+++ incubator/cxf/trunk/rt/management/src/test/java/org/apache/cxf/management/jmx/JMXManagedComponentManagerTest.java Tue Sep 12 01:17:11 2006
@@ -19,16 +19,20 @@
 
 package org.apache.cxf.management.jmx;
 
+import javax.management.ObjectName;
+
 import junit.framework.TestCase;
 
 import org.apache.cxf.BusException;
 import org.apache.cxf.configuration.instrumentation.types.JMXConnectorPolicyType;
 import org.apache.cxf.configuration.instrumentation.types.MBServerPolicyType;
+import org.apache.cxf.management.jmx.export.AnnotationTestInstrumentation;
 
 
 public class JMXManagedComponentManagerTest extends TestCase {
-   
-    private JMXManagedComponentManager manager;    
+       
+    private static final String NAME_ATTRIBUTE = "Name";    
+    private JMXManagedComponentManager manager;
     
     public void setUp() throws BusException {
         manager = new JMXManagedComponentManager(); 
@@ -49,6 +53,34 @@
             assertTrue("JMX Manager init with NewMBeanServer error", false);
             ex.printStackTrace();
         }
+    }
+    
+    public void testRegisterInstrumentation() {
+        MBServerPolicyType policy = new MBServerPolicyType();
+        JMXConnectorPolicyType connector = new JMXConnectorPolicyType();        
+        policy.setJMXConnector(connector);        
+        connector.setDaemon(false);
+        connector.setThreaded(false);
+        connector.setJMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9913/jmxrmi");
+        manager.init(policy);
+        // setup the fack instrumentation
+        AnnotationTestInstrumentation im = new AnnotationTestInstrumentation();
+        ObjectName name = JMXUtils.getObjectName(im.getUniqueInstrumentationName(), 
+                                                 im.getInstrumentationName());
+       
+        im.setName("John Smith");          
+        manager.registerMBean(im);
+        
+        try {            
+            Object val = manager.getMBeanServer().getAttribute(name, NAME_ATTRIBUTE);
+            assertEquals("Incorrect result", "John Smith", val);
+            Thread.sleep(300);
+        } catch (Exception ex) {            
+            ex.printStackTrace();
+            assertTrue("get instrumentation attribute error", false);
+        }
+        manager.unregisterMBean(im);
+        manager.shutdown();
     }
 
 }

Modified: incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?view=diff&rev=442516&r1=442515&r2=442516
==============================================================================
--- incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Tue Sep 12 01:17:11 2006
@@ -31,6 +31,8 @@
 import java.util.GregorianCalendar;
 import java.util.SimpleTimeZone;
 import java.util.TimeZone;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -53,6 +55,7 @@
 import org.apache.cxf.transport.Destination;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transports.jms.context.JMSMessageHeadersType;
+import org.apache.cxf.workqueue.WorkQueueManager;
 import org.apache.cxf.ws.addressing.AttributedURIType;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 
@@ -167,9 +170,9 @@
             }
             sessionFactory.shutdown();
         } catch (InterruptedException e) {
-            //Don't do anything...
+            //Do nothing here
         } catch (JMSException ex) {
-            //
+            //Do nothing here
         }
     }
 
@@ -273,11 +276,13 @@
                     }
                     while (message != null) {
                         //REVISIT  to get the thread pool                        
-                        /*
-                        Executor executor = jmsDestination.callback.getExecutor();
+                        //Executor executor = jmsDestination.callback.getExecutor();
+                        Executor executor = null;
                         if (executor == null) {
-                            executor = jmsDestination.bus
-                                .getWorkQueueManager().getAutomaticWorkQueue();
+                            WorkQueueManager wqm = jmsDestination.bus.getExtension(WorkQueueManager.class);
+                            if (null != wqm) {
+                                executor = wqm.getAutomaticWorkQueue();
+                            }    
                         }
                         if (executor != null) {
                             try {
@@ -289,14 +294,13 @@
                                 //although we could just dispatch on this thread.
                             }                            
                         } else {
-                            //shouldn't ever get here....
-                            
-                        }*/
-                        try {
-                            jmsDestination.incoming(message);
-                        } catch (IOException ex) {
-                            LOG.log(Level.WARNING, "Failed to process incoming message : ", ex);
-                        }
+                            LOG.log(Level.INFO, "handle the incoming message in listener thread");
+                            try {
+                                jmsDestination.incoming(message);
+                            } catch (IOException ex) {
+                                LOG.log(Level.WARNING, "Failed to process incoming message : ", ex);
+                            }                            
+                        }                        
                         message = null;
                     }
                 }
@@ -320,6 +324,7 @@
         }
 
         public void run() {
+            LOG.log(Level.INFO, "run the incoming message in the threadpool");
             try {
                 jmsDestination.incoming(message);
             } catch (IOException ex) {

Modified: incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java?view=diff&rev=442516&r1=442515&r2=442516
==============================================================================
--- incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java (original)
+++ incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java Tue Sep 12 01:17:11 2006
@@ -53,24 +53,7 @@
             dfm.registerDestinationFactory(ns, this);
         }
     }
-    
-    /*
-    public ServerTransport createServerTransport(EndpointReferenceType address) 
-        throws WSDLException, IOException {
-        return new JMSServerTransport(theBus, address);
-    }
-     
-    public ServerTransport createTransientServerTransport(EndpointReferenceType address)
-        throws WSDLException {
-        return null;
-    }
-     
-    public ClientTransport createClientTransport(EndpointReferenceType address,
-                                                 ClientBinding binding) 
-        throws WSDLException, IOException {
-        return new JMSClientTransport(theBus, address, binding);
-    }*/
-
+   
     public Conduit getConduit(EndpointInfo targetInfo) throws IOException {
         return new JMSConduit(bus, targetInfo);
     }



Mime
View raw message