tuscany-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Luciano Resende" <luckbr1...@gmail.com>
Subject Please review changes to bring support to nonBlockingInterceptor Fwd: svn commit: r528930 - in /incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core: bootstrap/ deployer/ runtime/ work/
Date Sun, 15 Apr 2007 05:58:24 GMT
Hey

   I tried to bring up the code to make nonBlockInerceptor working again.
   Could someone please review my changes.

Thanks

---------- Forwarded message ----------
From: lresende@apache.org <lresende@apache.org>
Date: Apr 14, 2007 10:53 PM
Subject: svn commit: r528930 - in
/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core:
bootstrap/ deployer/ runtime/ work/
To: tuscany-commits@ws.apache.org

Author: lresende
Date: Sat Apr 14 22:53:09 2007
New Revision: 528930

URL: http://svn.apache.org/viewvc?view=rev&rev=528930
Log:
Adding support for nonBlockingInterceptor

Added:
    incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/
    incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkEvent.java
(with props)
    incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkItem.java
(with props)
    incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237Work.java
(with props)
    incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237WorkScheduler.java
(with props)
    incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/ThreadPoolWorkManager.java
(with props)
Modified:
    incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/bootstrap/DefaultBootstrapper.java
    incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/deployer/DeployerImpl.java
    incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/runtime/AbstractRuntime.java

Modified:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/bootstrap/DefaultBootstrapper.java
URL:
http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/bootstrap/DefaultBootstrapper.java?view=diff&rev=528930&r1=528929&r2=528930
==============================================================================
---
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/bootstrap/DefaultBootstrapper.java
(original)
+++
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/bootstrap/DefaultBootstrapper.java
Sat Apr 14 22:53:09 2007
@@ -23,6 +23,7 @@
import org.apache.tuscany.core.builder.BuilderRegistryImpl;
import org.apache.tuscany.core.builder.WirePostProcessorRegistryImpl;
import org.apache.tuscany.core.component.ComponentManagerImpl;
+import org.apache.tuscany.core.component.WorkContextImpl;
import org.apache.tuscany.core.component.scope.AbstractScopeContainer;
import org.apache.tuscany.core.component.scope.CompositeScopeContainer;
import org.apache.tuscany.core.component.scope.RequestScopeContainer;
@@ -30,15 +31,21 @@
import org.apache.tuscany.core.component.scope.StatelessScopeContainer;
import org.apache.tuscany.core.deployer.DeployerImpl;
import org.apache.tuscany.core.implementation.composite.CompositeBuilder;
+import org.apache.tuscany.core.work.Jsr237WorkScheduler;
+import org.apache.tuscany.core.work.ThreadPoolWorkManager;
import org.apache.tuscany.host.MonitorFactory;
import org.apache.tuscany.spi.bootstrap.ExtensionPointRegistry;
import org.apache.tuscany.spi.builder.BuilderRegistry;
import org.apache.tuscany.spi.component.ComponentManager;
import org.apache.tuscany.spi.component.ScopeContainerMonitor;
import org.apache.tuscany.spi.component.ScopeRegistry;
+import org.apache.tuscany.spi.component.WorkContext;
import org.apache.tuscany.spi.deployer.Deployer;
+import org.apache.tuscany.spi.services.work.WorkScheduler;
import org.apache.tuscany.spi.wire.WirePostProcessorRegistry;

