deltaspike-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gpetra...@apache.org
Subject [2/3] deltaspike git commit: DELTASPIKE-1118 refactoring to ThrottledStrategy
Date Mon, 11 Apr 2016 23:02:32 GMT
DELTASPIKE-1118 refactoring to ThrottledStrategy


Project: http://git-wip-us.apache.org/repos/asf/deltaspike/repo
Commit: http://git-wip-us.apache.org/repos/asf/deltaspike/commit/18e1af70
Tree: http://git-wip-us.apache.org/repos/asf/deltaspike/tree/18e1af70
Diff: http://git-wip-us.apache.org/repos/asf/deltaspike/diff/18e1af70

Branch: refs/heads/master
Commit: 18e1af703e06e629caadcc949bad0fd566b3d24c
Parents: ac3d582
Author: gpetracek <gpetracek@apache.org>
Authored: Tue Apr 12 00:11:53 2016 +0200
Committer: gpetracek <gpetracek@apache.org>
Committed: Tue Apr 12 01:02:11 2016 +0200

----------------------------------------------------------------------
 .../core/spi/throttling/ThrottledStrategy.java  |  25 ++++
 .../throttling/DefaultThrottledStrategy.java    |  38 +++++
 .../core/impl/throttling/Invoker.java           |  83 ++++++++++
 .../core/impl/throttling/InvokerStorage.java    | 111 ++++++++++++++
 .../impl/throttling/ThrottledInterceptor.java   | 150 +------------------
 5 files changed, 260 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/deltaspike/blob/18e1af70/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/spi/throttling/ThrottledStrategy.java
