aries-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From timothyjw...@apache.org
Subject svn commit: r1765345 - in /aries/trunk/async: async-impl/src/main/java/org/apache/aries/async/impl/ async-impl/src/test/java/org/apache/aries/async/impl/ promise-api/src/main/java/org/apache/aries/async/promise/ promise-api/src/main/java/org/osgi/util/...
Date Mon, 17 Oct 2016 17:52:11 GMT
Author: timothyjward
Date: Mon Oct 17 17:52:11 2016
New Revision: 1765345

URL: http://svn.apache.org/viewvc?rev=1765345&view=rev
Log:
[async] Update Promises to implement the provisional 1.1 API

Added:
    aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/Callback.java
      - copied, changed from r1765304, aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/Function.java
    aries/trunk/async/promise-api/src/main/java/org/osgi/util/promise/TimeoutException.java
Modified:
    aries/trunk/async/async-impl/src/main/java/org/apache/aries/async/impl/Activator.java
    aries/trunk/async/async-impl/src/main/java/org/apache/aries/async/impl/AsyncService.java
    aries/trunk/async/async-impl/src/main/java/org/apache/aries/async/impl/AsyncServiceFactory.java
    aries/trunk/async/async-impl/src/main/java/org/apache/aries/async/impl/MethodCall.java
    aries/trunk/async/async-impl/src/test/java/org/apache/aries/async/impl/AsyncServiceTest.java
    aries/trunk/async/promise-api/src/main/java/org/apache/aries/async/promise/PromiseImpl.java
    aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/Function.java
    aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/Predicate.java
    aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/packageinfo
    aries/trunk/async/promise-api/src/main/java/org/osgi/util/promise/Promise.java
    aries/trunk/async/promise-api/src/main/java/org/osgi/util/promise/packageinfo
    aries/trunk/async/promise-api/src/test/java/org/apache/aries/async/promise/test/ChainTest.java

Modified: aries/trunk/async/async-impl/src/main/java/org/apache/aries/async/impl/Activator.java
URL: http://svn.apache.org/viewvc/aries/trunk/async/async-impl/src/main/java/org/apache/aries/async/impl/Activator.java?rev=1765345&r1=1765344&r2=1765345&view=diff
==============================================================================
--- aries/trunk/async/async-impl/src/main/java/org/apache/aries/async/impl/Activator.java (original)
+++ aries/trunk/async/async-impl/src/main/java/org/apache/aries/async/impl/Activator.java Mon Oct 17 17:52:11 2016
@@ -23,6 +23,7 @@ import java.security.PrivilegedAction;
 import java.util.Hashtable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -52,6 +53,25 @@ public class Activator implements Bundle
 			return t;
 		}
 	});
+
+	private final ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+		
+		private final AtomicInteger count = new AtomicInteger();
+		
+		public Thread newThread(final Runnable r) {
+			Thread t = new Thread(new Runnable(){
+				public void run() {
+					AccessController.doPrivileged(new PrivilegedAction<Void>() {
+						public Void run() {
+							r.run();
+							return null;
+						}
+					});
+				}
+			}, "Asynchronous Execution Service Timing Thread " + count.incrementAndGet());
+			return t;
+		}
+	});
 	
 	private volatile ServiceTracker<LogService, LogService> logServiceTracker;
 	
@@ -59,10 +79,11 @@ public class Activator implements Bundle
 		logServiceTracker = new ServiceTracker<LogService, LogService>(context, LogService.class, null);
 		logServiceTracker.open();
 		
-		context.registerService(Async.class.getName(), new AsyncServiceFactory(executor, logServiceTracker), new Hashtable<String, Object>());
+		context.registerService(Async.class.getName(), new AsyncServiceFactory(executor, ses, logServiceTracker), new Hashtable<String, Object>());
 	}
 
 	public void stop(BundleContext context) throws Exception {
+		ses.shutdownNow();
 		executor.shutdownNow();
 		logServiceTracker.close();
 	}

Modified: aries/trunk/async/async-impl/src/main/java/org/apache/aries/async/impl/AsyncService.java
URL: http://svn.apache.org/viewvc/aries/trunk/async/async-impl/src/main/java/org/apache/aries/async/impl/AsyncService.java?rev=1765345&r1=1765344&r2=1765345&view=diff
==============================================================================
--- aries/trunk/async/async-impl/src/main/java/org/apache/aries/async/impl/AsyncService.java (original)
+++ aries/trunk/async/async-impl/src/main/java/org/apache/aries/async/impl/AsyncService.java Mon Oct 17 17:52:11 2016
@@ -29,6 +29,7 @@ import java.util.WeakHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.osgi.framework.Bundle;
 import org.osgi.framework.ServiceReference;
@@ -88,12 +89,15 @@ public class AsyncService implements Asy
 	
 	private final ExecutorService executor;
 	