+import commonj.work.WorkManager;
+
/**
  * A default implementation of a Bootstrapper. Please see the documentation
on
  * the individual methods for how the primordial components are created.
@@ -95,7 +102,10 @@
     public Deployer createDeployer(ExtensionPointRegistry
extensionRegistry) {
         ScopeRegistry scopeRegistry = getScopeRegistry();
         BuilderRegistry builder = createBuilder(scopeRegistry);
-        DeployerImpl deployer = new DeployerImpl(xmlFactory, builder,
componentManager);
+        WorkContext workContext = new WorkContextImpl();
+        WorkManager workManager = new ThreadPoolWorkManager(10);
+        WorkScheduler workScheduler = new Jsr237WorkScheduler(workManager);
+        DeployerImpl deployer = new DeployerImpl(xmlFactory, builder,
componentManager, workScheduler, workContext);
         deployer.setScopeRegistry(getScopeRegistry());
         WirePostProcessorRegistry wirePostProcessorRegistry = new
WirePostProcessorRegistryImpl();
         deployer.setWirePostProcessorRegistry(wirePostProcessorRegistry);

Modified:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/deployer/DeployerImpl.java
URL:
http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/deployer/DeployerImpl.java?view=diff&rev=528930&r1=528929&r2=528930
==============================================================================
---
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/deployer/DeployerImpl.java
(original)
+++
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/deployer/DeployerImpl.java
Sat Apr 14 22:53:09 2007
@@ -31,7 +31,6 @@
import org.apache.tuscany.assembly.Composite;
import org.apache.tuscany.assembly.CompositeReference;
import org.apache.tuscany.assembly.CompositeService;
-import org.apache.tuscany.assembly.Implementation;
import org.apache.tuscany.assembly.Multiplicity;
import org.apache.tuscany.assembly.SCABinding;
import org.apache.tuscany.assembly.impl.DefaultAssemblyFactory;
@@ -40,6 +39,7 @@
import org.apache.tuscany.core.builder.WireCreationException;
import org.apache.tuscany.core.wire.InvocationChainImpl;
import org.apache.tuscany.core.wire.InvokerInterceptor;
+import org.apache.tuscany.core.wire.NonBlockingInterceptor;
import org.apache.tuscany.core.wire.WireImpl;
import org.apache.tuscany.core.wire.WireUtils;
import
org.apache.tuscany.interfacedef.IncompatibleInterfaceContractException;
@@ -66,10 +66,11 @@
import org.apache.tuscany.spi.component.Service;
import org.apache.tuscany.spi.component.ServiceBinding;
import org.apache.tuscany.spi.component.TargetInvokerCreationException;
+import org.apache.tuscany.spi.component.WorkContext;
import org.apache.tuscany.spi.deployer.Deployer;
import org.apache.tuscany.spi.deployer.DeploymentContext;
import org.apache.tuscany.spi.resolver.ResolutionException;
-import org.apache.tuscany.spi.util.UriHelper;
+import org.apache.tuscany.spi.services.work.WorkScheduler;
import org.apache.tuscany.spi.wire.InvocationChain;
import org.apache.tuscany.spi.wire.Wire;
import org.apache.tuscany.spi.wire.WirePostProcessorRegistry;
@@ -86,11 +87,15 @@
     private ScopeRegistry scopeRegistry;
     private InterfaceContractMapper mapper = new
DefaultInterfaceContractMapper();
     private WirePostProcessorRegistry postProcessorRegistry;
+    private WorkScheduler workScheduler;
+    private WorkContext workContext;

-    public DeployerImpl(XMLInputFactory xmlFactory, Builder builder,
ComponentManager componentManager) {
+    public DeployerImpl(XMLInputFactory xmlFactory, Builder builder,
ComponentManager componentManager, WorkScheduler workScheduler, WorkContext
workContext) {
         this.xmlFactory = xmlFactory;
         this.builder = builder;
         this.componentManager = componentManager;
+        this.workScheduler = workScheduler;
+        this.workContext = workContext;
     }

     public DeployerImpl() {
@@ -343,10 +348,10 @@
         for (Operation operation :
sourceContract.getInterface().getOperations())
{
             Operation targetOperation = mapper.map(
targetContract.getInterface(), operation);
             InvocationChain chain = new InvocationChainImpl(operation,
targetOperation);
-            /*
-             * if (operation.isNonBlocking()) { chain.addInterceptor(new
-             * NonBlockingInterceptor(scheduler, workContext)); }
-             */
+               /* lresende */
+               if (operation.isNonBlocking()) {
+                   chain.addInterceptor(new
NonBlockingInterceptor(workScheduler, workContext)); }
+               /* lresende */
             chain.addInterceptor(new InvokerInterceptor());
             wire.addInvocationChain(chain);