----------------------------------------------------------------------
diff --git a/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/spi/throttling/ThrottledStrategy.java b/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/spi/throttling/ThrottledStrategy.java
new file mode 100644
index 0000000..e1a290a
--- /dev/null
+++ b/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/spi/throttling/ThrottledStrategy.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.deltaspike.core.spi.throttling;
+
+import org.apache.deltaspike.core.spi.InterceptorStrategy;
+
+/**
+ * Strategy type used by the {@code @Throttled} interceptor to execute an intercepted call.
+ * <p>
+ * It declares no members of its own and only narrows {@link InterceptorStrategy}
+ * to a dedicated type that can be injected and replaced independently.
+ */
+public interface ThrottledStrategy extends InterceptorStrategy
+{
+}

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/18e1af70/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/DefaultThrottledStrategy.java
----------------------------------------------------------------------
diff --git a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/DefaultThrottledStrategy.java b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/DefaultThrottledStrategy.java
new file mode 100644
index 0000000..e31241f
--- /dev/null
+++ b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/DefaultThrottledStrategy.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.deltaspike.core.impl.throttling;
+
+import org.apache.deltaspike.core.spi.throttling.ThrottledStrategy;
+
+import javax.enterprise.context.Dependent;
+import javax.inject.Inject;
+import javax.interceptor.InvocationContext;
+
+/**
+ * Default {@link ThrottledStrategy}: obtains the {@link Invoker} for the intercepted
+ * method from the injected {@link InvokerStorage} and lets it perform the call.
+ */
+@Dependent
+public class DefaultThrottledStrategy implements ThrottledStrategy
+{
+    @Inject
+    private InvokerStorage metadata;
+
+    /**
+     * Executes the intercepted invocation through the invoker registered for its method.
+     *
+     * @param ic the intercepted invocation
+     * @return the value returned by the invoker for this invocation
+     * @throws Exception whatever the invoker or the intercepted method throws
+     */
+    @Override
+    public Object execute(InvocationContext ic) throws Exception
+    {
+        final Invoker invoker = metadata.getOrCreateInvoker(ic);
+        return invoker.invoke(ic);
+    }
+}

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/18e1af70/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/Invoker.java
----------------------------------------------------------------------
diff --git a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/Invoker.java b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/Invoker.java
new file mode 100644
index 0000000..0d4e49f
--- /dev/null
+++ b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/Invoker.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.deltaspike.core.impl.throttling;
+
+import org.apache.deltaspike.core.util.ExceptionUtils;
+
+import javax.interceptor.InvocationContext;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Runs an intercepted call while holding a fixed number of permits of a {@link Semaphore}.
+ * <p>
+ * The semaphore, the number of permits and the timeout are fixed at construction time.
+ */
+class Invoker
+{
+    private final int weight;
+    private final Semaphore semaphore;
+    private final long timeout;
+
+    /**
+     * @param semaphore semaphore the permits are acquired from and released to
+     * @param weight    number of permits acquired for every invocation
+     * @param timeout   maximum time to wait for the permits, in milliseconds;
+     *                  a value {@code <= 0} means waiting without a time limit
+     */
+    Invoker(final Semaphore semaphore, final int weight, final long timeout)
+    {
+        this.semaphore = semaphore;
+        this.weight = weight;
+        this.timeout = timeout;
+    }
+
+    /**
+     * Acquires the permits, proceeds with the intercepted call and releases the permits
+     * afterwards, whether the call returned normally or threw.
+     *
+     * @param context the invocation to proceed with once the permits are held
+     * @return the result of {@link InvocationContext#proceed()}
+     * @throws IllegalStateException if a positive timeout is configured and the permits
+     *                               could not be acquired within it
+     * @throws Exception             whatever the intercepted call throws; an interruption while
+     *                               waiting is rethrown via {@code ExceptionUtils.throwAsRuntimeException}
+     */
+    public Object invoke(final InvocationContext context) throws Exception
+    {
+        try
+        {
+            if (timeout > 0)
+            {
+                if (!semaphore.tryAcquire(weight, timeout, TimeUnit.MILLISECONDS))
+                {
+                    throw new IllegalStateException(
+                        "Can't acquire " + weight + " permits for " + context.getMethod()
+                            + " in " + timeout + "ms");
+                }
+            }
+            else
+            {
+                semaphore.acquire(weight);
+            }
+        }
+        catch (final InterruptedException e)
+        {
+            // no permit is held at this point, so there is nothing to release
+            throw onInterruption(e);
+        }
+
+        try
+        {
+            return context.proceed();
+        }
+        finally
+        {
+            semaphore.release(weight);
+        }
+    }
+
+    /**
+     * Restores the interrupt status of the current thread and rethrows the interruption.
+     * Never returns normally; the return type only allows callers to write
+     * {@code throw onInterruption(e)}.
+     */
+    private static RuntimeException onInterruption(final InterruptedException e)
+    {
+        // catching InterruptedException leaves the interrupt status cleared;
+        // set it again so code further up the stack can still see the interruption
+        Thread.currentThread().interrupt();
+        throw ExceptionUtils.throwAsRuntimeException(e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/18e1af70/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/InvokerStorage.java
----------------------------------------------------------------------
diff --git a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/InvokerStorage.java b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/InvokerStorage.java
new file mode 100644
index 0000000..3f1d69d
--- /dev/null
+++ b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/InvokerStorage.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.deltaspike.core.impl.throttling;
+
+
+import org.apache.deltaspike.core.api.throttling.Throttled;
+import org.apache.deltaspike.core.api.throttling.Throttling;
+import org.apache.deltaspike.core.impl.util.AnnotatedMethods;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.inject.Typed;
+import javax.enterprise.inject.spi.AnnotatedMethod;
+import javax.enterprise.inject.spi.AnnotatedType;
+import javax.enterprise.inject.spi.BeanManager;
+import javax.inject.Inject;
+import javax.interceptor.InvocationContext;
+import java.lang.reflect.Method;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Semaphore;
+
+/**
+ * Cache of the {@link Invoker} created for each throttled method and of the named
+ * {@link Semaphore}s handed out by this class in its role as the built-in
+ * {@link Throttling.SemaphoreFactory}.
+ */
+@ApplicationScoped
+@Typed(InvokerStorage.class)
+public class InvokerStorage implements Throttling.SemaphoreFactory
+{
+    private final ConcurrentMap<String, Semaphore> semaphores = new ConcurrentHashMap<String, Semaphore>();
+    private final ConcurrentMap<Method, Invoker> providers = new ConcurrentHashMap<Method, Invoker>();
+
+    @Inject
+    private BeanManager beanManager;
+
+    /**
+     * Returns the invoker registered for the intercepted method, creating and registering
+     * it on first use from the {@link Throttled} and {@link Throttling} annotations found
+     * on the method or, as a fallback, on its declaring class.
+     *
+     * @param ic the intercepted invocation; only its method is used
+     * @return the invoker shared by all invocations of that method
+     * @throws IllegalStateException if neither the method nor its declaring class
+     *                               carries {@link Throttled}
+     */
+    Invoker getOrCreateInvoker(final InvocationContext ic)
+    {
+        final Method method = ic.getMethod();
+        Invoker i = providers.get(method);
+        if (i == null)
+        {
+            final Class declaringClass = method.getDeclaringClass();
+            final AnnotatedType<Object> annotatedType = beanManager.createAnnotatedType(declaringClass);
+            final AnnotatedMethod<?> annotatedMethod = AnnotatedMethods.findMethod(annotatedType, method);
+
+            // annotations on the method take precedence over those on the declaring class
+            Throttled config = annotatedMethod.getAnnotation(Throttled.class);
+            if (config == null)
+            {
+                config = annotatedType.getAnnotation(Throttled.class);
+            }
+            if (config == null)
+            {
+                // fail early with a readable message instead of a bare NullPointerException below
+                throw new IllegalStateException(
+                    "No @Throttled found on " + method + " or on its declaring class " + declaringClass.getName());
+            }
+            Throttling sharedConfig = annotatedMethod.getAnnotation(Throttling.class);
+            if (sharedConfig == null)
+            {
+                sharedConfig = annotatedType.getAnnotation(Throttling.class);
+            }
+
+            // a custom factory is looked up as a bean; without one this instance creates the semaphores
+            final Throttling.SemaphoreFactory factory;
+            if (sharedConfig != null && sharedConfig.factory() != Throttling.SemaphoreFactory.class)
+            {
+                factory = Throttling.SemaphoreFactory.class.cast(
+                        beanManager.getReference(
+                                beanManager.resolve(beanManager.getBeans(sharedConfig.factory())),
+                                Throttling.SemaphoreFactory.class, null));
+            }
+            else
+            {
+                factory = this;
+            }
+
+            // without @Throttling: one non-fair, single-permit semaphore named after the declaring class
+            final Semaphore semaphore = factory.newSemaphore(
+                    annotatedMethod,
+                    sharedConfig != null && !sharedConfig.name().isEmpty() ?
+                            sharedConfig.name() : declaringClass.getName(),
+                    sharedConfig != null && sharedConfig.fair(),
+                    sharedConfig != null ? sharedConfig.permits() : 1);
+            final long timeout = config.timeoutUnit().toMillis(config.timeout());
+            final int weight = config.weight();
+            i = new Invoker(semaphore, weight, timeout);
+
+            // another thread may have registered an invoker in the meantime; the first one wins
+            final Invoker existing = providers.putIfAbsent(method, i);
+            if (existing != null)
+            {
+                i = existing;
+            }
+        }
+        return i;
+    }
+
+    /**
+     * Returns the semaphore registered under {@code name}, creating it on first request.
+     * {@code permits} and {@code fair} are only used when the semaphore is created;
+     * later calls with the same name get the existing instance unchanged.
+     *
+     * @param method  the throttled method; not used by this implementation
+     * @param name    key under which the semaphore is shared
+     * @param fair    fairness setting for a newly created semaphore
+     * @param permits number of permits for a newly created semaphore
+     * @return the semaphore shared by all callers using {@code name}
+     */
+    @Override
+    public Semaphore newSemaphore(final AnnotatedMethod<?> method, final String name,
+                                  final boolean fair, final int permits)
+    {
+        Semaphore semaphore = semaphores.get(name);
+        if (semaphore == null)
+        {
+            semaphore = new Semaphore(permits, fair);
+            final Semaphore existing = semaphores.putIfAbsent(name, semaphore);
+            if (existing != null)
+            {
+                semaphore = existing;
+            }
+        }
+        return semaphore;
+    }
+}

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/18e1af70/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java
----------------------------------------------------------------------
diff --git a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java
index f02b14f..19ddd5c 100644
--- a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java
+++ b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java
@@ -19,168 +19,24 @@
 package org.apache.deltaspike.core.impl.throttling;
 
 import org.apache.deltaspike.core.api.throttling.Throttled;
-import org.apache.deltaspike.core.api.throttling.Throttling;
-import org.apache.deltaspike.core.impl.util.AnnotatedMethods;
+import org.apache.deltaspike.core.spi.throttling.ThrottledStrategy;
 
-import javax.enterprise.context.ApplicationScoped;
-import javax.enterprise.inject.Typed;
-import javax.enterprise.inject.spi.AnnotatedMethod;
-import javax.enterprise.inject.spi.AnnotatedType;
-import javax.enterprise.inject.spi.BeanManager;
 import javax.inject.Inject;
 import javax.interceptor.AroundInvoke;
 import javax.interceptor.Interceptor;
 import javax.interceptor.InvocationContext;
 import java.io.Serializable;
-import java.lang.reflect.Method;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 
 @Throttled
 @Interceptor
 public class ThrottledInterceptor implements Serializable
 {
     @Inject
-    private LocalCache metadata;
+    private ThrottledStrategy throttledStrategy;
 
     @AroundInvoke
     public Object invoke(final InvocationContext ic) throws Exception
     {
-        return metadata.getOrCreateInvocation(ic).invoke(ic);
-    }
-
-    private static Semaphore onInterruption(final InterruptedException e)
-    {
-        Thread.interrupted();
-        throw new IllegalStateException("acquire() interrupted", e);
-    }
-
-    @ApplicationScoped
-    @Typed(LocalCache.class)
-    static class LocalCache implements Throttling.SemaphoreFactory
-    {
-        private final ConcurrentMap<String, Semaphore> semaphores = new ConcurrentHashMap<String, Semaphore>();
-        private final ConcurrentMap<Method, Invocation> providers = new ConcurrentHashMap<Method, Invocation>();
-
-        @Inject
-        private BeanManager beanManager;
-
-        Invocation getOrCreateInvocation(final InvocationContext ic)
-        {
-            final Method method = ic.getMethod();
-            Invocation i = providers.get(method);
-            if (i == null)
-            {
-                final Class declaringClass = method.getDeclaringClass();
-                final AnnotatedType<Object> annotatedType = beanManager.createAnnotatedType(declaringClass);
-                final AnnotatedMethod<?> annotatedMethod = AnnotatedMethods.findMethod(annotatedType, method);
-
-                Throttled config = annotatedMethod.getAnnotation(Throttled.class);
-                if (config == null)
-                {
-                    config = annotatedType.getAnnotation(Throttled.class);
-                }
-                Throttling sharedConfig = annotatedMethod.getAnnotation(Throttling.class);
-                if (sharedConfig == null)
-                {
-                    sharedConfig = annotatedType.getAnnotation(Throttling.class);
-                }
-
-                final Throttling.SemaphoreFactory factory =
-                        sharedConfig != null && sharedConfig.factory() != Throttling.SemaphoreFactory.class ?
-                        Throttling.SemaphoreFactory.class.cast(
-                                beanManager.getReference(beanManager.resolve(
-                                        beanManager.getBeans(
-                                                sharedConfig.factory())),
-                                        Throttling.SemaphoreFactory.class, null)) : this;
-
-                final Semaphore semaphore = factory.newSemaphore(
-                        annotatedMethod,
-                        sharedConfig != null && !sharedConfig.name().isEmpty() ?
-                                sharedConfig.name() : declaringClass.getName(),
-                        sharedConfig != null && sharedConfig.fair(),
-                        sharedConfig != null ? sharedConfig.permits() : 1);
-                final long timeout = config.timeoutUnit().toMillis(config.timeout());
-                final int weigth = config.weight();
-                i = new Invocation(semaphore, weigth, timeout);
-                final Invocation existing = providers.putIfAbsent(ic.getMethod(), i);
-                if (existing != null)
-                {
-                    i = existing;
-                }
-            }
-            return i;
-        }
-
-        @Override
-        public Semaphore newSemaphore(final AnnotatedMethod<?> method, final String name,
-                                      final boolean fair, final int permits)
-        {
-            Semaphore semaphore = semaphores.get(name);
-            if (semaphore == null)
-            {
-                semaphore = new Semaphore(permits, fair);
-                final Semaphore existing = semaphores.putIfAbsent(name, semaphore);
-                if (existing != null)
-                {
-                    semaphore = existing;
-                }
-            }
-            return semaphore;
-        }
-    }
-
-    private static final class Invocation
-    {
-        private final int weight;
-        private final Semaphore semaphore;
-        private final long timeout;
-
-        private Invocation(final Semaphore semaphore, final int weight, final long timeout)
-        {
-            this.semaphore = semaphore;
-            this.weight = weight;
-            this.timeout = timeout;
-        }
-
-        Object invoke(final InvocationContext context) throws Exception
-        {
-            if (timeout > 0)
-            {
-                try
-                {
-                    if (!semaphore.tryAcquire(weight, timeout, TimeUnit.MILLISECONDS))
-                    {
-                        throw new IllegalStateException("Can't acquire " + weight +
-                                " permits for " + context.getMethod() + " in " + timeout + "ms");
-                    }
-                }
-                catch (final InterruptedException e)
-                {
-                    return onInterruption(e);
-                }
-            }
-            else
-            {
-                try
-                {
-                    semaphore.acquire(weight);
-                }
-                catch (final InterruptedException e)
-                {
-                    return onInterruption(e);
-                }
-            }
-            try
-            {
-                return context.proceed();
-            }
-            finally
-            {
-                semaphore.release(weight);
-            }
-        }
+        return throttledStrategy.execute(ic);
     }
 }


Mime
View raw message