deltaspike-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gpetra...@apache.org
Subject [1/3] deltaspike git commit: DELTASPIKE-1119 refactoring to FutureableStrategy
Date Mon, 11 Apr 2016 23:02:31 GMT
Repository: deltaspike
Updated Branches:
  refs/heads/master ac3d5829c -> 963190661


DELTASPIKE-1119 refactoring to FutureableStrategy


Project: http://git-wip-us.apache.org/repos/asf/deltaspike/repo
Commit: http://git-wip-us.apache.org/repos/asf/deltaspike/commit/943cf54a
Tree: http://git-wip-us.apache.org/repos/asf/deltaspike/tree/943cf54a
Diff: http://git-wip-us.apache.org/repos/asf/deltaspike/diff/943cf54a

Branch: refs/heads/master
Commit: 943cf54a6838573ebf4ddf90988c12842a04b47e
Parents: 18e1af7
Author: gpetracek <gpetracek@apache.org>
Authored: Tue Apr 12 00:24:58 2016 +0200
Committer: gpetracek <gpetracek@apache.org>
Committed: Tue Apr 12 01:02:11 2016 +0200

----------------------------------------------------------------------
 .../core/spi/future/FutureableStrategy.java     |  25 ++
 .../impl/future/DefaultFutureableStrategy.java  | 155 ++++++++++
 .../core/impl/future/FutureableInterceptor.java | 280 +------------------
 .../impl/future/J8PromiseCompanionTask.java     |  98 +++++++
 .../core/impl/future/ThreadPoolManager.java     | 101 +++++++
 5 files changed, 383 insertions(+), 276 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/deltaspike/blob/943cf54a/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/spi/future/FutureableStrategy.java