@@ -355,10 +360,10 @@
             for (Operation operation :
sourceContract.getCallbackInterface().getOperations())
{
                 Operation targetOperation = mapper.map(
targetContract.getCallbackInterface(), operation);
                 InvocationChain chain = new InvocationChainImpl(operation,
targetOperation);
-                /*
-                 * if (operation.isNonBlocking()) { chain.addInterceptor
(new
-                 * NonBlockingInterceptor(scheduler, workContext)); }
-                 */
+                   /* lresende */
+                   if (operation.isNonBlocking()) {
+                       chain.addInterceptor(new
NonBlockingInterceptor(workScheduler, workContext)); }
+                   /* lresende */
                 chain.addInterceptor(new InvokerInterceptor());
                 wire.addCallbackInvocationChain(chain);
             }

Modified:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/runtime/AbstractRuntime.java
URL:
http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/runtime/AbstractRuntime.java?view=diff&rev=528930&r1=528929&r2=528930
==============================================================================
---
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/runtime/AbstractRuntime.java
(original)
+++
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/runtime/AbstractRuntime.java
Sat Apr 14 22:53:09 2007
@@ -45,10 +45,12 @@
import org.apache.tuscany.core.bootstrap.Bootstrapper;
import org.apache.tuscany.core.bootstrap.DefaultBootstrapper;
import org.apache.tuscany.core.component.ComponentManagerImpl;
-import org.apache.tuscany.core.component.SimpleWorkContext;
+import org.apache.tuscany.core.component.WorkContextImpl;
import org.apache.tuscany.core.monitor.NullMonitorFactory;
import org.apache.tuscany.core.services.classloading.ClassLoaderRegistryImpl
;
import org.apache.tuscany.core.util.IOHelper;
+import org.apache.tuscany.core.work.Jsr237WorkScheduler;
+import org.apache.tuscany.core.work.ThreadPoolWorkManager;
import org.apache.tuscany.host.MonitorFactory;
import org.apache.tuscany.host.RuntimeInfo;
import org.apache.tuscany.host.management.ManagementService;
@@ -70,8 +72,11 @@
import org.apache.tuscany.spi.deployer.Deployer;
import org.apache.tuscany.spi.services.classloading.ClassLoaderRegistry;
import org.apache.tuscany.spi.services.management.TuscanyManagementService;
+import org.apache.tuscany.spi.services.work.WorkScheduler;
import org.osoa.sca.ComponentContext;

+import commonj.work.WorkManager;
+
/**
  * @version $Rev$ $Date$
  */
@@ -83,6 +88,8 @@
     private static final URI SCOPE_REGISTRY_URI =
TUSCANY_SYSTEM_ROOT.resolve("ScopeRegistry");

     private static final URI WORK_CONTEXT_URI =
TUSCANY_SYSTEM.resolve("WorkContext");
+
+    private static final URI WORK_SCHEDULER_URI =
TUSCANY_SYSTEM.resolve("WorkScheduler");

     private static final URI RUNTIME_INFO_URI =
TUSCANY_SYSTEM_ROOT.resolve("RuntimeInfo");

@@ -211,7 +218,9 @@
         extensionRegistry.addExtensionPoint(ContributionService.class,
contributionService);

         registerSystemComponent(TUSCANY_DEPLOYER, Deployer.class,
deployer);
-        registerSystemComponent(WORK_CONTEXT_URI, WorkContext.class, new
SimpleWorkContext());
+        registerSystemComponent(WORK_CONTEXT_URI, WorkContext.class, new
WorkContextImpl());
+        WorkManager workManager = new ThreadPoolWorkManager(10);
+        registerSystemComponent(WORK_SCHEDULER_URI, WorkScheduler.class,
new Jsr237WorkScheduler(workManager)); //lresende

         this.scopeRegistry = bootstrapper.getScopeRegistry();


