openwebbeans-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmannibu...@apache.org
Subject svn commit: r1797230 - in /openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/event: NotificationManager.java ObserverMethodImpl.java
Date Thu, 01 Jun 2017 15:08:20 GMT
Author: rmannibucau
Date: Thu Jun  1 15:08:20 2017
New Revision: 1797230

URL: http://svn.apache.org/viewvc?rev=1797230&view=rev
Log:
some CompletionFuture work to share with Mark work on async events

Modified:
    openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/event/NotificationManager.java
    openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/event/ObserverMethodImpl.java

Modified: openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/event/NotificationManager.java
URL: http://svn.apache.org/viewvc/openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/event/NotificationManager.java?rev=1797230&r1=1797229&r2=1797230&view=diff
==============================================================================
--- openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/event/NotificationManager.java (original)
+++ openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/event/NotificationManager.java Thu Jun  1 15:08:20 2017
@@ -34,11 +34,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.stream.Stream;
 
 import javax.enterprise.context.RequestScoped;
 import javax.enterprise.event.NotificationOptions;
@@ -765,17 +767,29 @@ public final class NotificationManager
             }
         }
 
-        return complete(completableFutures, (T) event);
+        return async ? complete(completableFutures.toArray(new CompletableFuture[completableFutures.size()]), (T) event) : null;
     }
 
-    private <T> CompletableFuture<T> complete(List<CompletableFuture<Void>> completableFutures, T event)
+    private <T> CompletableFuture<T> complete(CompletableFuture<Void>[] completableFutures, T event)
     {
         if (completableFutures == null)
         {
             return null;
         }
-        return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()]))
-                                .thenApply(v -> event);
+        final CDICompletionFuture<T> future = new CDICompletionFuture<>(event);
+        // propagate the exception to the future aggregator (CDICompletionFuture)
+        Stream.of(completableFutures).forEach(f -> f.exceptionally(e ->
+        {
+            future.addError(e);
+            return null;
+        }));
+        // execute all futures and *once done* complete our future
+        CompletableFuture.allOf(completableFutures).handle((e, t) ->
+        {
+            future.done();
+            return null;
+        });
+        return future;
     }
 
     //X TODO review
@@ -784,7 +798,17 @@ public final class NotificationManager
                                            ObserverMethod<? super Object> observer,
                                            NotificationOptions notificationOptions)
     {
-        return CompletableFuture.runAsync(() -> runAsync(event, metadata, observer), notificationOptions.getExecutor());
+        CompletableFuture<?> future = new CompletableFuture<>();
+        try
+        {
+            runAsync(event, metadata, observer);
+            future.complete(null);
+        }
+        catch (final WebBeansException wbe)
+        {
+            future.completeExceptionally(wbe.getCause());
+        }
+        return future;
     }
 
     private void runAsync(Object event, EventMetadataImpl metadata, ObserverMethod<? super Object> observer)
@@ -869,4 +893,37 @@ public final class NotificationManager
         return CONTAINER_EVENT_CLASSES.contains(paramType);
     }
 
+    // this behaves as a future aggregator, we don't strictly need to represent it but found it more expressive
+    private static final class CDICompletionFuture<T> extends CompletableFuture<T>
+    {
+        private final T event;
+        private CompletionException error;
+
+        private CDICompletionFuture(final T event)
+        {
+            this.event = event;
+        }
+
+        CDICompletionFuture<T> addError(final Throwable t)
+        {
+            if (error == null)
+            {
+                error = new CompletionException(null);
+            }
+            error.addSuppressed(t);
+            return this;
+        }
+
+        void done()
+        {
+            if (error != null)
+            {
+                completeExceptionally(error);
+            }
+            else
+            {
+                complete(event);
+            }
+        }
+    }
 }

Modified: openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/event/ObserverMethodImpl.java
URL: http://svn.apache.org/viewvc/openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/event/ObserverMethodImpl.java?rev=1797230&r1=1797229&r2=1797230&view=diff
==============================================================================
--- openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/event/ObserverMethodImpl.java (original)
+++ openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/event/ObserverMethodImpl.java Thu Jun  1 15:08:20 2017
@@ -355,6 +355,10 @@ public class ObserverMethodImpl<T> imple
                 }
             }                        
         }
+        catch (InvocationTargetException ite)
+        {
+            throw new WebBeansException(ite.getCause());
+        }
         catch (Exception e)
         {
             throw new WebBeansException(e);



Mime
View raw message