----------------------------------------------------------------------
diff --git a/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/spi/future/FutureableStrategy.java
b/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/spi/future/FutureableStrategy.java
new file mode 100644
index 0000000..4f2214f
--- /dev/null
+++ b/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/spi/future/FutureableStrategy.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.deltaspike.core.spi.future;
+
+import org.apache.deltaspike.core.spi.InterceptorStrategy;
+
+public interface FutureableStrategy extends InterceptorStrategy
+{
+}

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/943cf54a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/DefaultFutureableStrategy.java
----------------------------------------------------------------------
diff --git a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/DefaultFutureableStrategy.java
b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/DefaultFutureableStrategy.java
new file mode 100644
index 0000000..7a51cc4
--- /dev/null
+++ b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/DefaultFutureableStrategy.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.deltaspike.core.impl.future;
+
+import org.apache.deltaspike.core.api.future.Futureable;
+import org.apache.deltaspike.core.impl.util.AnnotatedMethods;
+import org.apache.deltaspike.core.spi.future.FutureableStrategy;
+import org.apache.deltaspike.core.util.ExceptionUtils;
+
+import javax.enterprise.context.Dependent;
+import javax.enterprise.inject.spi.AnnotatedMethod;
+import javax.enterprise.inject.spi.AnnotatedType;
+import javax.enterprise.inject.spi.BeanManager;
+import javax.inject.Inject;
+import javax.interceptor.InvocationContext;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+@Dependent
+public class DefaultFutureableStrategy implements FutureableStrategy
+{
+    private static final Class<?> COMPLETION_STAGE;
+    private static final Class<?> COMPLETABLE_FUTURE;
+    private static final Method COMPLETABLE_STAGE_TO_FUTURE;
+
+    static
+    {
+        Class<?> completionStageClass = null;
+        Class<?> completableFutureClass = null;
+        Method completionStageClassToCompletableFuture = null;
+        try
+        {
+            final ClassLoader classLoader = ClassLoader.getSystemClassLoader();
+            completionStageClass = classLoader.loadClass("java.util.concurrent.CompletionStage");
+            completionStageClassToCompletableFuture = completionStageClass.getMethod("toCompletableFuture");
+            completableFutureClass = classLoader.loadClass("java.util.concurrent.CompletableFuture");
+        }
+        catch (final Exception e)
+        {
+            // not on java 8
+        }
+        COMPLETION_STAGE = completionStageClass;
+        COMPLETABLE_FUTURE = completableFutureClass;
+        COMPLETABLE_STAGE_TO_FUTURE = completionStageClassToCompletableFuture;
+    }
+
+    @Inject
+    private ThreadPoolManager manager;
+
+    @Inject
+    private BeanManager beanManager;
+
+    private transient ConcurrentMap<Method, ExecutorService> configByMethod =
+        new ConcurrentHashMap<Method, ExecutorService>();
+
+
+    @Override
+    public Object execute(final InvocationContext ic) throws Exception
+    {
+        // validate usage
+        final Class<?> returnType = ic.getMethod().getReturnType();
+        if (!Future.class.isAssignableFrom(returnType) &&
+                (COMPLETION_STAGE == null || !COMPLETION_STAGE.isAssignableFrom(returnType)))
+        {
+            throw new IllegalArgumentException("Return type should be a CompletableStage
or Future");
+        }
+
+        if (configByMethod == null)
+        {
+            synchronized (this)
+            {
+                if (configByMethod == null)
+                {
+                    configByMethod = new ConcurrentHashMap<Method, ExecutorService>();
+                }
+            }
+        }
+
+        // running < j8 we cant have cancellation
+        //final AtomicReference<Callable<?>> cancelHook = new AtomicReference<Callable<?>>();
+        final Callable<Object> invocation = new Callable<Object>()
+        {
+            @Override
+            public Object call() throws Exception
+            {
+                try
+                {
+                    final Object proceed = ic.proceed();
+                    final Future<?> future = COMPLETION_STAGE == null || !COMPLETION_STAGE.isInstance(proceed)
?
+                            Future.class.cast(proceed) :
+                            Future.class.cast(COMPLETABLE_STAGE_TO_FUTURE.invoke(proceed));
+                    return future.get();
+                }
+                catch (final InvocationTargetException e)
+                {
+                    throw ExceptionUtils.throwAsRuntimeException(e.getCause());
+                }
+                catch (final Exception e)
+                {
+                    throw ExceptionUtils.throwAsRuntimeException(e);
+                }
+            }
+        };
+
+        final ExecutorService pool = getOrCreatePool(ic);
+        if (COMPLETABLE_FUTURE == null)  // not on java 8 can only be a future
+        {
+            return pool.submit(invocation);
+        }
+
+        // java 8, use CompletableFuture, it impl CompletionStage and Future so everyone
is happy
+        final Object completableFuture = COMPLETABLE_FUTURE.newInstance();
+        pool.submit(new J8PromiseCompanionTask(completableFuture, invocation));
+        // TODO: handle cancel
+        return completableFuture;
+    }
+
+    protected ExecutorService getOrCreatePool(final InvocationContext ic)
+    {
+        final Method method = ic.getMethod();
+        ExecutorService executorService = configByMethod.get(method);
+        if (executorService == null)
+        {
+            final AnnotatedType<?> annotatedType = beanManager.createAnnotatedType(method.getDeclaringClass());
+            final AnnotatedMethod<?> annotatedMethod = AnnotatedMethods.findMethod(annotatedType,
method);
+            final Futureable methodConfig = annotatedMethod.getAnnotation(Futureable.class);
+            final ExecutorService instance = manager.find(
+                    (methodConfig == null ? annotatedType.getAnnotation(Futureable.class)
: methodConfig).value());
+            configByMethod.putIfAbsent(method, instance);
+            executorService = instance;
+        }
+        return executorService;
+    }
+}

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/943cf54a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/FutureableInterceptor.java
----------------------------------------------------------------------
diff --git a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/FutureableInterceptor.java
b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/FutureableInterceptor.java
index 6aec388..ff39fb7 100644
--- a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/FutureableInterceptor.java
+++ b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/FutureableInterceptor.java
@@ -18,297 +18,25 @@
  */
 package org.apache.deltaspike.core.impl.future;
 
-import org.apache.deltaspike.core.api.config.ConfigResolver;
 import org.apache.deltaspike.core.api.future.Futureable;
-import org.apache.deltaspike.core.impl.util.AnnotatedMethods;
+import org.apache.deltaspike.core.spi.future.FutureableStrategy;
 
-import javax.annotation.PreDestroy;
-import javax.enterprise.context.ApplicationScoped;
-import javax.enterprise.inject.spi.AnnotatedMethod;
-import javax.enterprise.inject.spi.AnnotatedType;
-import javax.enterprise.inject.spi.BeanManager;
 import javax.inject.Inject;
 import javax.interceptor.AroundInvoke;
 import javax.interceptor.Interceptor;
 import javax.interceptor.InvocationContext;
 import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 @Interceptor
-@Futureable("")
+@Futureable
 public class FutureableInterceptor implements Serializable
 {
-    private static final Class<?> COMPLETION_STAGE;
-    private static final Class<?> COMPLETABLE_FUTURE;
-    private static final Method COMPLETABLE_STAGE_TO_FUTURE;
-    private static final Method COMPLETABLE_FUTURE_COMPLETE;
-    private static final Method COMPLETABLE_FUTURE_COMPLETE_ERROR;
-
-    static
-    {
-        Class<?> completionStageClass = null;
-        Class<?> completableFutureClass = null;
-        Method completionStageClassToCompletableFuture = null;
-        Method completableFutureComplete = null;
-        Method completableFutureCompleteError = null;
-        try
-        {
-            final ClassLoader classLoader = ClassLoader.getSystemClassLoader();
-            completionStageClass = classLoader.loadClass("java.util.concurrent.CompletionStage");
-            completionStageClassToCompletableFuture = completionStageClass.getMethod("toCompletableFuture");
-            completableFutureClass = classLoader.loadClass("java.util.concurrent.CompletableFuture");
-            completableFutureComplete = completableFutureClass.getMethod("complete", Object.class);
-            completableFutureCompleteError = completableFutureClass.getMethod("completeExceptionally",
Throwable.class);
-        }
-        catch (final Exception e)
-        {
-            // not on java 8
-        }
-        COMPLETION_STAGE = completionStageClass;
-        COMPLETABLE_FUTURE = completableFutureClass;
-        COMPLETABLE_STAGE_TO_FUTURE = completionStageClassToCompletableFuture;
-        COMPLETABLE_FUTURE_COMPLETE = completableFutureComplete;
-        COMPLETABLE_FUTURE_COMPLETE_ERROR = completableFutureCompleteError;
-    }
-
     @Inject
-    private ThreadPoolManager manager;
-
-    @Inject
-    private BeanManager beanManager;
-
-    private transient ConcurrentMap<Method, ExecutorService> configByMethod =
-            new ConcurrentHashMap<Method, ExecutorService>();
+    private FutureableStrategy futureableStrategy;
 
     @AroundInvoke
     public Object invoke(final InvocationContext ic) throws Exception
     {
-        // validate usage
-        final Class<?> returnType = ic.getMethod().getReturnType();
-        if (!Future.class.isAssignableFrom(returnType) &&
-                (COMPLETION_STAGE == null || !COMPLETION_STAGE.isAssignableFrom(returnType)))
-        {
-            throw new IllegalArgumentException("Return type should be a CompletableStage
or Future");
-        }
-
-        if (configByMethod == null)
-        {
-            synchronized (this)
-            {
-                if (configByMethod == null)
-                {
-                    configByMethod = new ConcurrentHashMap<Method, ExecutorService>();
-                }
-            }
-        }
-
-        // running < j8 we cant have cancellation
-        //final AtomicReference<Callable<?>> cancelHook = new AtomicReference<Callable<?>>();
-        final Callable<Object> invocation = new Callable<Object>()
-        {
-            @Override
-            public Object call() throws Exception
-            {
-                try
-                {
-                    final Object proceed = ic.proceed();
-                    final Future<?> future = COMPLETION_STAGE == null || !COMPLETION_STAGE.isInstance(proceed)
?
-                            Future.class.cast(proceed) :
-                            Future.class.cast(COMPLETABLE_STAGE_TO_FUTURE.invoke(proceed));
-                    /*
-                    cancelHook.set(new Callable<Boolean>()
-                    {
-                        @Override
-                        public Boolean call()
-                        {
-                            return future.cancel(true);
-                        }
-                    });
-                    */
-                    return future.get();
-                }
-                catch (final InvocationTargetException e)
-                {
-                    throw rethrow(e.getCause());
-                }
-                catch (final Exception e)
-                {
-                    throw rethrow(e);
-                }
-            }
-        };
-
-        final ExecutorService pool = getOrCreatePool(ic);
-        if (COMPLETABLE_FUTURE == null)  // not on java 8 can only be a future
-        {
-            return pool.submit(invocation);
-        }
-
-        // java 8, use CompletableFuture, it impl CompletionStage and Future so everyone
is happy
-        final Object completableFuture = COMPLETABLE_FUTURE.newInstance();
-        pool.submit(new J8PromiseCompanionTask(completableFuture, invocation));
-        // TODO: handle cancel
-        return completableFuture;
-    }
-
-    private RuntimeException rethrow(final Throwable cause)
-    {
-        if (RuntimeException.class.isInstance(cause))
-        {
-            return RuntimeException.class.cast(cause);
-        }
-        return new IllegalStateException(cause);
-    }
-
-    private ExecutorService getOrCreatePool(final InvocationContext ic)
-    {
-        final Method method = ic.getMethod();
-        ExecutorService executorService = configByMethod.get(method);
-        if (executorService == null)
-        {
-            final AnnotatedType<?> annotatedType = beanManager.createAnnotatedType(method.getDeclaringClass());
-            final AnnotatedMethod<?> annotatedMethod = AnnotatedMethods.findMethod(annotatedType,
method);
-            final Futureable methodConfig = annotatedMethod.getAnnotation(Futureable.class);
-            final ExecutorService instance = manager.find(
-                    (methodConfig == null ? annotatedType.getAnnotation(Futureable.class)
: methodConfig).value());
-            configByMethod.putIfAbsent(method, instance);
-            executorService = instance;
-        }
-        return executorService;
-    }
-
-    @ApplicationScoped
-    public static class ThreadPoolManager
-    {
-        private final ConcurrentMap<String, ExecutorService> pools = new ConcurrentHashMap<String,
ExecutorService>();
-        private volatile ExecutorService defaultPool;
-        private volatile boolean closed = false;
-
-        @PreDestroy
-        private void shutdown()
-        {
-            closed = true;
-            final String propertyValue = ConfigResolver.getPropertyValue("deltaspike.future.timeout");
-            final long timeout = propertyValue == null ? TimeUnit.MINUTES.toMillis(1) : Integer.parseInt(propertyValue);
-            for (final ExecutorService es : pools.values())
-            {
-                es.shutdown();
-                try
-                {
-                    es.awaitTermination(timeout, TimeUnit.MILLISECONDS);
-                }
-                catch (final InterruptedException e)
-                {
-                    Thread.interrupted();
-                }
-            }
-            if (defaultPool != null)
-            {
-                defaultPool.shutdown();
-                try
-                {
-                    defaultPool.awaitTermination(timeout, TimeUnit.MILLISECONDS);
-                }
-                catch (final InterruptedException e)
-                {
-                    Thread.interrupted();
-                }
-            }
-            pools.clear();
-        }
-
-        // open door for users until we have a config, should be part of API but since it
can change keeping it there
-        public void register(final String name, final ExecutorService es)
-        {
-            pools.putIfAbsent(name, es);
-        }
-
-        public ExecutorService find(final String name)
-        {
-            if (closed)
-            {
-                throw new IllegalStateException("Container is shutting down");
-            }
-            ExecutorService pool = pools.get(name);
-            if (pool == null)
-            {
-                ensureDefaultPool();
-                pool = defaultPool;
-            }
-            return pool;
-        }
-
-        private void ensureDefaultPool()
-        {
-            if (defaultPool == null)
-            {
-                synchronized (this)
-                {
-                    if (defaultPool == null)
-                    {
-                        defaultPool = Executors.newFixedThreadPool(
-                                Math.max(2, Runtime.getRuntime().availableProcessors()));
-                    }
-                }
-            }
-        }
-    }
-
-    private static final class J8PromiseCompanionTask<T> implements Runnable
-    {
-        private Object dep;
-        private Callable<T> fn;
-
-        J8PromiseCompanionTask(final Object dep, Callable<T> fn)
-        {
-            this.dep = dep;
-            this.fn = fn;
-        }
-
-        public void run()
-        {
-            try
-            {
-                COMPLETABLE_FUTURE_COMPLETE.invoke(dep, fn.call());
-            }
-            catch (final InvocationTargetException e)
-            {
-                try
-                {
-                    COMPLETABLE_FUTURE_COMPLETE_ERROR.invoke(dep, e.getCause());
-                }
-                catch (IllegalAccessException e1)
-                {
-                    throw new IllegalStateException(e1);
-                }
-                catch (final InvocationTargetException e1)
-                {
-                    throw new IllegalStateException(e1.getCause());
-                }
-            }
-            catch (Exception e)
-            {
-                try
-                {
-                    COMPLETABLE_FUTURE_COMPLETE_ERROR.invoke(dep, e);
-                }
-                catch (IllegalAccessException e1)
-                {
-                    throw new IllegalStateException(e1);
-                }
-                catch (final InvocationTargetException e1)
-                {
-                    throw new IllegalStateException(e1.getCause());
-                }
-            }
-        }
+        return futureableStrategy.execute(ic);
     }
 }

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/943cf54a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/J8PromiseCompanionTask.java
----------------------------------------------------------------------
diff --git a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/J8PromiseCompanionTask.java
b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/J8PromiseCompanionTask.java
new file mode 100644
index 0000000..c4bb49d
--- /dev/null
+++ b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/J8PromiseCompanionTask.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.deltaspike.core.impl.future;
+
+import org.apache.deltaspike.core.util.ExceptionUtils;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.Callable;
+
+class J8PromiseCompanionTask<T> implements Runnable
+{
+    private static final Method COMPLETABLE_FUTURE_COMPLETE;
+    private static final Method COMPLETABLE_FUTURE_COMPLETE_ERROR;
+
+    static
+    {
+        Class<?> completableFutureClass = null;
+        Method completableFutureComplete = null;
+        Method completableFutureCompleteError = null;
+        try
+        {
+            final ClassLoader classLoader = ClassLoader.getSystemClassLoader();
+            completableFutureClass = classLoader.loadClass("java.util.concurrent.CompletableFuture");
+            completableFutureComplete = completableFutureClass.getMethod("complete", Object.class);
+            completableFutureCompleteError = completableFutureClass.getMethod("completeExceptionally",
Throwable.class);
+        }
+        catch (final Exception e)
+        {
+            // not on java 8
+        }
+        COMPLETABLE_FUTURE_COMPLETE = completableFutureComplete;
+        COMPLETABLE_FUTURE_COMPLETE_ERROR = completableFutureCompleteError;
+    }
+
+    private Object dep;
+    private Callable<T> fn;
+
+    J8PromiseCompanionTask(final Object dep, Callable<T> fn)
+    {
+        this.dep = dep;
+        this.fn = fn;
+    }
+
+    public void run()
+    {
+        try
+        {
+            COMPLETABLE_FUTURE_COMPLETE.invoke(dep, fn.call());
+        }
+        catch (final InvocationTargetException e)
+        {
+            try
+            {
+                COMPLETABLE_FUTURE_COMPLETE_ERROR.invoke(dep, e.getCause());
+            }
+            catch (IllegalAccessException e1)
+            {
+                throw ExceptionUtils.throwAsRuntimeException(e1);
+            }
+            catch (final InvocationTargetException e1)
+            {
+                throw ExceptionUtils.throwAsRuntimeException(e1.getCause());
+            }
+        }
+        catch (Exception e)
+        {
+            try
+            {
+                COMPLETABLE_FUTURE_COMPLETE_ERROR.invoke(dep, e);
+            }
+            catch (IllegalAccessException e1)
+            {
+                throw ExceptionUtils.throwAsRuntimeException(e1);
+            }
+            catch (final InvocationTargetException e1)
+            {
+                throw ExceptionUtils.throwAsRuntimeException(e1.getCause());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/943cf54a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/ThreadPoolManager.java
----------------------------------------------------------------------
diff --git a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/ThreadPoolManager.java
b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/ThreadPoolManager.java
new file mode 100644
index 0000000..e1711a0
--- /dev/null
+++ b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/ThreadPoolManager.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.deltaspike.core.impl.future;
+
+import org.apache.deltaspike.core.api.config.ConfigResolver;
+
+import javax.annotation.PreDestroy;
+import javax.enterprise.context.ApplicationScoped;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+@ApplicationScoped
+public class ThreadPoolManager
+{
+    private final ConcurrentMap<String, ExecutorService> pools = new ConcurrentHashMap<String,
ExecutorService>();
+    private volatile ExecutorService defaultPool;
+    private volatile boolean closed = false;
+
+    @PreDestroy
+    private void shutdown()
+    {
+        closed = true;
+        final String propertyValue = ConfigResolver.getPropertyValue("deltaspike.future.timeout");
+        final long timeout = propertyValue == null ? TimeUnit.MINUTES.toMillis(1) : Integer.parseInt(propertyValue);
+        for (final ExecutorService es : pools.values())
+        {
+            es.shutdown();
+            try
+            {
+                es.awaitTermination(timeout, TimeUnit.MILLISECONDS);
+            }
+            catch (final InterruptedException e)
+            {
+                Thread.interrupted();
+            }
+        }
+        if (defaultPool != null)
+        {
+            defaultPool.shutdown();
+            try
+            {
+                defaultPool.awaitTermination(timeout, TimeUnit.MILLISECONDS);
+            }
+            catch (final InterruptedException e)
+            {
+                Thread.interrupted();
+            }
+        }
+        pools.clear();
+    }
+
+    public ExecutorService find(final String name)
+    {
+        if (closed)
+        {
+            throw new IllegalStateException("Container is shutting down");
+        }
+        ExecutorService pool = pools.get(name);
+        if (pool == null)
+        {
+            ensureDefaultPool();
+            pool = defaultPool;
+        }
+        return pool;
+    }
+
+    private void ensureDefaultPool()
+    {
+        if (defaultPool == null)
+        {
+            synchronized (this)
+            {
+                if (defaultPool == null)
+                {
+                    defaultPool = Executors.newFixedThreadPool(
+                            Math.max(2, Runtime.getRuntime().availableProcessors()));
+                }
+            }
+        }
+    }
+}
+


Mime
View raw message