Added:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkEvent.java
URL:
http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkEvent.java?view=auto&rev=528930
==============================================================================
---
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkEvent.java
(added)
+++
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkEvent.java
Sat Apr 14 22:53:09 2007
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.core.work;
+
+
+import commonj.work.WorkEvent;
+import commonj.work.WorkException;
+import commonj.work.WorkItem;
+
+/**
+ * Default immutable implementation of the <code>WorkEvent</code> class.
+ */
+class DefaultWorkEvent implements WorkEvent {
+
+    // Work item for this event
+    private WorkItem workItem;
+
+    // Exception if something has gone wrong
+    private WorkException exception;
+
+    /**
+     * Instantiates the event.
+     *
+     * @param workItem Work item for this event.
+     */
+    public DefaultWorkEvent(final DefaultWorkItem workItem) {
+        this.workItem = workItem;
+        this.exception = workItem.getException();
+    }
+
+    /**
+     * Returns the work type based on whether the work was accepted,
started,
+     * rejected or completed.
+     *
+     * @return Work type.
+     */
+    public int getType() {
+        return workItem.getStatus();
+    }
+
+    /**
+     * Returns the work item associated with this work type.
+     *
+     * @return Work item.
+     */
+    public WorkItem getWorkItem() {
+        return workItem;
+    }
+
+    /**
+     * Returns the exception if the work completed with an exception.
+     *
+     * @return Work exception.
+     */
+    public WorkException getException() {
+        return exception;
+    }
+}

Propchange:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkItem.java
URL:
http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkItem.java?view=auto&rev=528930
==============================================================================
---
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkItem.java
(added)
+++
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkItem.java
Sat Apr 14 22:53:09 2007
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.core.work;
+
+import commonj.work.Work;
+import commonj.work.WorkException;
+import commonj.work.WorkItem;
+
+/**
+ * An identity based immutable implementation of the <code>WorkItem</code>
+ * interface.
+ *
+ */
+class DefaultWorkItem implements WorkItem {
+
+    // Id scoped for the VM
+    private String id;
+
+    // Status
+    private int status = -1;
+
+    // Result
+    private Work result;
+
+    // Original work
+    private Work originalWork;
+
+    // Exception
+    private WorkException exception;
+
+    /**
+     * Instantiates an id for this item.
+     *
+     * @param id of this work event.
+     */
+    protected DefaultWorkItem(final String id, final Work orginalWork) {
+        this.id = id;
+        this.originalWork = orginalWork;
+    }
+
+    /**
+     * Returns the id.
+     *
+     * @return Id of this item.
+     */
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * Returns the original work.
+     *
+     * @return Original work.
+     */
+    public Work getOriginalWork() {
+        return originalWork;
+    }
+
+    /**
+     * Returns the work result if the work completed.
+     *
+     * @return Work.
+     * @throws WorkException If the work completed with an exception.
+     */
+    public Work getResult() throws WorkException {
+        return result;
+    }
+
+    /**
+     * Sets the result.
+     *
+     * @param result Result.
+     */
+    protected void setResult(final Work result) {
+        this.result = result;
+    }
+
+    /**
+     * Returns the exception if work completed with an exception.
+     *
+     * @return Work exception.
+     */
+    protected WorkException getException() {
+        return exception;
+    }
+
+    /**
+     * Sets the exception.
+     *
+     * @param exception Exception.
+     */
+    protected void setException(final WorkException exception) {
+        this.exception = exception;
+    }
+
+    /**
+     * Returns the work type based on whether the work was accepted,
started,
+     * rejected or completed.
+     *
+     * @return Work status.
+     */
+    public int getStatus() {
+        return status;
+    }
+
+    /**
+     * Sets the status.
+     *
+     * @param status Status.
+     */
+    protected void setStatus(final int status) {
+        this.status = status;
+    }
+
+    /**
+     * @see Object#hashCode()
+     */
+    public int hashCode() {
+        return id.hashCode();
+    }
+
+    /**
+     * Indicates whether some other object is "equal to" this one.
+     *
+     * @param obj Object to be compared.
+     * @return true if this object is the same as the obj argument; false
+     *         otherwise..
+     */
+    public boolean equals(final Object obj) {
+        return (obj != null) && (obj.getClass() == DefaultWorkItem.class)
&& ((DefaultWorkItem) obj).id.equals(id);
+    }
+
+    /**
+     * Compares this object with the specified object for order. Returns a
+     * negative integer, zero, or a positive integer as this object is less
+     * than, equal to, or greater than the specified object.
+     *
+     * @param o Object to be compared.
+     * @return A negative integer, zero, or a positive integer as this
object
+     *         is less than, equal to, or greater than the specified
object.
+     * @throws ClassCastException needs better documentation.
+     */
+    public int compareTo(final Object o) {
+        if (o.getClass() != DefaultWorkItem.class) {
+            throw new ClassCastException(o.getClass().getName());
+        } else {
+            return ((DefaultWorkItem) o).getId().compareTo(getId());
+        }
+    }
+}