+	private final ScheduledExecutorService ses;
+
 	private final ServiceTracker<LogService, LogService> logServiceTracker;
 	
-	public AsyncService(Bundle clientBundle, ExecutorService executor, ServiceTracker<LogService, LogService> logServiceTracker) {
+	public AsyncService(Bundle clientBundle, ExecutorService executor, ScheduledExecutorService ses, ServiceTracker<LogService, LogService> logServiceTracker) {
 		super();
 		this.clientBundle = clientBundle;
 		this.executor = executor;
+		this.ses = ses;
 		this.logServiceTracker = logServiceTracker;
 	}
 	
@@ -231,7 +235,7 @@ public class AsyncService implements Asy
 	public <T> Promise<T> call(T call) throws IllegalStateException {
 		MethodCall currentInvocation = consumeCurrentInvocation();
 		if(currentInvocation == null) throw new IllegalStateException("Incorrect API usage - this thread has no pending method calls");
-		return currentInvocation.invokeAsynchronously(clientBundle, executor);
+		return currentInvocation.invokeAsynchronously(clientBundle, executor, ses);
 	}
 
 	public Promise<?> call() throws IllegalStateException {
@@ -241,7 +245,7 @@ public class AsyncService implements Asy
 	public Promise<Void> execute() throws IllegalStateException {
 		MethodCall currentInvocation = consumeCurrentInvocation();
 		if(currentInvocation == null) throw new IllegalStateException("Incorrect API usage - this thread has no pending method calls");
-		return currentInvocation.fireAndForget(clientBundle, executor);
+		return currentInvocation.fireAndForget(clientBundle, executor, ses);
 	}
 
 	void registerInvocation(MethodCall invocation) {

Modified: aries/trunk/async/async-impl/src/main/java/org/apache/aries/async/impl/AsyncServiceFactory.java
URL: http://svn.apache.org/viewvc/aries/trunk/async/async-impl/src/main/java/org/apache/aries/async/impl/AsyncServiceFactory.java?rev=1765345&r1=1765344&r2=1765345&view=diff
==============================================================================
--- aries/trunk/async/async-impl/src/main/java/org/apache/aries/async/impl/AsyncServiceFactory.java (original)
+++ aries/trunk/async/async-impl/src/main/java/org/apache/aries/async/impl/AsyncServiceFactory.java Mon Oct 17 17:52:11 2016
@@ -19,6 +19,7 @@
 package org.apache.aries.async.impl;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.osgi.framework.Bundle;
 import org.osgi.framework.ServiceFactory;
@@ -31,17 +32,21 @@ public class AsyncServiceFactory impleme
 
 	private final ExecutorService executor;
 	
+	private final ScheduledExecutorService ses;
+	
 	private final ServiceTracker<LogService, LogService> logServiceTracker;
 	
-	public AsyncServiceFactory(ExecutorService executor, ServiceTracker<LogService, LogService> logServiceTracker) {
+	public AsyncServiceFactory(ExecutorService executor, ScheduledExecutorService ses, 
+			ServiceTracker<LogService, LogService> logServiceTracker) {
 		this.logServiceTracker = logServiceTracker;
 		this.executor = executor;
+		this.ses = ses;
 	}
 
 	public Async getService(Bundle bundle,
 			ServiceRegistration<Async> registration) {
 		
-		return new AsyncService(bundle, executor, logServiceTracker);
+		return new AsyncService(bundle, executor, ses, logServiceTracker);
 	}
 
 	public void ungetService(Bundle bundle,

Modified: aries/trunk/async/async-impl/src/main/java/org/apache/aries/async/impl/MethodCall.java
URL: http://svn.apache.org/viewvc/aries/trunk/async/async-impl/src/main/java/org/apache/aries/async/impl/MethodCall.java?rev=1765345&r1=1765344&r2=1765345&view=diff
==============================================================================
--- aries/trunk/async/async-impl/src/main/java/org/apache/aries/async/impl/MethodCall.java (original)
+++ aries/trunk/async/async-impl/src/main/java/org/apache/aries/async/impl/MethodCall.java Mon Oct 17 17:52:11 2016
@@ -21,6 +21,7 @@ package org.apache.aries.async.impl;
 import java.lang.reflect.Method;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.aries.async.promise.PromiseImpl;
 import org.osgi.framework.Bundle;
@@ -86,9 +87,9 @@ public class MethodCall {
 		}
 	}
 	
-	public <V> Promise<V> invokeAsynchronously(Bundle clientBundle, ExecutorService executor) {
+	public <V> Promise<V> invokeAsynchronously(Bundle clientBundle, ExecutorService executor, ScheduledExecutorService ses) {
 		
-		PromiseImpl<V> promiseImpl = new PromiseImpl<V>(executor);
+		PromiseImpl<V> promiseImpl = new PromiseImpl<V>(executor, ses);
 
 		Object svc;
 		try {
@@ -137,8 +138,8 @@ public class MethodCall {
 		return promiseImpl;
 	}
 
-	public Promise<Void> fireAndForget(Bundle clientBundle, ExecutorService executor) {
-		PromiseImpl<Void> started = new PromiseImpl<Void>(executor);
+	public Promise<Void> fireAndForget(Bundle clientBundle, ExecutorService executor, ScheduledExecutorService ses) {
+		PromiseImpl<Void> started = new PromiseImpl<Void>(executor, ses);
 		Object svc;
 		try {
 			svc = getService();

Modified: aries/trunk/async/async-impl/src/test/java/org/apache/aries/async/impl/AsyncServiceTest.java
URL: http://svn.apache.org/viewvc/aries/trunk/async/async-impl/src/test/java/org/apache/aries/async/impl/AsyncServiceTest.java?rev=1765345&r1=1765344&r2=1765345&view=diff
==============================================================================
--- aries/trunk/async/async-impl/src/test/java/org/apache/aries/async/impl/AsyncServiceTest.java (original)
+++ aries/trunk/async/async-impl/src/test/java/org/apache/aries/async/impl/AsyncServiceTest.java Mon Oct 17 17:52:11 2016
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTru
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.junit.After;
@@ -55,12 +56,15 @@ public class AsyncServiceTest {
 	
 	private ExecutorService es;
 	
+	private ScheduledExecutorService ses;
+	
 	@Mock
 	ServiceTracker<LogService, LogService> serviceTracker;
 	
 	@Before
 	public void start() {
 		es = Executors.newFixedThreadPool(3);
+		ses = Executors.newSingleThreadScheduledExecutor();
 	}
 
 	@After
@@ -71,6 +75,13 @@ public class AsyncServiceTest {
 		} catch (InterruptedException e) {
 			e.printStackTrace();
 		}
+
+		ses.shutdownNow();
+		try {
+			ses.awaitTermination(5, TimeUnit.SECONDS);
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		}
 	}
 	
 	
@@ -78,7 +89,7 @@ public class AsyncServiceTest {
 	public void test() throws InterruptedException {
 		DelayedEcho raw = new DelayedEcho();
 		
-		AsyncService service = new AsyncService(null, es, 
+		AsyncService service = new AsyncService(null, es, ses,
 				serviceTracker);
 		
 		DelayedEcho mediated = service.mediate(raw, DelayedEcho.class);
@@ -108,7 +119,7 @@ public class AsyncServiceTest {
     public void testMultipleMediationsCacheClassLoader() throws Exception {
         DelayedEcho raw = new DelayedEcho();
         
-        AsyncService service = new AsyncService(null, es,
+        AsyncService service = new AsyncService(null, es, ses,
                                                 serviceTracker);
         
         DelayedEcho mediated = service.mediate(raw, DelayedEcho.class);
@@ -120,7 +131,7 @@ public class AsyncServiceTest {
     public void testMultipleMediationsCacheClassLoaderInterface() throws Exception {
     	CharSequence raw = "test";
     	
-    	AsyncService service = new AsyncService(null, es,
+    	AsyncService service = new AsyncService(null, es, ses,
     			serviceTracker);
     	
     	CharSequence mediated = service.mediate(raw, CharSequence.class);

Modified: aries/trunk/async/promise-api/src/main/java/org/apache/aries/async/promise/PromiseImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/async/promise-api/src/main/java/org/apache/aries/async/promise/PromiseImpl.java?rev=1765345&r1=1765344&r2=1765345&view=diff
==============================================================================
--- aries/trunk/async/promise-api/src/main/java/org/apache/aries/async/promise/PromiseImpl.java (original)
+++ aries/trunk/async/promise-api/src/main/java/org/apache/aries/async/promise/PromiseImpl.java Mon Oct 17 17:52:11 2016
@@ -18,6 +18,8 @@
  */
 package org.apache.aries.async.promise;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.List;
@@ -26,16 +28,21 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
+import org.osgi.util.function.Callback;
 import org.osgi.util.function.Function;
 import org.osgi.util.function.Predicate;
 import org.osgi.util.promise.Failure;
 import org.osgi.util.promise.Promise;
 import org.osgi.util.promise.Success;
+import org.osgi.util.promise.TimeoutException;
 
 public class PromiseImpl<T> implements Promise<T> {
 
     private final Executor exec;
+    private final ScheduledExecutorService ses;
     private final List<Runnable> tasks = new ArrayList<Runnable>();
     private final CountDownLatch resolved = new CountDownLatch(1);
 
@@ -46,15 +53,18 @@ public class PromiseImpl<T> implements P
     private T value;
 
     public PromiseImpl() {
-        // Executor for onResolve() callbacks
-        // We could use an Executor that runs tasks in current thread
         this(Executors.newSingleThreadExecutor());
     }
 
     public PromiseImpl(Executor executor) {
+    	this(executor, Executors.newSingleThreadScheduledExecutor());
+    }
+
+    public PromiseImpl(Executor executor, ScheduledExecutorService ses) {
     	// Executor for onResolve() callbacks
     	// We could use an Executor that runs tasks in current thread
     	exec = executor;
+    	this.ses = ses;
     }
 
     public void fail(Throwable failure) {
@@ -70,7 +80,7 @@ public class PromiseImpl<T> implements P
     public Promise<Void> resolveWith(final Promise<? extends T> with) {
         if (with == null)
             throw new NullPointerException();
-        final PromiseImpl<Void> result = new PromiseImpl<Void>(exec);
+        final PromiseImpl<Void> result = new PromiseImpl<Void>(exec, ses);
 
         with.then(new Success<T, T>() {
             @Override
@@ -205,7 +215,7 @@ public class PromiseImpl<T> implements P
 
     @Override
     public <R> Promise<R> then(Success<? super T, ? extends R> success, Failure failure) {
-        PromiseImpl<R> result = new PromiseImpl<R>(exec);
+        PromiseImpl<R> result = new PromiseImpl<R>(exec, ses);
         result.onSuccess = success;
         result.onFailure = failure;
         synchronized (this) {
@@ -224,12 +234,30 @@ public class PromiseImpl<T> implements P
     public <R> Promise<R> then(Success<? super T, ? extends R> success) {
         return then(success, null);
     }
-
+    
     @Override
+	public Promise<T> then(final Callback callback) {
+    	if (callback == null)
+            throw new NullPointerException();
+    	return then(new Success<T,T>() {
+			@Override
+			public Promise<T> call(Promise<T> resolved) throws Exception {
+				callback.run();
+				return resolved;
+			}
+    	}, new Failure(){
+			@Override
+			public void fail(Promise<?> resolved) throws Exception {
+				callback.run();
+			}
+    	});
+	}
+
+	@Override
     public Promise<T> filter(final Predicate<? super T> predicate) {
         if (predicate == null)
             throw new NullPointerException();
-        final PromiseImpl<T> result = new PromiseImpl<T>(exec);
+        final PromiseImpl<T> result = new PromiseImpl<T>(exec, ses);
 
         then(new Success<T, T>() {
             @Override
@@ -259,7 +287,7 @@ public class PromiseImpl<T> implements P
     public <R> Promise<R> map(final Function<? super T, ? extends R> mapper) {
         if (mapper == null)
             throw new NullPointerException();
-        final PromiseImpl<R> result = new PromiseImpl<R>(exec);
+        final PromiseImpl<R> result = new PromiseImpl<R>(exec, ses);
 
         then(new Success<T, T>() {
             @Override
@@ -286,7 +314,7 @@ public class PromiseImpl<T> implements P
     public <R> Promise<R> flatMap(final Function<? super T, Promise<? extends R>> mapper) {
         if (mapper == null)
             throw new NullPointerException();
-        final PromiseImpl<R> result = new PromiseImpl<R>(exec);
+        final PromiseImpl<R> result = new PromiseImpl<R>(exec, ses);
 
         then(new Success<T, T>() {
             @Override
@@ -314,7 +342,7 @@ public class PromiseImpl<T> implements P
         if (recovery == null)
             throw new NullPointerException();
 
-        final PromiseImpl<T> result = new PromiseImpl<T>(exec);
+        final PromiseImpl<T> result = new PromiseImpl<T>(exec, ses);
 
         then(new Success<T, T>() {
             @Override
@@ -346,7 +374,7 @@ public class PromiseImpl<T> implements P
         if (recovery == null)
             throw new NullPointerException();
 
-        final PromiseImpl<T> result = new PromiseImpl<T>(exec);
+        final PromiseImpl<T> result = new PromiseImpl<T>(exec, ses);
 
         then(new Success<T, T>() {
             @Override
@@ -378,7 +406,7 @@ public class PromiseImpl<T> implements P
         if (fallback == null)
             throw new NullPointerException();
 
-        final PromiseImpl<T> result = new PromiseImpl<T>(exec);
+        final PromiseImpl<T> result = new PromiseImpl<T>(exec, ses);
 
         then(new Success<T, T>() {
             @Override
@@ -401,4 +429,66 @@ public class PromiseImpl<T> implements P
 
         return result;
     }
+
+	@Override
+	public Promise<T> timeout(long milliseconds) {
+		final PromiseImpl<T> p = new PromiseImpl<T>();
+		
+		p.resolveWith(this);
+		
+		ses.schedule(new Runnable(){
+			@Override
+			public void run() {
+				if(!p.isDone()) {
+					try {
+						p.fail(new TimeoutException());
+					} catch (Exception e) {
+						// Already resolved
+					}
+				}
+			}
+		}, milliseconds, MILLISECONDS);
+		
+		return p;
+	}
+
+	@Override
+	public Promise<T> delay(final long milliseconds) {
+		final PromiseImpl<T> p = new PromiseImpl<T>();
+		then(new Success<T,T>() {
+			@Override
+			public Promise<T> call(final Promise<T> resolved) throws Exception {
+				ses.schedule(new Runnable(){
+					@Override
+					public void run() {
+						try {
+							p.resolve(resolved.getValue());
+						} catch (IllegalStateException ise) {
+							// Someone else resolved our promise?
+						} catch (Exception e) {
+							p.fail(e);
+						}
+					}
+				}, milliseconds, MILLISECONDS);
+				return null;
+			}
+    	}, new Failure(){
+			@Override
+			public void fail(final Promise<?> resolved) throws Exception {
+				ses.schedule(new Runnable(){
+					@Override
+					public void run() {
+						try {
+							p.fail(resolved.getFailure());
+						} catch (Exception e) {
+							p.fail(e);
+						}
+					}
+				}, milliseconds, MILLISECONDS);
+			}
+    	});
+		return p;
+	}
+    
+    
 }

Copied: aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/Callback.java (from r1765304, aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/Function.java)
URL: http://svn.apache.org/viewvc/aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/Callback.java?p2=aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/Callback.java&p1=aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/Function.java&r1=1765304&r2=1765345&rev=1765345&view=diff
==============================================================================
--- aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/Function.java (original)
+++ aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/Callback.java Mon Oct 17 17:52:11 2016
@@ -1,6 +1,6 @@
 /*
- * Copyright (c) OSGi Alliance 2015. All Rights Reserved.
- *
+ * Copyright (c) OSGi Alliance (2016). All Rights Reserved.
+ * 
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -13,23 +13,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.osgi.util.function;
 
+import org.osgi.annotation.versioning.ConsumerType;
+
 /**
- * A function that accepts a single argument and produces a result.
+ * A callback that performs an operation and may throw an exception.
  * <p>
- * This is a functional interface and can be used as the assignment target for a lambda expression or method reference.
- *
- * @param <T> The type of the function input.
- * @param <R> The type of the function output.
+ * This is a functional interface and can be used as the assignment target for a
+ * lambda expression or method reference.
+ * 
+ * @ThreadSafe
+ * @since 1.1
+ * @author $Id: 17ff376bc9c8c171caad89eb9d0bc496f46961ee $
  */
-@org.osgi.annotation.versioning.ConsumerType
-public interface Function<T, R> {
-
-    /**
-     * Applies this function to the specified argument.
-     * @param t The input to this function.
-     * @return The output of this function.
-     */
-    R apply(T t);
+@ConsumerType
+@FunctionalInterface
+public interface Callback {
+	/**
+	 * Execute the callback.
+	 * 
+	 * @throws Exception An exception thrown by the method.
+	 */
+	void run() throws Exception;
 }

Modified: aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/Function.java
URL: http://svn.apache.org/viewvc/aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/Function.java?rev=1765345&r1=1765344&r2=1765345&view=diff
==============================================================================
--- aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/Function.java (original)
+++ aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/Function.java Mon Oct 17 17:52:11 2016
@@ -30,6 +30,7 @@ public interface Function<T, R> {
      * Applies this function to the specified argument.
      * @param t The input to this function.
      * @return The output of this function.
+     * @throws An Exception
      */
-    R apply(T t);
+    R apply(T t) throws Exception;
 }

Modified: aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/Predicate.java
URL: http://svn.apache.org/viewvc/aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/Predicate.java?rev=1765345&r1=1765344&r2=1765345&view=diff
==============================================================================
--- aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/Predicate.java (original)
+++ aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/Predicate.java Mon Oct 17 17:52:11 2016
@@ -29,6 +29,7 @@ public interface Predicate<T> {
      *
      * @param t The input to this predicate.
      * @return true if the specified argument is accepted by this predicate; false otherwise.
+     * @throws an Exception
      */
-    boolean test(T t);
+    boolean test(T t) throws Exception;
 }

Modified: aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/packageinfo
URL: http://svn.apache.org/viewvc/aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/packageinfo?rev=1765345&r1=1765344&r2=1765345&view=diff
==============================================================================
--- aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/packageinfo (original)
+++ aries/trunk/async/promise-api/src/main/java/org/osgi/util/function/packageinfo Mon Oct 17 17:52:11 2016
@@ -1 +1 @@
-version 1.0.0
+version 1.1.0

Modified: aries/trunk/async/promise-api/src/main/java/org/osgi/util/promise/Promise.java
URL: http://svn.apache.org/viewvc/aries/trunk/async/promise-api/src/main/java/org/osgi/util/promise/Promise.java?rev=1765345&r1=1765344&r2=1765345&view=diff
==============================================================================
--- aries/trunk/async/promise-api/src/main/java/org/osgi/util/promise/Promise.java (original)
+++ aries/trunk/async/promise-api/src/main/java/org/osgi/util/promise/Promise.java Mon Oct 17 17:52:11 2016
@@ -15,11 +15,12 @@
  */
 package org.osgi.util.promise;
 
+import java.lang.reflect.InvocationTargetException;
+
+import org.osgi.util.function.Callback;
 import org.osgi.util.function.Function;
 import org.osgi.util.function.Predicate;
 
-import java.lang.reflect.InvocationTargetException;
-
 /**
  * A Promise of a value.
  * <p/>
@@ -171,6 +172,40 @@ public interface Promise<T> {
      * @see #then(Success, Failure)
      */
     <R> Promise<R> then(Success<? super T, ? extends R> success);
+    
+	/**
+	 * Chain a new Promise to this Promise with a callback.
+	 * <p>
+	 * The specified {@link Callback} is called when this Promise is resolved
+	 * either successfully or with a failure.
+	 * <p>
+	 * This method returns a new Promise which is chained to this Promise. The
+	 * returned Promise must be resolved when this Promise is resolved after the
+	 * specified callback is executed. If the callback throws an exception, the
+	 * returned Promise is failed with that exception. Otherwise the returned
+	 * Promise is resolved with this Promise.
+	 * <p>
+	 * This method may be called at any time including before and after this
+	 * Promise has been resolved.
+	 * <p>
+	 * Resolving this Promise <i>happens-before</i> any registered callback is
+	 * called. That is, in a registered callback, {@link #isDone()} must return
+	 * {@code true} and {@link #getValue()} and {@link #getFailure()} must not
+	 * block.
+	 * <p>
+	 * A callback may be called on a different thread than the thread which
+	 * registered the callback. So the callback must be thread safe but can rely
+	 * upon that the registration of the callback <i>happens-before</i> the
+	 * registered callback is called.
+	 * 
+	 * @param callback A callback to be called when this Promise is resolved.
+	 *            Must not be {@code null}.
+	 * @return A new Promise which is chained to this Promise. The returned
+	 *         Promise must be resolved when this Promise is resolved after the
+	 *         specified callback is executed.
+	 * @since 1.1
+	 */
+	Promise<T> then(Callback callback);
 
 
     /**
@@ -290,5 +325,37 @@ public interface Promise<T> {
      * @return A Promise that returns the value of this Promise or falls back to the value of the specified Promise.
      */
     Promise<T> fallbackTo(Promise<? extends T> fallback);
+    
+	/**
+	 * Time out the resolution of this Promise.
+	 * <p>
+	 * If this Promise is successfully resolved before the timeout, the returned
+	 * Promise is resolved with the value of this Promise. If this Promise is
+	 * resolved with a failure before the timeout, the returned Promise is
+	 * resolved with the failure of this Promise. If the timeout is reached
+	 * before this Promise is resolved, the returned Promise is failed with a
+	 * {@link TimeoutException}.
+	 * 
+	 * @param milliseconds The time to wait in milliseconds. Zero and negative
+	 *            time is treated as an immediate timeout.
+	 * @return A Promise that is resolved when either this Promise is resolved
+	 *         or the specified timeout is reached.
+	 * @since 1.1
+	 */
+	Promise<T> timeout(long milliseconds);
+
+	/**
+	 * Delay after the resolution of this Promise.
+	 * <p>
+	 * Once this Promise is resolved, resolve the returned Promise with this
+	 * Promise after the specified delay.
+	 * 
+	 * @param milliseconds The time to delay in milliseconds. Zero and negative
+	 *            time is treated as no delay.
+	 * @return A Promise that is resolved with this Promise after this Promise
+	 *         is resolved and the specified delay has elapsed.
+	 * @since 1.1
+	 */
+	Promise<T> delay(long milliseconds);
 
 }

Added: aries/trunk/async/promise-api/src/main/java/org/osgi/util/promise/TimeoutException.java
URL: http://svn.apache.org/viewvc/aries/trunk/async/promise-api/src/main/java/org/osgi/util/promise/TimeoutException.java?rev=1765345&view=auto
==============================================================================
--- aries/trunk/async/promise-api/src/main/java/org/osgi/util/promise/TimeoutException.java (added)
+++ aries/trunk/async/promise-api/src/main/java/org/osgi/util/promise/TimeoutException.java Mon Oct 17 17:52:11 2016
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) OSGi Alliance (2016). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.promise;
+
+/**
+ * Timeout exception for a Promise.
+ * 
+ * @since 1.1
+ * @author $Id: 09186f5527a0552b14f95fab5e5468f47b536d43 $
+ */
+public class TimeoutException extends Exception {
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Create a new {@code TimeoutException}.
+	 */
+	public TimeoutException() {
+		super();
+	}
+}

Modified: aries/trunk/async/promise-api/src/main/java/org/osgi/util/promise/packageinfo
URL: http://svn.apache.org/viewvc/aries/trunk/async/promise-api/src/main/java/org/osgi/util/promise/packageinfo?rev=1765345&r1=1765344&r2=1765345&view=diff
==============================================================================
--- aries/trunk/async/promise-api/src/main/java/org/osgi/util/promise/packageinfo (original)
+++ aries/trunk/async/promise-api/src/main/java/org/osgi/util/promise/packageinfo Mon Oct 17 17:52:11 2016
@@ -1 +1 @@
-version 1.0.0
+version 1.1.0

Modified: aries/trunk/async/promise-api/src/test/java/org/apache/aries/async/promise/test/ChainTest.java
URL: http://svn.apache.org/viewvc/aries/trunk/async/promise-api/src/test/java/org/apache/aries/async/promise/test/ChainTest.java?rev=1765345&r1=1765344&r2=1765345&view=diff
==============================================================================
--- aries/trunk/async/promise-api/src/test/java/org/apache/aries/async/promise/test/ChainTest.java (original)
+++ aries/trunk/async/promise-api/src/test/java/org/apache/aries/async/promise/test/ChainTest.java Mon Oct 17 17:52:11 2016
@@ -18,10 +18,28 @@
  */
 package org.apache.aries.async.promise.test;
 
-import org.junit.Test;
-import org.osgi.util.promise.*;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
-import static org.junit.Assert.*;
+import org.junit.Test;
+import org.osgi.util.function.Callback;
+import org.osgi.util.promise.Deferred;
+import org.osgi.util.promise.Failure;
+import org.osgi.util.promise.Promise;
+import org.osgi.util.promise.Promises;
+import org.osgi.util.promise.Success;
+import org.osgi.util.promise.TimeoutException;
 
 public class ChainTest {
 
@@ -124,7 +142,7 @@ public class ChainTest {
         Deferred<String> def = new Deferred<String>();
         final Promise<String> promise = def.getPromise();
 
-        Promise<String> chain = promise.then(null);
+        Promise<String> chain = promise.then((Success)null);
         assertFalse("chain not resolved", chain.isDone());
 
         Throwable failure = new Throwable("fail!");
@@ -156,7 +174,7 @@ public class ChainTest {
     public void testThenNull() throws Exception {
         Deferred<String> def = new Deferred<String>();
         final Promise<String> promise = def.getPromise();
-        Promise<String> chain = promise.then(null);
+        Promise<String> chain = promise.then((Success)null);
         assertFalse("chain not resolved", chain.isDone());
 
         def.resolve("ok");
@@ -168,7 +186,7 @@ public class ChainTest {
     public void testThenNullResolved() throws Exception {
         Deferred<String> def = new Deferred<String>();
         def.resolve("ok");
-        Promise<String> chain = def.getPromise().then(null);
+        Promise<String> chain = def.getPromise().then((Success)null);
 
         assertTrue("chain resolved", chain.isDone());
         assertNull("chain value null", chain.getValue());
@@ -269,4 +287,184 @@ public class ChainTest {
 
         assertEquals("chain value matches", "success2", chain.getValue());
     }
+    
+    @Test
+    public void testThenCallbackSuccess() throws Exception {
+    	Deferred<String> def = new Deferred<String>();
+
+    	final AtomicBoolean run = new AtomicBoolean(false);
+    	
+        Promise<String> chain = def.getPromise().then(new Callback() {
+            @Override
+            public void run() throws Exception {
+                run.set(true);
+            }
+        });
+        assertFalse("chain should not be resolved", chain.isDone());
+        assertFalse("callback should not have been run", run.get());
+
+        def.resolve("ok");
+        assertTrue("chain resolved", chain.isDone());
+        assertEquals("chain value matches", "ok", chain.getValue());
+        assertTrue("callback should have been run", run.get());
+    	
+    }
+
+    @Test
+    public void testThenCallbackFail() throws Exception {
+    	Deferred<String> def = new Deferred<String>();
+    	
+    	final AtomicBoolean run = new AtomicBoolean(false);
+    	
+    	Promise<String> chain = def.getPromise().then(new Callback() {
+    		@Override
+    		public void run() throws Exception {
+    			run.set(true);
+    		}
+    	});
+    	
+    	Exception failure = new Exception("bang!");
+    	
+    	assertFalse("chain should not be resolved", chain.isDone());
+    	assertFalse("callback should not have been run", run.get());
+    	
+    	def.fail(failure);
+    	assertTrue("chain resolved", chain.isDone());
+    	assertSame("chain value matches", failure, chain.getFailure());
+    	assertTrue("callback should have been run", run.get());
+    	
+    }
+
+    @Test
+    public void testThenCallbackThrowsExceptionSuccess() throws Exception {
+    	Deferred<String> def = new Deferred<String>();
+    	
+    	final Exception failure = new Exception("bang!");
+    	
+    	Promise<String> chain = def.getPromise().then(new Callback() {
+    		@Override
+    		public void run() throws Exception {
+    			throw failure;
+    		}
+    	});
+    	
+    	assertFalse("chain should not be resolved", chain.isDone());
+    	
+    	def.resolve("ok");
+    	assertTrue("chain resolved", chain.isDone());
+    	assertSame("chain value matches", failure, chain.getFailure());
+    	
+    }
+
+    @Test
+    public void testThenCallbackThrowsExceptionFail() throws Exception {
+    	Deferred<String> def = new Deferred<String>();
+    	
+    	final Exception failure = new Exception("bang!");
+    	
+    	Promise<String> chain = def.getPromise().then(new Callback() {
+    		@Override
+    		public void run() throws Exception {
+    			throw failure;
+    		}
+    	});
+    	
+    	assertFalse("chain should not be resolved", chain.isDone());
+    	
+    	def.fail(new IllegalStateException());
+    	assertTrue("chain resolved", chain.isDone());
+    	assertSame("chain value matches", failure, chain.getFailure());
+    	
+    }
+    
+    @Test
+    public void testTimeout() throws Exception {
+    	Deferred<String> def = new Deferred<String>();
+
+        Promise<String> promise = def.getPromise();
+        
+        long start = System.nanoTime();
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicLong finish = new AtomicLong();
+        
+		Promise<String> chain = promise.timeout(500)
+				.onResolve(new Runnable() {
+					@Override
+					public void run() {
+						finish.set(System.nanoTime());
+						latch.countDown();
+					}
+				});
+		
+		assertFalse("promise should not be resolved", promise.isDone());
+		assertFalse("chain should not be resolved", chain.isDone());
+		
+		assertTrue("Did not time out!", latch.await(1, SECONDS));
+		assertTrue("Finished too fast", NANOSECONDS.toMillis(finish.get() - start) > 450);
+
+		assertFalse("promise should not be resolved", promise.isDone());
+		assertTrue("chain should now be resolved", chain.isDone());
+		
+        assertTrue("Should fail with a timeout exception", chain.getFailure() instanceof TimeoutException);
+    }
+
+    @Test
+    public void testTimeoutSuccess() throws Exception {
+    	Deferred<String> def = new Deferred<String>();
+    	
+    	Promise<String> promise = def.getPromise();
+    	
+    	final CountDownLatch latch = new CountDownLatch(1);
+    	
+    	Promise<String> chain = promise.timeout(500)
+    			.onResolve(new Runnable() {
+    				@Override
+    				public void run() {
+    					latch.countDown();
+    				}
+    			});
+    	
+    	assertFalse("promise should not be resolved", promise.isDone());
+    	assertFalse("chain should not be resolved", chain.isDone());
+    	
+    	def.resolve("ok");
+    	
+    	assertTrue("Did not eagerly complete!", latch.await(100, MILLISECONDS));
+    	
+    	assertTrue("promise should not be resolved", promise.isDone());
+    	assertTrue("chain should now be resolved", chain.isDone());
+    	
+    	assertEquals(promise.getValue(), chain.getValue());
+    }
+    
+    @Test
+    public void testTimeoutFailure() throws Exception{
+    	Deferred<String> def = new Deferred<String>();
+    	
+    	Promise<String> promise = def.getPromise();
+    	
+    	final CountDownLatch latch = new CountDownLatch(1);
+    	
+    	Promise<String> chain = promise.timeout(500)
+    			.onResolve(new Runnable() {
+    				@Override
+    				public void run() {
+    					latch.countDown();
+    				}
+    			});
+    	
+    	assertFalse("promise should not be resolved", promise.isDone());
+    	assertFalse("chain should not be resolved", chain.isDone());
+    	
+    	Exception failure = new Exception("bang!");
+    	
+    	def.fail(failure);
+    	
+    	assertTrue("Did not eagerly complete!", latch.await(100, MILLISECONDS));
+    	
+    	assertTrue("promise should not be resolved", promise.isDone());
+    	assertTrue("chain should now be resolved", chain.isDone());
+    	
+    	assertSame(promise.getFailure(), chain.getFailure());
+    }
 }




Mime
View raw message