Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D6017200CAA for ; Sat, 17 Jun 2017 11:06:41 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id CC3C9160BD1; Sat, 17 Jun 2017 09:06:41 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1DD82160BCD for ; Sat, 17 Jun 2017 11:06:40 +0200 (CEST) Received: (qmail 92892 invoked by uid 500); 17 Jun 2017 09:06:40 -0000 Mailing-List: contact commits-help@openwebbeans.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@openwebbeans.apache.org Delivered-To: mailing list commits@openwebbeans.apache.org Received: (qmail 92881 invoked by uid 99); 17 Jun 2017 09:06:39 -0000 Received: from Unknown (HELO svn01-us-west.apache.org) (209.188.14.144) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 17 Jun 2017 09:06:39 +0000 Received: from svn01-us-west.apache.org (localhost [127.0.0.1]) by svn01-us-west.apache.org (ASF Mail Server at svn01-us-west.apache.org) with ESMTP id 4380D3A194C for ; Sat, 17 Jun 2017 09:06:37 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1798998 - /openwebbeans/trunk/webbeans-impl/src/test/java/org/apache/webbeans/test/events/async/ObserversAsyncTest.java Date: Sat, 17 Jun 2017 09:06:35 -0000 To: commits@openwebbeans.apache.org From: struberg@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20170617090638.4380D3A194C@svn01-us-west.apache.org> archived-at: Sat, 17 Jun 2017 09:06:42 -0000 Author: struberg Date: Sat Jun 17 09:06:35 2017 New Revision: 1798998 URL: http://svn.apache.org/viewvc?rev=1798998&view=rev Log: OWB-1188 improve test to show problems with async error handling Modified: openwebbeans/trunk/webbeans-impl/src/test/java/org/apache/webbeans/test/events/async/ObserversAsyncTest.java Modified: openwebbeans/trunk/webbeans-impl/src/test/java/org/apache/webbeans/test/events/async/ObserversAsyncTest.java URL: http://svn.apache.org/viewvc/openwebbeans/trunk/webbeans-impl/src/test/java/org/apache/webbeans/test/events/async/ObserversAsyncTest.java?rev=1798998&r1=1798997&r2=1798998&view=diff ============================================================================== --- openwebbeans/trunk/webbeans-impl/src/test/java/org/apache/webbeans/test/events/async/ObserversAsyncTest.java (original) +++ openwebbeans/trunk/webbeans-impl/src/test/java/org/apache/webbeans/test/events/async/ObserversAsyncTest.java Sat Jun 17 09:06:35 2017 @@ -24,14 +24,18 @@ import javax.enterprise.inject.spi.Exten import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; import org.apache.webbeans.test.AbstractUnitTest; +import org.junit.Assert; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -39,33 +43,49 @@ import static org.junit.Assert.assertNot public class ObserversAsyncTest extends AbstractUnitTest { + + @Test - public void testAsyncEventExceptionHandling() throws ExecutionException, InterruptedException + public void testAsyncEventExceptionHandling_handle() throws ExecutionException, InterruptedException { - final int count = 100 + ForkJoinPool.getCommonPoolParallelism() * 10; + final int count = 10 + ForkJoinPool.getCommonPoolParallelism() * 5; final VisitorCollectorEvent event = new VisitorCollectorEvent(); - addExtension(new Extension() { - void addABunchOfObserversAtLeastMoreThanThreads(@Observes final AfterBeanDiscovery afterBeanDiscovery) + addExtension(new ParallelObserveExtension(count)); + startContainer(); + + BlockingQueue queue = new LinkedBlockingQueue<>(); + + long start = System.nanoTime(); + + getBeanManager().getEvent().fireAsync(event) + .handle((e, t) -> { - IntStream.range(0, count) - .forEach(i -> afterBeanDiscovery.addObserverMethod() - .observedType(VisitorCollectorEvent.class) - .async(true) - .notifyWith(e -> - { - if (i % 2 == 0 && (i < 30 || i > 70)) - { - sleep(500); - } - - final String name = "Observer" + i; - event.visiting(name); - throw new IllegalStateException(name); - })); - } - }); + return queue.offer(t); + }); + + Throwable t = queue.poll(20, TimeUnit.SECONDS); + + long end = System.nanoTime(); + long durationMs = TimeUnit.NANOSECONDS.toMillis(end - start); + System.out.println("took ms: " + durationMs); + + Assert.assertNotNull(t); + Assert.assertTrue(t instanceof CompletionException); + CompletionException ce = (CompletionException) t; + Assert.assertEquals(count, ce.getSuppressed().length); + + } + + @Test + public void testAsyncEventExceptionHandling_CompletableFuture() throws ExecutionException, InterruptedException + { + final int count = 10 + ForkJoinPool.getCommonPoolParallelism() * 5; + + final VisitorCollectorEvent event = new VisitorCollectorEvent(); + + addExtension(new ParallelObserveExtension(count)); startContainer(); final AtomicReference observerException = new AtomicReference<>(); @@ -118,4 +138,33 @@ public class ObserversAsyncTest extends // ignore } } + + private class ParallelObserveExtension implements Extension + { + private final int count; + + public ParallelObserveExtension(int count) + { + this.count = count; + } + + void addABunchOfObserversAtLeastMoreThanThreads(@Observes final AfterBeanDiscovery afterBeanDiscovery) + { + IntStream.range(0, count) + .forEach(i -> afterBeanDiscovery.addObserverMethod() + .observedType(VisitorCollectorEvent.class) + .async(true) + .notifyWith(e -> + { + if (i % 2 == 0 && (i < 30 || i > 70)) + { + sleep(500); + } + + final String name = "Observer" + i; + e.getEvent().visiting(name); + throw new IllegalStateException(name); + })); + } + } }