Propchange:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkItem.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237Work.java
URL:
http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237Work.java?view=auto&rev=528930
==============================================================================
---
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237Work.java
(added)
+++
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237Work.java
Sat Apr 14 22:53:09 2007
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.core.work;
+
+/*
+ * JCA work wrapper.
+ */
+public class Jsr237Work<T extends Runnable> implements commonj.work.Work {
+
+    // Work that is being executed.
+    private T work;
+
+    /*
+     * Initializes the work instance.
+     */
+    public Jsr237Work(T work) {
+        this.work = work;
+    }
+
+    /*
+     * Returns the completed work.
+     */
+    public T getWork() {
+        return work;
+    }
+
+    /*
+     * Release the work.
+     */
+    public void release() {
+    }
+
+    /*
+     * Work attributes are not daemon.
+     */
+    public boolean isDaemon() {
+        return false;
+    }
+
+    /*
+     * Runs the work.
+     */
+    public void run() {
+        work.run();
+    }
+}
\ No newline at end of file

Propchange:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237Work.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237WorkScheduler.java
URL:
http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237WorkScheduler.java?view=auto&rev=528930
==============================================================================
---
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237WorkScheduler.java
(added)
+++
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237WorkScheduler.java
Sat Apr 14 22:53:09 2007
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.core.work;
+
+import org.apache.tuscany.spi.services.work.NotificationListener;
+import org.apache.tuscany.spi.services.work.WorkScheduler;
+import org.apache.tuscany.spi.services.work.WorkSchedulerException;
+
+import commonj.work.WorkEvent;
+import commonj.work.WorkException;
+import commonj.work.WorkListener;
+import commonj.work.WorkManager;
+import commonj.work.WorkRejectedException;
+
+/**
+ * A work scheduler implementation based on a JSR 237 work manager.
+ * <p/>
+ * <p/>
+ * This needs a JSR 237 work manager implementation available for
scheduling work. Instances can be configured with a
+ * work manager implementation that is injected in. It is the
responsibility of the runtime environment to make a work
+ * manager implementaion available. For example, if the managed environment
supports work manager the runtime can use
+ * the appropriate lookup mechanism to inject the work manager
implementation. </p>
+ */
+public class Jsr237WorkScheduler implements WorkScheduler {
+
+    /**
+     * Underlying JSR-237 work manager
+     */
+    private WorkManager jsr237WorkManager;
+
+    /**
+     * Initializes the JSR 237 work manager.
+     *
+     * @param jsr237WorkManager JSR 237 work manager.
+     */
+    public Jsr237WorkScheduler(WorkManager jsr237WorkManager) {
+        if (jsr237WorkManager == null) {
+            throw new IllegalArgumentException("Work manager cannot be
null");
+        }
+        this.jsr237WorkManager = jsr237WorkManager;
+    }
+
+    /**
+     * Schedules a unit of work for future execution. The notification
listener is used to register interest in
+     * callbacks regarding the status of the work.
+     *
+     * @param work The unit of work that needs to be asynchronously
executed.
+     */
+    public <T extends Runnable> void scheduleWork(T work) {
+        scheduleWork(work, null);
+    }
+
+    /**
+     * Schedules a unit of work for future execution. The notification
listener is used to register interest in
+     * callbacks regarding the status of the work.
+     *
+     * @param work     The unit of work that needs to be asynchronously
executed.
+     * @param listener Notification listener for callbacks.
+     */
+    public <T extends Runnable> void scheduleWork(T work,
NotificationListener<T> listener) {
+
+        if (work == null) {
+            throw new IllegalArgumentException("Work cannot be null");
+        }
+
+        Jsr237Work<T> jsr237Work = new Jsr237Work<T>(work);
+        try {
+            if (listener == null) {
+                jsr237WorkManager.schedule(jsr237Work);
+            } else {
+                Jsr237WorkListener<T> jsr237WorkListener = new
Jsr237WorkListener<T>(listener, work);
+                jsr237WorkManager.schedule(jsr237Work, jsr237WorkListener);
+            }
+        } catch (WorkRejectedException ex) {
+            if (listener != null) {
+                listener.workRejected(work);
+            } else {
+                throw new WorkSchedulerException(ex);
+            }
+        } catch (WorkException ex) {
+            throw new WorkSchedulerException(ex);
+        }
+
+    }
+
+    /*
+     * Worklistener for keeping track of work status callbacks.
+     *
+     */
+    private class Jsr237WorkListener<T extends Runnable> implements
WorkListener {
+
+        // Notification listener
+        private NotificationListener<T> listener;
+
+        // Work
+        private T work;
+
+        /*
+        * Initializes the notification listener.
+        */
+        public Jsr237WorkListener(NotificationListener<T> listener, T work)
{
+            this.listener = listener;
+            this.work = work;
+        }
+
+        /*
+         * Callback when the work is accepted.
+         */
+        public void workAccepted(WorkEvent workEvent) {
+            T work = getWork();
+            listener.workAccepted(work);
+        }
+
+        /*
+         * Callback when the work is rejected.
+         */
+        public void workRejected(WorkEvent workEvent) {
+            T work = getWork();
+            listener.workRejected(work);
+        }
+
+        /*
+         * Callback when the work is started.
+         */
+        public void workStarted(WorkEvent workEvent) {
+            T work = getWork();
+            listener.workStarted(work);
+        }
+
+        /*
+         * Callback when the work is completed.
+         */
+        public void workCompleted(WorkEvent workEvent) {
+            T work = getWork();
+            Exception exception = workEvent.getException();
+            if (exception != null) {
+                listener.workFailed(work, exception);
+            } else {
+                listener.workCompleted(work);
+            }
+        }
+
+        /*
+        * Gets the underlying work from the work event.
+        */
+        private T getWork() {
+            return work;
+        }
+
+    }
+}

Propchange:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237WorkScheduler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/ThreadPoolWorkManager.java
URL:
http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/ThreadPoolWorkManager.java?view=auto&rev=528930
==============================================================================
---
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/ThreadPoolWorkManager.java
(added)
+++
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/ThreadPoolWorkManager.java
Sat Apr 14 22:53:09 2007
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.core.work;
+
+import java.rmi.server.UID;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.osoa.sca.annotations.Destroy;
+import org.osoa.sca.annotations.Property;
+
+import commonj.work.Work;
+import commonj.work.WorkEvent;
+import commonj.work.WorkException;
+import commonj.work.WorkItem;
+import commonj.work.WorkListener;
+import commonj.work.WorkManager;
+import commonj.work.WorkRejectedException;
+
+/**
+ * A thread-pool based implementation for the JSR-237 work manager.
+ * <p/>
+ * <p/>
+ * This implementation supports only local work.
+ * <p/>
+ * TODO Elaborate the implementation. </p>
+ */
+public class ThreadPoolWorkManager implements WorkManager {
+
+    // Map of work items currently handled by the work manager
+    private Map<DefaultWorkItem, WorkListener> workItems = new
ConcurrentHashMap<DefaultWorkItem, WorkListener>();
+
+    // Thread-pool
+    private ExecutorService executor;
+
+    /**
+     * Initializes the thread-pool.
+     *
+     * @param threadPoolSize Thread-pool size.
+     */
+    public ThreadPoolWorkManager(@Property(name = "poolSize") int
threadPoolSize) {
+        executor = Executors.newFixedThreadPool(threadPoolSize);
+    }
+
+    /**
+     * Schedules a unit of work asynchronously.
+     *
+     * @param work Work that needs to be scheduled.
+     * @return Work Work item representing the asynchronous work
+     */
+    public WorkItem schedule(Work work) throws WorkException {
+        return schedule(work, null);
+    }
+
+    /**
+     * Schedules a unit of work asynchronously.
+     *
+     * @param work         Work that needs to be scheduled.
+     * @param workListener Work listener for callbacks.
+     * @return Work Work item representing the asynchronous work
+     */
+    public WorkItem schedule(Work work, WorkListener workListener) throws
WorkRejectedException {
+
+        DefaultWorkItem workItem = new DefaultWorkItem(new
UID().toString(), work);
+        if (workListener != null) {
+            workItems.put(workItem, workListener);
+        }
+        workAccepted(workItem, work);
+        if (scheduleWork(work, workItem)) {
+            return workItem;
+        } else {
+            workItem.setStatus(WorkEvent.WORK_REJECTED);
+            if (workListener != null) {
+                workListener.workRejected(new DefaultWorkEvent(workItem));
+            }
+            throw new WorkRejectedException("Unable to schedule work");
+        }
+    }
+
+    /**
+     * Wait for all the specified units of work to finish.
+     *
+     * @param works   Units of the work that need to finish.
+     * @param timeout Timeout for waiting for the units of work to finish.
+     */
+    public boolean waitForAll(Collection works, long timeout) {
+        throw new UnsupportedOperationException("waitForAll not
supported");
+    }
+
+    /**
+     * Wait for any of the specified units of work to finish.
+     *
+     * @param works   Units of the work that need to finish.
+     * @param timeout Timeout for waiting for the units of work to finish.
+     */
+    public Collection waitForAny(Collection works, long timeout) {
+        throw new UnsupportedOperationException("waitForAny not
supported");
+    }
+
+    /**
+     * Method provided for subclasses to indicate a work accptance.
+     *
+     * @param workItem Work item representing the work that was accepted.
+     * @param work     Work that was accepted.
+     */
+    private void workAccepted(final DefaultWorkItem workItem, final Work
work) {
+        WorkListener listener = workItems.get(workItem);
+        if (listener != null) {
+            workItem.setStatus(WorkEvent.WORK_ACCEPTED);
+            WorkEvent event = new DefaultWorkEvent(workItem);
+            listener.workAccepted(event);
+        }
+    }
+
+    /*
+     * Method to indicate a work start.
+     */
+    private void workStarted(final DefaultWorkItem workItem, final Work
work) {
+        WorkListener listener = workItems.get(workItem);
+        if (listener != null) {
+            workItem.setStatus(WorkEvent.WORK_STARTED);
+            WorkEvent event = new DefaultWorkEvent(workItem);
+            listener.workStarted(event);
+        }
+    }
+
+    /*
+     * Method to indicate a work completion.
+     */
+    private void workCompleted(final DefaultWorkItem workItem, final Work
work) {
+        workCompleted(workItem, work, null);
+    }
+
+    /*
+     * Method to indicate a work completion.
+     */
+    private void workCompleted(final DefaultWorkItem workItem, final Work
work, final WorkException exception) {
+        WorkListener listener = workItems.get(workItem);
+        if (listener != null) {
+            workItem.setStatus(WorkEvent.WORK_COMPLETED);
+            workItem.setResult(work);
+            workItem.setException(exception);
+            WorkEvent event = new DefaultWorkEvent(workItem);
+            listener.workCompleted(event);
+            workItems.remove(workItem);
+        }
+    }
+
+    /*
+     * Schedules the work using the threadpool.
+     */
+    private boolean scheduleWork(final Work work, final DefaultWorkItem
workItem) {
+        try {
+            executor.execute(new DecoratingWork(workItem, work));
+            return true;
+        } catch (RejectedExecutionException ex) {
+            return false;
+        }
+    }
+
+    /*
+     * Class that decorates the original worker so that it can get
callbacks when work is done.
+     */
+    private final class DecoratingWork implements Runnable {
+
+        // Work item for this work.
+        private DefaultWorkItem workItem;
+
+        // The original work.
+        private Work decoratedWork;
+
+        /*
+         * Initializes the work item and underlying work.
+         */
+        private DecoratingWork(final DefaultWorkItem workItem, final Work
decoratedWork) {
+            this.workItem = workItem;
+            this.decoratedWork = decoratedWork;
+        }
+
+        /*
+         * Overrides the run method.
+         */
+        public void run() {
+            workStarted(workItem, decoratedWork);
+            try {
+                decoratedWork.run();
+                workCompleted(workItem, decoratedWork);
+            } catch (Throwable th) {
+                workCompleted(workItem, decoratedWork, new WorkException(
th.getMessage(), th));
+            }
+        }
+
+    }
+
+    @Destroy
+    public void destroy() {
+        executor.shutdown();
+    }
+
+}

Propchange:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/ThreadPoolWorkManager.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: tuscany-commits-unsubscribe@ws.apache.org
For additional commands, e-mail: tuscany-commits-help@ws.apache.org



-- 
Luciano Resende
http://people.apache.org/~lresende

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message