aries-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From timothyjw...@apache.org
Subject svn commit: r1766040 [2/2] - in /aries/trunk: ./ pushstream/ pushstream/pushstream/ pushstream/pushstream/src/ pushstream/pushstream/src/main/ pushstream/pushstream/src/main/java/ pushstream/pushstream/src/main/java/org/ pushstream/pushstream/src/main/...
Date Fri, 21 Oct 2016 15:10:51 GMT
Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStream.java
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStream.java?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStream.java (added)
+++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStream.java Fri Oct 21 15:10:51 2016
@@ -0,0 +1,586 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.IntSupplier;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+
+import org.osgi.annotation.versioning.ProviderType;
+import org.osgi.util.promise.Promise;
+
+/**
+ * A Push Stream fulfills the same role as the Java 8 stream but it reverses the
+ * control direction. The Java 8 stream is pull based and this is push based. A
+ * Push Stream makes it possible to build a pipeline of transformations using a
+ * builder kind of model. Just like streams, it provides a number of terminating
+ * methods that will actually open the channel and perform the processing until
+ * the channel is closed (The source sends a Close event). The results of the
+ * processing will be sent to a Promise, just like any error events. A stream
+ * can be used multiple times. The Push Stream represents a pipeline. Upstream
+ * is in the direction of the source, downstream is in the direction of the
+ * terminating method. Events are sent downstream asynchronously with no
+ * guarantee for ordering or concurrency. Methods are available to provide
+ * serialization of the events and splitting in background threads.
+ * 
+ * @param <T> The Payload type
+ */
+@ProviderType
+public interface PushStream<T> extends AutoCloseable {
+
+	/**
+	 * Must be run after the channel is closed. This handler will run after the
+	 * downstream methods have processed the close event and before the upstream
+	 * methods have closed.
+	 * 
+	 * @param closeHandler Will be called on close
+	 * @return This stream
+	 */
+	PushStream<T> onClose(Runnable closeHandler);
+
+	/**
+	 * Must be run after the channel is closed. This handler will run after the
+	 * downstream methods have processed the close event and before the upstream
+	 * methods have closed.
+	 * 
+	 * @param closeHandler Will be called on close
+	 * @return This stream
+	 */
+	PushStream<T> onError(Consumer< ? super Throwable> closeHandler);
+
+	/**
+	 * Only pass events downstream when the predicate tests true.
+	 * 
+	 * @param predicate The predicate that is tested (not null)
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> filter(Predicate< ? super T> predicate);
+
+	/**
+	 * Map a payload value.
+	 * 
+	 * @param mapper The map function
+	 * @return Builder style (can be a new or the same object)
+	 */
+	<R> PushStream<R> map(Function< ? super T, ? extends R> mapper);
+
+	/**
+	 * Flat map the payload value (turn one event into 0..n events of
+	 * potentially another type).
+	 * 
+	 * @param mapper The flat map function
+	 * @return Builder style (can be a new or the same object)
+	 */
+	<R> PushStream<R> flatMap(
+			Function< ? super T, ? extends PushStream< ? extends R>> mapper);
+
+	/**
+	 * Remove any duplicates. Notice that this can be expensive in a large
+	 * stream since it must track previous payloads.
+	 * 
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> distinct();
+
+	/**
+	 * Sort the elements, assuming that T extends Comparable. This is of
+	 * course expensive for large or infinite streams since it requires
+	 * buffering the stream until close.
+	 * 
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> sorted();
+
+	/**
+	 * Sort the elements with the given comparator. This is of course
+	 * expensive for large or infinite streams since it requires buffering the
+	 * stream until close.
+	 * 
+	 * @param comparator The comparator used to order the elements
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> sorted(Comparator< ? super T> comparator);
+
+	/**
+	 * Automatically close the channel after the maxSize number of elements is
+	 * received.
+	 * 
+	 * @param maxSize Maximum number of elements to receive before closing
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> limit(long maxSize);
+
+	/**
+	 * Skip a number of events in the channel.
+	 * 
+	 * @param n number of elements to skip
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> skip(long n);
+
+	/**
+	 * Execute the downstream events in up to n background threads. If more
+	 * requests are outstanding apply delay * nr of delayed threads back
+	 * pressure. A downstream channel that is closed or throws an exception will
+	 * cause all execution to cease and the stream to close.
+	 * 
+	 * @param n number of simultaneous background threads to use
+	 * @param delay Nr of ms/thread that is queued back pressure
+	 * @param e an executor to use for the background threads.
+	 * @return Builder style (can be a new or the same object)
+	 * @throws IllegalArgumentException if the number of threads is < 1 or the
+	 *             delay is < 0
+	 * @throws NullPointerException if the Executor is null
+	 */
+	PushStream<T> fork(int n, int delay, Executor e)
+			throws IllegalArgumentException, NullPointerException;
+
+	/**
+	 * Buffer the events in a queue using default values for the queue size and
+	 * other behaviours. Buffered work will be processed asynchronously in the
+	 * rest of the chain. Buffering also blocks the transmission of back
+	 * pressure to previous elements in the chain, although back pressure is
+	 * honoured by the buffer.
+	 * <p>
+	 * Buffers are useful for "bursty" event sources which produce a number of
+	 * events close together, then none for some time. These bursts can
+	 * sometimes overwhelm downstream event consumers. Buffering will not,
+	 * however, protect downstream components from a source which produces
+	 * events faster than they can be consumed. For fast sources
+	 * {@link #filter(Predicate)}, {@link #coalesce(int, Function)} and
+	 * {@link #fork(int, int, Executor)} are better choices.
+	 * 
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> buffer();
+
+	/**
+	 * Build a buffer to enqueue events in a queue using custom values for the
+	 * queue size and other behaviours. Buffered work will be processed
+	 * asynchronously in the rest of the chain. Buffering also blocks the
+	 * transmission of back pressure to previous elements in the chain, although
+	 * back pressure is honoured by the buffer.
+	 * <p>
+	 * Buffers are useful for "bursty" event sources which produce a number of
+	 * events close together, then none for some time. These bursts can
+	 * sometimes overwhelm downstream event consumers. Buffering will not,
+	 * however, protect downstream components from a source which produces
+	 * events faster than they can be consumed. For fast sources
+	 * {@link #filter(Predicate)}, {@link #coalesce(int, Function)} and
+	 * {@link #fork(int, int, Executor)} are better choices.
+	 * <p>
+	 * Buffers are also useful as "circuit breakers" in the pipeline. If a
+	 * {@link QueuePolicyOption#FAIL} is used then a full buffer will trigger
+	 * the stream to close, preventing an event storm from reaching the client.
+	 * 
+	 * <p>
+	 * The queue, policies, parallelism and executor are configured on the
+	 * returned builder rather than passed to this method.
+	 * 
+	 * @param <U> The type of the queue used in the buffer
+	 * @return A builder for configuring and creating the resulting stream
+	 */
+	<U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildBuffer();
+
+	/**
+	 * Merge in the events from another source. The resulting channel is not
+	 * closed until this channel and the channel from the source are closed.
+	 * 
+	 * @param source The source to merge in.
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> merge(PushEventSource< ? extends T> source);
+
+	/**
+	 * Merge in the events from another PushStream. The resulting channel is not
+	 * closed until this channel and the channel from the source are closed.
+	 * 
+	 * @param source The source to merge in.
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> merge(PushStream< ? extends T> source);
+
+	/**
+	 * Split the events to different streams based on a predicate. If the
+	 * predicate is true, the event is dispatched to that channel on the same
+	 * position. All predicates are tested for every event.
+	 * <p>
+	 * This method differs from other methods of PushStream in three
+	 * significant ways:
+	 * <ul>
+	 * <li>The return value contains multiple streams.</li>
+	 * <li>This stream will only close when all of these child streams have
+	 * closed.</li>
+	 * <li>Event delivery is made to all open children that accept the event.
+	 * </li>
+	 * </ul>
+	 * 
+	 * @param predicates the predicates to test
+	 * @return streams that map to the predicates
+	 */
+	@SuppressWarnings("unchecked")
+	PushStream<T>[] split(Predicate< ? super T>... predicates);
+
+	/**
+	 * Ensure that any events are delivered sequentially. That is, no
+	 * overlapping calls downstream. This can be used to turn a forked stream
+	 * (where for example a heavy conversion is done in multiple threads) back
+	 * into a sequential stream so a reduce is simple to do.
+	 * 
+	 * @return Builder style (can be a new or the same object)
+	 */
+	PushStream<T> sequential();
+
+	/**
+	 * Coalesces a number of events into a new type of event. The input events
+	 * are forwarded to an accumulator function. This function returns an
+	 * Optional. If the optional is present, its value is sent downstream,
+	 * otherwise it is ignored.
+	 * 
+	 * @param f
+	 * @return Builder style (can be a new or the same object)
+	 */
+	<R> PushStream<R> coalesce(Function< ? super T,Optional<R>> f);
+
+	/**
+	 * Coalesces a number of events into a new type of event. A fixed number of
+	 * input events are forwarded to an accumulator function. This function
+	 * returns new event data to be forwarded on.
+	 * 
+	 * @param count
+	 * @param f
+	 * @return Builder style (can be a new or the same object)
+	 */
+	public <R> PushStream<R> coalesce(int count, Function<Collection<T>,R> f);
+
+	/**
+	 * Coalesces a number of events into a new type of event. A variable number
+	 * of input events are forwarded to an accumulator function. The number of
+	 * events to be forwarded is determined by calling the count function. The
+	 * accumulator function then returns new event data to be forwarded on.
+	 * 
+	 * @param count
+	 * @param f
+	 * @return Builder style (can be a new or the same object)
+	 */
+	public <R> PushStream<R> coalesce(IntSupplier count,
+			Function<Collection<T>,R> f);
+
+	/**
+	 * Buffers a number of events over a fixed time interval and then forwards
+	 * the events to an accumulator function. This function returns new event
+	 * data to be forwarded on. Note that:
+	 * <ul>
+	 * <li>The collection forwarded to the accumulator function will be empty if
+	 * no events arrived during the time interval.</li>
+	 * <li>The accumulator function will be run and the forwarded event
+	 * delivered as a different task, (and therefore potentially on a different
+	 * thread) from the one that delivered the event to this {@link PushStream}.
+	 * </li>
+	 * <li>Due to the buffering and asynchronous delivery required, this method
+	 * prevents the propagation of back-pressure to earlier stages</li>
+	 * </ul>
+	 * 
+	 * @param d
+	 * @param f
+	 * @return Builder style (can be a new or the same object)
+	 */
+	<R> PushStream<R> window(Duration d, Function<Collection<T>,R> f);
+
+	/**
+	 * Buffers a number of events over a fixed time interval and then forwards
+	 * the events to an accumulator function. This function returns new event
+	 * data to be forwarded on. Note that:
+	 * <ul>
+	 * <li>The collection forwarded to the accumulator function will be empty if
+	 * no events arrived during the time interval.</li>
+	 * <li>The accumulator function will be run and the forwarded event
+	 * delivered by a task given to the supplied executor.</li>
+	 * <li>Due to the buffering and asynchronous delivery required, this method
+	 * prevents the propagation of back-pressure to earlier stages</li>
+	 * </ul>
+	 * 
+	 * @param d
+	 * @param executor
+	 * @param f
+	 * @return Builder style (can be a new or the same object)
+	 */
+	<R> PushStream<R> window(Duration d, Executor executor,
+			Function<Collection<T>,R> f);
+
+	/**
+	 * Buffers a number of events over a variable time interval and then
+	 * forwards the events to an accumulator function. The length of time over
+	 * which events are buffered is determined by the time function. A maximum
+	 * number of events can also be requested, if this number of events is
+	 * reached then the accumulator will be called early. The accumulator
+	 * function returns new event data to be forwarded on. It is also given the
+	 * length of time for which the buffer accumulated data. This may be less
+	 * than the requested interval if the buffer reached the maximum number of
+	 * requested events early. Note that:
+	 * <ul>
+	 * <li>The collection forwarded to the accumulator function will be empty if
+	 * no events arrived during the time interval.</li>
+	 * <li>The accumulator function will be run and the forwarded event
+	 * delivered as a different task, (and therefore potentially on a different
+	 * thread) from the one that delivered the event to this {@link PushStream}.
+	 * </li>
+	 * <li>Due to the buffering and asynchronous delivery required, this method
+	 * prevents the propagation of back-pressure to earlier stages</li>
+	 * <li>If the window finishes by hitting the maximum number of events then
+	 * the remaining time in the window will be applied as back-pressure to the
+	 * previous stage, attempting to slow the producer to the expected windowing
+	 * threshold.</li>
+	 * </ul>
+	 * 
+	 * @param timeSupplier
+	 * @param maxEvents
+	 * @param f
+	 * @return Builder style (can be a new or the same object)
+	 */
+	<R> PushStream<R> window(Supplier<Duration> timeSupplier,
+			IntSupplier maxEvents, BiFunction<Long,Collection<T>,R> f);
+
+	/**
+	 * Buffers a number of events over a variable time interval and then
+	 * forwards the events to an accumulator function. The length of time over
+	 * which events are buffered is determined by the time function. A maximum
+	 * number of events can also be requested, if this number of events is
+	 * reached then the accumulator will be called early. The accumulator
+	 * function returns new event data to be forwarded on. It is also given the
+	 * length of time for which the buffer accumulated data. This may be less
+	 * than the requested interval if the buffer reached the maximum number of
+	 * requested events early. Note that:
+	 * <ul>
+	 * <li>The collection forwarded to the accumulator function will be empty if
+	 * no events arrived during the time interval.</li>
+	 * <li>The accumulator function will be run and the forwarded event
+	 * delivered as a different task, (and therefore potentially on a different
+	 * thread) from the one that delivered the event to this {@link PushStream}.
+	 * </li>
+	 * <li>If the window finishes by hitting the maximum number of events then
+	 * the remaining time in the window will be applied as back-pressure to the
+	 * previous stage, attempting to slow the producer to the expected windowing
+	 * threshold.</li>
+	 * </ul>
+	 * 
+	 * @param timeSupplier
+	 * @param maxEvents
+	 * @param executor
+	 * @param f
+	 * @return Builder style (can be a new or the same object)
+	 */
+	<R> PushStream<R> window(Supplier<Duration> timeSupplier,
+			IntSupplier maxEvents, Executor executor,
+			BiFunction<Long,Collection<T>,R> f);
+
+	/**
+	 * Execute the action for each event received until the channel is closed.
+	 * This is a terminating method, the returned promise is resolved when the
+	 * channel closes.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @param action The action to perform
+	 * @return A promise that is resolved when the channel closes.
+	 */
+	Promise<Void> forEach(Consumer< ? super T> action);
+
+	/**
+	 * Collect the payloads in an Object array after the channel is closed. This
+	 * is a terminating method, the returned promise is resolved when the
+	 * channel is closed.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @return A promise that is resolved with all the payloads received over
+	 *         the channel
+	 */
+	Promise<Object[]> toArray();
+
+	/**
+	 * Collect the payloads in a typed array after the channel is closed. This
+	 * is a terminating method, the returned promise is resolved when the
+	 * channel is closed. The type of the array is handled by the caller using a
+	 * generator function that gets the length of the desired array.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @param generator
+	 * @return A promise that is resolved with all the payloads received over
+	 *         the channel
+	 */
+	<A extends T> Promise<A[]> toArray(IntFunction<A[]> generator);
+
+	/**
+	 * Standard reduce, see Stream. The returned promise will be resolved when
+	 * the channel closes.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @param identity The identity/begin value
+	 * @param accumulator The accumulator
+	 * @return A promise that is resolved with the result of the reduction
+	 */
+	Promise<T> reduce(T identity, BinaryOperator<T> accumulator);
+
+	/**
+	 * Standard reduce without identity, so the return is an Optional. The
+	 * returned promise will be resolved when the channel closes.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @param accumulator The accumulator
+	 * @return an Optional
+	 */
+	Promise<Optional<T>> reduce(BinaryOperator<T> accumulator);
+
+	/**
+	 * Standard reduce with identity, accumulator and combiner. The returned
+	 * promise will be resolved when the channel closes.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @param identity
+	 * @param accumulator
+	 * @param combiner combines two U's into one U (e.g. how to combine two lists)
+	 * @return The promise
+	 */
+	<U> Promise<U> reduce(U identity, BiFunction<U, ? super T,U> accumulator,
+			BinaryOperator<U> combiner);
+
+	/**
+	 * See Stream. Will resolve once the channel closes.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @param collector
+	 * @return A Promise representing the collected results
+	 */
+	<R, A> Promise<R> collect(Collector< ? super T,A,R> collector);
+
+	/**
+	 * See Stream. Will resolve once the channel closes.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @param comparator
+	 * @return A Promise representing the minimum value, or null if no values
+	 *         are seen before the end of the stream
+	 */
+	Promise<Optional<T>> min(Comparator< ? super T> comparator);
+
+	/**
+	 * See Stream. Will resolve once the channel closes.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @param comparator
+	 * @return A Promise representing the maximum value, or null if no values
+	 *         are seen before the end of the stream
+	 */
+	Promise<Optional<T>> max(Comparator< ? super T> comparator);
+
+	/**
+	 * See Stream. Will resolve once the channel closes.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @return A Promise representing the number of values in the stream
+	 */
+	Promise<Long> count();
+
+	/**
+	 * Close the channel and resolve the promise with true when the predicate
+	 * matches a payload. If the channel is closed before the predicate matches,
+	 * the promise is resolved with false.
+	 * <p>
+	 * This is a <strong>short circuiting terminal operation</strong>
+	 * 
+	 * @param predicate
+	 * @return A Promise that will resolve when an event matches the predicate,
+	 *         or the end of the stream is reached
+	 */
+	Promise<Boolean> anyMatch(Predicate< ? super T> predicate);
+
+	/**
+	 * Close the channel and resolve the promise with false when the predicate
+	 * does not match a payload. If the channel is closed before, the promise
+	 * is resolved with true.
+	 * <p>
+	 * This is a <strong>short circuiting terminal operation</strong>
+	 * 
+	 * @param predicate
+	 * @return A Promise that will resolve when an event fails to match the
+	 *         predicate, or the end of the stream is reached
+	 */
+	Promise<Boolean> allMatch(Predicate< ? super T> predicate);
+
+	/**
+	 * Close the channel and resolve the promise with false when the predicate
+	 * matches any payload. If the channel is closed before, the promise is
+	 * resolved with true.
+	 * <p>
+	 * This is a <strong>short circuiting terminal operation</strong>
+	 * 
+	 * @param predicate
+	 * @return A Promise that will resolve when an event matches the predicate,
+	 *         or the end of the stream is reached
+	 */
+	Promise<Boolean> noneMatch(Predicate< ? super T> predicate);
+
+	/**
+	 * Close the channel and resolve the promise with the first element. If the
+	 * channel is closed before, the Optional will have no value.
+	 * 
+	 * @return a promise
+	 */
+	Promise<Optional<T>> findFirst();
+
+	/**
+	 * Close the channel and resolve the promise with the first element. If the
+	 * channel is closed before, the Optional will have no value.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @return a promise
+	 */
+	Promise<Optional<T>> findAny();
+
+	/**
+	 * Pass on each event to another consumer until the stream is closed.
+	 * <p>
+	 * This is a <strong>terminal operation</strong>
+	 * 
+	 * @param action
+	 * @return a promise
+	 */
+	Promise<Long> forEachEvent(PushEventConsumer< ? super T> action);
+
+}

Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilder.java
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilder.java?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilder.java (added)
+++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilder.java Fri Oct 21 15:10:51 2016
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+
+/**
+ * A Builder for a PushStream. This Builder extends the support of a standard
+ * BufferBuilder by allowing the PushStream to be unbuffered.
+ * 
+ *
+ * @param <T> The type of objects in the {@link PushEvent}
+ * @param <U> The type of the Queue used in the user specified buffer
+ */
+public interface PushStreamBuilder<T, U extends BlockingQueue<PushEvent< ? extends T>>>
+		extends BufferBuilder<PushStream<T>,T,U> {
+
+	/**
+	 * Tells this {@link PushStreamBuilder} to create an unbuffered stream which
+	 * delivers events directly to its consumer using the incoming delivery
+	 * thread.
+	 * 
+	 * @return the builder
+	 */
+	PushStreamBuilder<T,U> unbuffered();
+
+	/*
+	 * Overridden methods to allow the covariant return of a PushStreamBuilder
+	 */
+
+	@Override
+	PushStreamBuilder<T,U> withBuffer(U queue);
+
+	@Override
+	PushStreamBuilder<T,U> withQueuePolicy(QueuePolicy<T,U> queuePolicy);
+
+	@Override
+	PushStreamBuilder<T,U> withQueuePolicy(QueuePolicyOption queuePolicyOption);
+
+	@Override
+	PushStreamBuilder<T,U> withPushbackPolicy(
+			PushbackPolicy<T,U> pushbackPolicy);
+
+	@Override
+	PushStreamBuilder<T,U> withPushbackPolicy(
+			PushbackPolicyOption pushbackPolicyOption, long time);
+
+	@Override
+	PushStreamBuilder<T,U> withParallelism(int parallelism);
+
+	@Override
+	PushStreamBuilder<T,U> withExecutor(Executor executor);
+}

Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilderImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilderImpl.java?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilderImpl.java (added)
+++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilderImpl.java Fri Oct 21 15:10:51 2016
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+
+class PushStreamBuilderImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
+		extends AbstractBufferBuilder<PushStream<T>,T,U>
+		implements PushStreamBuilder<T,U> {
+
+	private final PushStreamProvider	psp;
+	private final PushEventSource<T>		eventSource;
+	private final Executor					previousExecutor;
+
+	private boolean							unbuffered;
+
+	PushStreamBuilderImpl(PushStreamProvider psp, Executor defaultExecutor,
+			PushEventSource<T> eventSource) {
+		this.psp = psp;
+		this.previousExecutor = defaultExecutor;
+		this.eventSource = eventSource;
+		this.worker = defaultExecutor;
+	}
+
+	@Override
+	public PushStreamBuilder<T,U> withBuffer(U queue) {
+		unbuffered = false;
+		return (PushStreamBuilder<T,U>) super.withBuffer(queue);
+	}
+
+	@Override
+	public PushStreamBuilder<T,U> withQueuePolicy(
+			QueuePolicy<T,U> queuePolicy) {
+		unbuffered = false;
+		return (PushStreamBuilder<T,U>) super.withQueuePolicy(queuePolicy);
+	}
+
+	@Override
+	public PushStreamBuilder<T,U> withQueuePolicy(
+			QueuePolicyOption queuePolicyOption) {
+		unbuffered = false;
+		return (PushStreamBuilder<T,U>) super.withQueuePolicy(
+				queuePolicyOption);
+	}
+
+	@Override
+	public PushStreamBuilder<T,U> withPushbackPolicy(
+			PushbackPolicy<T,U> pushbackPolicy) {
+		unbuffered = false;
+		return (PushStreamBuilder<T,U>) super.withPushbackPolicy(
+				pushbackPolicy);
+	}
+
+	@Override
+	public PushStreamBuilder<T,U> withPushbackPolicy(
+			PushbackPolicyOption pushbackPolicyOption, long time) {
+		unbuffered = false;
+		return (PushStreamBuilder<T,U>) super.withPushbackPolicy(
+				pushbackPolicyOption, time);
+	}
+
+	@Override
+	public PushStreamBuilder<T,U> withParallelism(int parallelism) {
+		unbuffered = false;
+		return (PushStreamBuilder<T,U>) super.withParallelism(parallelism);
+	}
+
+	@Override
+	public PushStreamBuilder<T,U> withExecutor(Executor executor) {
+		unbuffered = false;
+		return (PushStreamBuilder<T,U>) super.withExecutor(executor);
+	}
+
+	@Override
+	public PushStreamBuilder<T,U> unbuffered() {
+		unbuffered = true;
+		return this;
+	}
+
+	@Override
+	public PushStream<T> create() {
+		if (unbuffered) {
+			return psp.createUnbufferedStream(eventSource, previousExecutor);
+		} else {
+			return psp.createStream(eventSource, concurrency, worker, buffer,
+					bufferingPolicy, backPressure);
+		}
+	}
+}

Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamProvider.java
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamProvider.java?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamProvider.java (added)
+++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamProvider.java Fri Oct 21 15:10:51 2016
@@ -0,0 +1,586 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.aries.pushstream.AbstractPushStreamImpl.State.CLOSED;
+import static org.osgi.util.pushstream.PushEvent.data;
+import static org.osgi.util.pushstream.PushEvent.error;
+import static org.osgi.util.pushstream.PushbackPolicyOption.LINEAR;
+import static org.osgi.util.pushstream.QueuePolicyOption.FAIL;
+
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import org.apache.aries.pushstream.BufferedPushStreamImpl;
+import org.apache.aries.pushstream.SimplePushEventSourceImpl;
+import org.apache.aries.pushstream.UnbufferedPushStreamImpl;
+
+/**
+ * A factory for {@link PushStream} instances, and utility methods for handling
+ * {@link PushEventSource}s and {@link PushEventConsumer}s
+ */
+public final class PushStreamProvider {
+
+	private final Lock					lock	= new ReentrantLock(true);
+
+	private int							schedulerReferences;
+
+	private ScheduledExecutorService	scheduler;
+
+	/**
+	 * Takes a reference to the shared scheduler, creating it when this is the
+	 * first outstanding reference. Each successful call is expected to be
+	 * balanced by a call to {@link #releaseScheduler()}.
+	 * 
+	 * @return the shared scheduler
+	 * @throws IllegalStateException if the calling thread is interrupted while
+	 *             waiting for the lock; the thread's interrupt status is
+	 *             restored before the exception is thrown
+	 */
+	private ScheduledExecutorService acquireScheduler() {
+		try {
+			lock.lockInterruptibly();
+		} catch (InterruptedException e) {
+			// Keep the interrupt visible to code further up the stack
+			Thread.currentThread().interrupt();
+			throw new IllegalStateException("Unable to acquire the Scheduler",
+					e);
+		}
+		try {
+			schedulerReferences += 1;
+
+			if (schedulerReferences == 1) {
+				scheduler = Executors.newSingleThreadScheduledExecutor();
+			}
+			return scheduler;
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Drops one reference to the shared scheduler and shuts it down when the
+	 * last reference is released.
+	 * 
+	 * <p>
+	 * The lock is taken uninterruptibly: if an interrupt were allowed to skip
+	 * the decrement, the reference count would never reach zero again and the
+	 * scheduler thread would never be shut down.
+	 */
+	private void releaseScheduler() {
+		lock.lock();
+		try {
+			schedulerReferences -= 1;
+
+			if (schedulerReferences == 0) {
+				scheduler.shutdown();
+				scheduler = null;
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Create a stream with the default configured buffer, executor size, queue,
+	 * queue policy and pushback policy. This is equivalent to calling
+	 * 
+	 * <code>
+	 *   buildStream(source).create();
+	 * </code>
+	 * 
+	 * <p>
+	 * This stream will be buffered from the event producer, and will honour
+	 * back pressure even if the source does not.
+	 * 
+	 * <p>
+	 * Buffered streams are useful for "bursty" event sources which produce a
+	 * number of events close together, then none for some time. These bursts
+	 * can sometimes overwhelm downstream processors. Buffering will not,
+	 * however, protect downstream components from a source which produces
+	 * events faster (on average) than they can be consumed.
+	 * 
+	 * <p>
+	 * Event delivery will not begin until a terminal operation is reached on
+	 * the chain of PushStreams. Once a terminal operation is reached the
+	 * stream will be connected to the event source.
+	 * 
+	 * <p>
+	 * The defaults passed here are a parallelism of 1, no executor, a
+	 * 32-element {@link ArrayBlockingQueue}, the {@link QueuePolicyOption#FAIL}
+	 * queue policy and the {@link PushbackPolicyOption#LINEAR} pushback policy
+	 * with a base value of 1000.
+	 * 
+	 * @param eventSource the source of the events; must not be
+	 *            <code>null</code>
+	 * @return A {@link PushStream} with a default initial buffer
+	 */
+	public <T> PushStream<T> createStream(PushEventSource<T> eventSource) {
+		return createStream(eventSource, 1, null, new ArrayBlockingQueue<>(32),
+				FAIL.getPolicy(), LINEAR.getPolicy(1000));
+	}
+	
+	/**
+	 * Builds a push stream with custom configuration.
+	 * 
+	 * <p>
+	 * 
+	 * The resulting {@link PushStream} may be buffered or unbuffered depending
+	 * on how it is configured.
+	 * 
+	 * <p>
+	 * The builder is created with this provider, no previous executor and the
+	 * supplied source; nothing is connected by this call.
+	 * 
+	 * @param eventSource The source of the events
+	 * 
+	 * @return A {@link PushStreamBuilder} for the stream
+	 */
+	public <T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildStream(
+			PushEventSource<T> eventSource) {
+		return new PushStreamBuilderImpl<T,U>(this, null, eventSource);
+	}
+	
+	/**
+	 * Creates a buffered stream from explicit settings, substituting defaults
+	 * for any that are missing.
+	 * 
+	 * @param eventSource the source to connect to; must not be
+	 *            <code>null</code>
+	 * @param parallelism the parallelism to use; zero is treated as 1
+	 * @param executor the executor to use, or <code>null</code> to create a
+	 *            fixed thread pool sized by the parallelism, which is shut
+	 *            down when the stream closes
+	 * @param queue the buffer, or <code>null</code> for a 32-element
+	 *            {@link ArrayBlockingQueue}
+	 * @param queuePolicy the queue policy, or <code>null</code> for
+	 *            {@link QueuePolicyOption#FAIL}
+	 * @param pushbackPolicy the pushback policy, or <code>null</code> for
+	 *            {@link PushbackPolicyOption#LINEAR} with a base value of 1000
+	 * @return the buffered stream
+	 * @throws NullPointerException if the event source is <code>null</code>
+	 * @throws IllegalArgumentException if the parallelism is negative
+	 */
+	@SuppressWarnings({
+			"rawtypes", "unchecked"
+	})
+	<T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStream<T> createStream(
+			PushEventSource<T> eventSource, int parallelism, Executor executor,
+			U queue, QueuePolicy<T,U> queuePolicy,
+			PushbackPolicy<T,U> pushbackPolicy) {
+
+		if (eventSource == null) {
+			throw new NullPointerException("There is no source of events");
+		}
+
+		if (parallelism < 0) {
+			throw new IllegalArgumentException(
+					"The supplied parallelism cannot be less than zero. It was "
+							+ parallelism);
+		} else if (parallelism == 0) {
+			parallelism = 1;
+		}
+
+		// Only an executor created here is shut down when the stream closes;
+		// a caller-supplied executor is left running
+		boolean closeExecutorOnClose;
+		Executor toUse;
+		if (executor == null) {
+			toUse = Executors.newFixedThreadPool(parallelism);
+			closeExecutorOnClose = true;
+		} else {
+			toUse = executor;
+			closeExecutorOnClose = false;
+		}
+
+		if (queue == null) {
+			queue = (U) new ArrayBlockingQueue(32);
+		}
+
+		if (queuePolicy == null) {
+			queuePolicy = FAIL.getPolicy();
+		}
+
+		if (pushbackPolicy == null) {
+			pushbackPolicy = LINEAR.getPolicy(1000);
+		}
+
+		// Takes a scheduler reference; it is given back by the onClose
+		// handler registered below
+		@SuppressWarnings("resource")
+		PushStream<T> stream = new BufferedPushStreamImpl<>(this,
+				acquireScheduler(), queue, parallelism, toUse, queuePolicy,
+				pushbackPolicy, aec -> {
+					try {
+						return eventSource.open(aec);
+					} catch (Exception e) {
+						throw new RuntimeException(
+								"Unable to connect to event source", e);
+					}
+				});
+
+		stream = stream.onClose(() -> {
+			if (closeExecutorOnClose) {
+				((ExecutorService) toUse).shutdown();
+			}
+			releaseScheduler();
+		}).map(Function.identity());
+		return stream;
+	}
+
+	/**
+	 * Creates an unbuffered stream connected to the supplied source.
+	 * 
+	 * @param eventSource the source to connect to; must not be
+	 *            <code>null</code>
+	 * @param executor the executor to use, or <code>null</code> to create a
+	 *            two-thread pool which is shut down when the stream closes
+	 * @return the unbuffered stream
+	 * @throws NullPointerException if the event source is <code>null</code>
+	 */
+	<T> PushStream<T> createUnbufferedStream(PushEventSource<T> eventSource,
+			Executor executor) {
+
+		// Fail fast, as the buffered createStream does, before a thread pool
+		// is created or a scheduler reference is taken
+		if (eventSource == null) {
+			throw new NullPointerException("There is no source of events");
+		}
+
+		boolean closeExecutorOnClose;
+		Executor toUse;
+		if (executor == null) {
+			toUse = Executors.newFixedThreadPool(2);
+			closeExecutorOnClose = true;
+		} else {
+			toUse = executor;
+			closeExecutorOnClose = false;
+		}
+
+		// Takes a scheduler reference; it is given back by the onClose
+		// handler registered below
+		@SuppressWarnings("resource")
+		PushStream<T> stream = new UnbufferedPushStreamImpl<>(this, toUse,
+				acquireScheduler(), aec -> {
+					try {
+						return eventSource.open(aec);
+					} catch (Exception e) {
+						throw new RuntimeException(
+								"Unable to connect to event source", e);
+					}
+				});
+
+		stream = stream.onClose(() -> {
+			if (closeExecutorOnClose) {
+				((ExecutorService) toUse).shutdown();
+			}
+			releaseScheduler();
+		}).map(Function.identity());
+
+		return stream;
+	}
+
+	/**
+	 * Convert a {@link PushStream} into a {@link PushEventSource}. The first
+	 * call to {@link PushEventSource#open(PushEventConsumer)} will begin event
+	 * processing.
+	 * 
+	 * The {@link PushEventSource} will remain active until the backing stream
+	 * is closed, and permits multiple consumers to
+	 * {@link PushEventSource#open(PushEventConsumer)} it.
+	 * 
+	 * This is equivalent to: <code>
+	 *   buildEventSourceFromStream(stream).create();
+	 * </code>
+	 * 
+	 * @param stream the stream whose events are republished
+	 * @return a {@link PushEventSource} backed by the {@link PushStream}
+	 */
+	public <T> PushEventSource<T> createEventSourceFromStream(
+			PushStream<T> stream) {
+		return buildEventSourceFromStream(stream).create();
+	}
+
+	/**
+	 * Build a {@link PushEventSource} backed by a {@link PushStream}, with
+	 * custom buffering. The first call to
+	 * {@link PushEventSource#open(PushEventConsumer)} will begin event
+	 * processing.
+	 * 
+	 * The {@link PushEventSource} will remain active until the backing stream
+	 * is closed, and permits multiple consumers to
+	 * {@link PushEventSource#open(PushEventConsumer)} it.
+	 * 
+	 * @param stream the stream whose events are republished
+	 * 
+	 * @return a builder whose <code>create()</code> returns a
+	 *         {@link PushEventSource} backed by the {@link PushStream}
+	 */
+	public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<PushEventSource<T>,T,U> buildEventSourceFromStream(
+			PushStream<T> stream) {
+		return new AbstractBufferBuilder<PushEventSource<T>,T,U>() {
+			@Override
+			public PushEventSource<T> create() {
+				// The close callback of the event source closes the backing
+				// stream; a failure to close is only printed
+				SimplePushEventSource<T> spes = createSimplePushEventSource(
+						concurrency, worker, buffer, bufferingPolicy, () -> {
+							try {
+								stream.close();
+							} catch (Exception e) {
+								// TODO Auto-generated catch block
+								e.printStackTrace();
+							}
+						});
+				// Once the connect promise resolves, publish every stream
+				// element, and close the source when the forEach resolves
+				spes.connectPromise()
+						.then(p -> stream.forEach(t -> spes.publish(t))
+								.onResolve(() -> spes.close()));
+				return spes;
+			}
+		};
+	}
+	
+
+	/**
+	 * Create a {@link SimplePushEventSource} with the supplied type and default
+	 * buffering behaviours. The SimplePushEventSource will respond to back
+	 * pressure requests from the consumers connected to it.
+	 * 
+	 * This is equivalent to: <code>
+	 *   buildSimpleEventSource(type).create();
+	 * </code>
+	 * 
+	 * <p>
+	 * The defaults passed here are a parallelism of 1, no executor, a
+	 * 32-element {@link ArrayBlockingQueue} and the
+	 * {@link QueuePolicyOption#FAIL} queue policy.
+	 * 
+	 * @param type the type of the events; it is not otherwise used by this
+	 *            method
+	 * @return a {@link SimplePushEventSource}
+	 */
+	public <T> SimplePushEventSource<T> createSimpleEventSource(Class<T> type) {
+		return createSimplePushEventSource(1, null,
+				new ArrayBlockingQueue<>(32),
+				FAIL.getPolicy(), () -> { /* Nothing else to do */ });
+	}
+	
+	/**
+	 * 
+	 * Build a {@link SimplePushEventSource} with the supplied type and custom
+	 * buffering behaviours. The SimplePushEventSource will respond to back
+	 * pressure requests from the consumers connected to it.
+	 * 
+	 * @param type the type of the events; it is not otherwise used by this
+	 *            method
+	 * 
+	 * @return a builder whose <code>create()</code> returns a
+	 *         {@link SimplePushEventSource}
+	 */
+
+	public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<SimplePushEventSource<T>,T,U> buildSimpleEventSource(
+			Class<T> type) {
+		return new AbstractBufferBuilder<SimplePushEventSource<T>,T,U>() {
+			@Override
+			public SimplePushEventSource<T> create() {
+				return createSimplePushEventSource(concurrency, worker, buffer,
+						bufferingPolicy, () -> { /* Nothing else to do */ });
+			}
+		};
+	}
+	
+	/**
+	 * Creates a {@link SimplePushEventSource} from explicit settings,
+	 * substituting defaults for any that are missing.
+	 * 
+	 * @param parallelism the parallelism to use; zero is treated as 1
+	 * @param executor the executor to use, or <code>null</code> to create a
+	 *            two-thread pool which is shut down when the source closes
+	 * @param queue the buffer, or <code>null</code> for a 32-element
+	 *            {@link ArrayBlockingQueue}
+	 * @param queuePolicy the queue policy, or <code>null</code> for
+	 *            {@link QueuePolicyOption#FAIL}
+	 * @param onClose extra work to run when the source closes; any exception
+	 *            it throws is discarded
+	 * @return the event source
+	 * @throws IllegalArgumentException if the parallelism is negative
+	 */
+	@SuppressWarnings({
+			"unchecked", "rawtypes"
+	})
+	<T, U extends BlockingQueue<PushEvent< ? extends T>>> SimplePushEventSource<T> createSimplePushEventSource(
+			int parallelism, Executor executor, U queue,
+			QueuePolicy<T,U> queuePolicy, Runnable onClose) {
+
+		if (parallelism < 0) {
+			throw new IllegalArgumentException(
+					"The supplied parallelism cannot be less than zero. It was "
+							+ parallelism);
+		} else if (parallelism == 0) {
+			parallelism = 1;
+		}
+
+		// Only an executor created here is shut down on close; note the pool
+		// size is fixed at 2 and does not follow the parallelism
+		boolean closeExecutorOnClose;
+		Executor toUse;
+		if (executor == null) {
+			toUse = Executors.newFixedThreadPool(2);
+			closeExecutorOnClose = true;
+		} else {
+			toUse = executor;
+			closeExecutorOnClose = false;
+		}
+
+		if (queue == null) {
+			queue = (U) new ArrayBlockingQueue(32);
+		}
+
+		if (queuePolicy == null) {
+			queuePolicy = FAIL.getPolicy();
+		}
+
+		// Takes a scheduler reference; the close callback below gives it back
+		// after running the caller's onClose
+		SimplePushEventSourceImpl<T,U> spes = new SimplePushEventSourceImpl<T,U>(
+				toUse, acquireScheduler(), queuePolicy, queue, parallelism,
+				() -> {
+					try {
+						onClose.run();
+					} catch (Exception e) {
+						// TODO log this?
+					}
+					if (closeExecutorOnClose) {
+						((ExecutorService) toUse).shutdown();
+					}
+					releaseScheduler();
+				});
+		return spes;
+	}
+
+	/**
+	 * Create a buffered {@link PushEventConsumer} with the default configured
+	 * buffer, executor size, queue, queue policy and pushback policy. This is
+	 * equivalent to calling
+	 * 
+	 * <code>
+	 *   buildBufferedConsumer(delegate).create();
+	 * </code>
+	 * 
+	 * <p>
+	 * The returned consumer will be buffered from the event source, and will
+	 * honour back pressure requests from its delegate even if the event source
+	 * does not.
+	 * 
+	 * <p>
+	 * Buffered consumers are useful for "bursty" event sources which produce a
+	 * number of events close together, then none for some time. These bursts
+	 * can sometimes overwhelm the consumer. Buffering will not, however,
+	 * protect downstream components from a source which produces events faster
+	 * than they can be consumed.
+	 * 
+	 * @param delegate the consumer that receives the buffered events
+	 * @return a {@link PushEventConsumer} with a buffer directly before it
+	 */
+	public <T> PushEventConsumer<T> createBufferedConsumer(
+			PushEventConsumer<T> delegate) {
+		return buildBufferedConsumer(delegate).create();
+	}
+	
+	/**
+	 * Build a buffered {@link PushEventConsumer} with custom configuration.
+	 * <p>
+	 * The returned consumer will be buffered from the event source, and will
+	 * honour back pressure requests from its delegate even if the event source
+	 * does not.
+	 * <p>
+	 * Buffered consumers are useful for "bursty" event sources which produce a
+	 * number of events close together, then none for some time. These bursts
+	 * can sometimes overwhelm the consumer. Buffering will not, however,
+	 * protect downstream components from a source which produces events faster
+	 * than they can be consumed.
+	 * <p>
+	 * Buffers are also useful as "circuit breakers". If a
+	 * {@link QueuePolicyOption#FAIL} is used then a full buffer will request
+	 * that the stream close, preventing an event storm from reaching the
+	 * client.
+	 * <p>
+	 * Note that this buffered consumer will close when it receives a terminal
+	 * event, or if the delegate returns negative backpressure. No further
+	 * events will be propagated after this time.
+	 * 
+	 * @param delegate the consumer that receives the buffered events
+	 * @return a builder whose <code>create()</code> returns a
+	 *         {@link PushEventConsumer} with a buffer directly before it
+	 */
+	public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<PushEventConsumer<T>,T,U> buildBufferedConsumer(
+			PushEventConsumer<T> delegate) {
+		return new AbstractBufferBuilder<PushEventConsumer<T>,T,U>() {
+			@Override
+			public PushEventConsumer<T> create() {
+				// The pipe is the source of a new buffered stream and is also
+				// the consumer handed back to the caller
+				PushEventPipe<T> pipe = new PushEventPipe<>();
+				
+				createStream(pipe, concurrency, worker, buffer, bufferingPolicy, backPressure)
+					.forEachEvent(delegate);
+				
+				return pipe;
+			}
+		};
+	}
+
+	/**
+	 * A pass-through used by the buffered consumer: it is the
+	 * {@link PushEventSource} that a buffered stream opens, and also the
+	 * {@link PushEventConsumer} handed back to the caller. Events given to
+	 * {@link #accept(PushEvent)} are forwarded to the consumer that most
+	 * recently opened this pipe.
+	 * 
+	 * @param <T> The type of the event data
+	 */
+	static final class PushEventPipe<T>
+			implements PushEventConsumer<T>, PushEventSource<T> {
+
+		volatile PushEventConsumer< ? super T> delegate;
+
+		/**
+		 * Records the consumer as the target for forwarded events.
+		 * 
+		 * @param pec the consumer to forward events to
+		 * @return a handle whose close does nothing
+		 */
+		@Override
+		public AutoCloseable open(PushEventConsumer< ? super T> pec)
+				throws Exception {
+			// Without this assignment accept() has nothing to forward to and
+			// fails with a NullPointerException
+			delegate = pec;
+			return () -> { /* Nothing else to do */ };
+		}
+
+		/**
+		 * Forwards the event to the recorded consumer.
+		 * 
+		 * @param event the event to forward
+		 * @return the back pressure returned by the recorded consumer
+		 */
+		@Override
+		public long accept(PushEvent< ? extends T> event) throws Exception {
+			return delegate.accept(event);
+		}
+
+	}
+
+	/**
+	 * Create an Unbuffered {@link PushStream} from a Java {@link Stream}. The
+	 * data from the stream will be pushed into the PushStream synchronously as
+	 * it is opened. This may make terminal operations blocking unless a buffer
+	 * has been added to the {@link PushStream}. Care should be taken with
+	 * infinite {@link Stream}s to avoid blocking indefinitely.
+	 * 
+	 * <p>
+	 * Pushing stops at the first item for which the consumer returns negative
+	 * back pressure or throws; positive back pressure values are not acted
+	 * upon here.
+	 * 
+	 * @param items The items to push into the PushStream
+	 * @return A PushStream containing the items from the Java Stream
+	 */
+	public <T> PushStream<T> streamOf(Stream<T> items) {
+		PushEventSource<T> pes = aec -> {
+			// NOTE(review): this flag is only set by the handle returned at
+			// the end of this lambda, after the pipeline below has already
+			// run, so it looks unobservable during the push - confirm
+			AtomicBoolean closed = new AtomicBoolean(false);
+
+			// Lazy pipeline: each item is pushed as findFirst() pulls it, and
+			// the first negative result short-circuits the rest
+			items.mapToLong(i -> {
+				try {
+					long returnValue = closed.get() ? -1 : aec.accept(data(i));
+					if (returnValue < 0) {
+						aec.accept(PushEvent.<T>close());
+					}
+					return returnValue;
+				} catch (Exception e) {
+					try {
+						aec.accept(PushEvent.<T>error(e));
+					} catch (Exception e2) {/* No further events needed */}
+					return -1;
+				}
+			}).filter(i -> i < 0).findFirst().orElseGet(() -> {
+				// No item produced a negative result: every item was pushed,
+				// so finish with a close event
+				try {
+					return aec.accept(PushEvent.<T>close());
+				} catch (Exception e) {
+					return -1;
+				}
+			});
+
+			return () -> closed.set(true);
+		};
+
+		return this.<T> createUnbufferedStream(pes, null);
+	}
+
+	/**
+	 * Create an Unbuffered {@link PushStream} from a Java {@link Stream} The
+	 * data from the stream will be pushed into the PushStream asynchronously
+	 * using the supplied Executor.
+	 * 
+	 * @param executor The worker to use to push items from the Stream into the
+	 *            PushStream
+	 * @param items The items to push into the PushStream
+	 * @return A PushStream containing the items from the Java Stream
+	 */
+	public <T> PushStream<T> streamOf(Executor executor, Stream<T> items) {
+
+		boolean closeExecutorOnClose;
+		Executor toUse;
+		if (executor == null) {
+			toUse = Executors.newFixedThreadPool(2);
+			closeExecutorOnClose = true;
+		} else {
+			toUse = executor;
+			closeExecutorOnClose = false;
+		}
+
+		@SuppressWarnings("resource")
+		PushStream<T> stream = new UnbufferedPushStreamImpl<T,BlockingQueue<PushEvent< ? extends T>>>(
+				this, toUse, acquireScheduler(), aec -> {
+					return () -> { /* No action to take */ };
+				}) {
+
+			@Override
+			protected boolean begin() {
+				if (super.begin()) {
+					Iterator<T> it = items.iterator();
+
+					toUse.execute(() -> pushData(it));
+
+					return true;
+				}
+				return false;
+			}
+
+			private void pushData(Iterator<T> it) {
+				while (it.hasNext()) {
+					try {
+						long returnValue = closed.get() == CLOSED ? -1
+								: handleEvent(data(it.next()));
+						if (returnValue != 0) {
+							if (returnValue < 0) {
+								close();
+								return;
+							} else {
+								scheduler.schedule(
+										() -> toUse.execute(() -> pushData(it)),
+										returnValue, MILLISECONDS);
+								return;
+							}
+						}
+					} catch (Exception e) {
+						close(error(e));
+					}
+				}
+				close();
+			}
+		};
+
+		stream = stream.onClose(() -> {
+			if (closeExecutorOnClose) {
+				((ExecutorService) toUse).shutdown();
+			}
+			releaseScheduler();
+		}).map(Function.identity());
+
+		return stream;
+	}
+}

Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicy.java
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicy.java?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicy.java (added)
+++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicy.java Fri Oct 21 15:10:51 2016
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import org.osgi.annotation.versioning.ConsumerType;
+
+/**
+ * A {@link PushbackPolicy} is used to calculate how much back pressure to apply
+ * based on the current buffer. The {@link PushbackPolicy} will be called after
+ * an event has been queued, and the returned value will be used as back
+ * pressure.
+ * 
+ * @see PushbackPolicyOption
+ * 
+ *
+ * @param <T> The type of the data
+ * @param <U> The type of the queue
+ */
+@ConsumerType
+@FunctionalInterface
+public interface PushbackPolicy<T, U extends BlockingQueue<PushEvent<? extends T>>> {
+	
+	/**
+	 * Given the current state of the queue, determine the level of back
+	 * pressure that should be applied
+	 * 
+	 * <p>
+	 * NOTE(review): the unit is documented here as nanoseconds, but
+	 * <code>PushStreamProvider.streamOf(Executor, Stream)</code> schedules the
+	 * back pressure it receives in milliseconds - confirm which unit is
+	 * intended.
+	 * 
+	 * @param queue the buffer whose current state is inspected
+	 * @return a back pressure value in nanoseconds
+	 * @throws Exception if the back pressure cannot be calculated
+	 */
+	public long pushback(U queue) throws Exception;
+	
+}

Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicyOption.java
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicyOption.java?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicyOption.java (added)
+++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicyOption.java Fri Oct 21 15:10:51 2016
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link PushbackPolicyOption} provides a standard set of simple
+ * {@link PushbackPolicy} implementations.
+ * 
+ * @see PushbackPolicy
+ */
+public enum PushbackPolicyOption {
+
+	/**
+	 * Returns a fixed amount of back pressure, independent of how full the
+	 * buffer is
+	 */
+	FIXED {
+		@Override
+		public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) {
+			return q -> value;
+		}
+	},
+	/**
+	 * Returns zero back pressure until the buffer is full, then it returns a
+	 * fixed value
+	 */
+	ON_FULL_FIXED {
+		@Override
+		public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) {
+			return q -> q.remainingCapacity() == 0 ? value : 0;
+		}
+	},
+	/**
+	 * Returns zero back pressure until the buffer is full, then it returns an
+	 * exponentially increasing amount, starting with the supplied value and
+	 * doubling it each time. Once the buffer is no longer full the back
+	 * pressure returns to zero.
+	 * 
+	 * <p>
+	 * The doubling stops at the largest value that still fits in a positive
+	 * <code>long</code>, so a long run of full-buffer calls cannot overflow
+	 * into a negative or wrapped-around value. A base value that is zero or
+	 * negative is returned unchanged.
+	 */
+	ON_FULL_EXPONENTIAL {
+		@Override
+		public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) {
+			// Largest left shift that keeps a positive value positive
+			int maxShift = value > 0 ? Long.numberOfLeadingZeros(value) - 1 : 0;
+			AtomicInteger backoffCount = new AtomicInteger(0);
+			return q -> {
+				if (q.remainingCapacity() == 0) {
+					// Saturating increment: the counter never passes maxShift
+					int shift = backoffCount
+							.getAndUpdate(c -> c < maxShift ? c + 1 : c);
+					return value << shift;
+				}
+				backoffCount.set(0);
+				return 0;
+			};
+
+		}
+	},
+	/**
+	 * Returns zero back pressure when the buffer is empty, then it returns a
+	 * linearly increasing amount of back pressure based on how full the buffer
+	 * is. The maximum value will be returned when the buffer is full.
+	 */
+	LINEAR {
+		@Override
+		public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) {
+			return q -> {
+				long remainingCapacity = q.remainingCapacity();
+				if (remainingCapacity == 0) {
+					// Full (or zero-capacity) buffer: maximum back pressure.
+					// This also avoids dividing by zero when the queue is
+					// drained between this read and the size() read below
+					return value;
+				}
+				long used = q.size();
+				return (value * used) / (used + remainingCapacity);
+			};
+		}
+	};
+
+	/**
+	 * Create a {@link PushbackPolicy} instance configured with a base back
+	 * pressure time in nanoseconds
+	 * 
+	 * The actual backpressure returned will vary based on the selected
+	 * implementation, the base value, and the state of the buffer.
+	 * 
+	 * @param value the base back pressure value
+	 * @return A {@link PushbackPolicy} to use
+	 */
+	public abstract <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value);
+
+}

Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicy.java
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicy.java?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicy.java (added)
+++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicy.java Fri Oct 21 15:10:51 2016
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.osgi.annotation.versioning.ConsumerType;
+import org.osgi.util.pushstream.PushEvent.EventType;
+
+/**
+ * A {@link QueuePolicy} is used to control how events should be queued in the
+ * current buffer. The {@link QueuePolicy} will be called when an event has
+ * arrived.
+ * 
+ * @see QueuePolicyOption
+ * 
+ *
+ * @param <T> The type of the data
+ * @param <U> The type of the queue
+ */
+
+@ConsumerType
+@FunctionalInterface
+public interface QueuePolicy<T, U extends BlockingQueue<PushEvent<? extends T>>> { 
+	
+	/**
+	 * Enqueue the event. Nothing is returned; a failure to enqueue is
+	 * reported by throwing.
+	 * 
+	 * @param queue the buffer to add the event to
+	 * @param event the event to enqueue
+	 * @throws Exception If an error occurred adding the event to the queue. This
+	 *         exception will cause the connection between the
+	 *         {@link PushEventSource} and the {@link PushEventConsumer} to be
+	 *         closed with an {@link EventType#ERROR}
+	 */
+	public void doOffer(U queue, PushEvent<? extends T> event) throws Exception;
+	
+}

Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicyOption.java
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicyOption.java?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicyOption.java (added)
+++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicyOption.java Fri Oct 21 15:10:51 2016
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * {@link QueuePolicyOption} provides a standard set of simple
+ * {@link QueuePolicy} implementations.
+ * 
+ * @see QueuePolicy
+ */
+public enum QueuePolicyOption {
+	/**
+	 * Attempt to add the supplied event to the queue. If the queue is unable to
+	 * immediately accept the value then discard the value at the head of the
+	 * queue and try again. Repeat this process until the event is enqueued.
+	 */
+	DISCARD_OLDEST {
+		@Override
+		public <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy() {
+			return (queue, event) -> {
+				while (!queue.offer(event)) {
+					queue.poll();
+				}
+			};
+		}
+	},
+	/**
+	 * Attempt to add the supplied event to the queue, blocking until the
+	 * enqueue is successful. If the calling thread is interrupted while
+	 * blocked, its interrupt status is restored and the
+	 * {@link InterruptedException} is rethrown, so the event is not silently
+	 * lost.
+	 */
+	BLOCK {
+		@Override
+		public <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy() {
+			return (queue, event) -> {
+				try {
+					queue.put(event);
+				} catch (InterruptedException e) {
+					// The event was not enqueued: keep the interrupt visible
+					// and report the failure to the caller
+					Thread.currentThread().interrupt();
+					throw e;
+				}
+			};
+		}
+	},
+	/**
+	 * Attempt to add the supplied event to the queue, throwing an exception if
+	 * the queue is full.
+	 */
+	FAIL {
+		@Override
+		public <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy() {
+			return (queue, event) -> queue.add(event);
+		}
+	};
+
+	/**
+	 * @return a {@link QueuePolicy} implementation
+	 */
+	public abstract <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy();
+
+}

Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/SimplePushEventSource.java
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/SimplePushEventSource.java?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/SimplePushEventSource.java (added)
+++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/SimplePushEventSource.java Fri Oct 21 15:10:51 2016
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import org.osgi.annotation.versioning.ProviderType;
+import org.osgi.util.promise.Promise;
+
+/**
+ * A {@link SimplePushEventSource} is a helper that makes it simpler to write a
+ * {@link PushEventSource}. Users do not need to manage multiple registrations
+ * to the stream, nor do they have to be concerned with back pressure.
+ *
+ * @param <T> The type of the events produced by this source
+ */
+@ProviderType
+public interface SimplePushEventSource<T>
+		extends PushEventSource<T>, AutoCloseable {
+	/**
+	 * Close this source. Calling this method indicates that there will never be
+	 * any more events published by it. Calling this method sends a close event
+	 * to all connected consumers. After calling this method any
+	 * {@link PushEventConsumer} that tries to {@link #open(PushEventConsumer)}
+	 * this source will immediately receive a close event.
+	 */
+	@Override
+	void close();
+
+	/**
+	 * Asynchronously publish an event to this stream and all connected
+	 * {@link PushEventConsumer} instances. When this method returns there is no
+	 * guarantee that all consumers have been notified. Events published by a
+	 * single thread will maintain their relative ordering, however they may be
+	 * interleaved with events from other threads.
+	 * 
+	 * @param t the event data to publish
+	 * @throws IllegalStateException if the source is closed
+	 */
+	void publish(T t);
+
+	/**
+	 * Close this source for now, but potentially reopen it later. Calling this
+	 * method asynchronously sends a close event to all connected consumers.
+	 * After calling this method any {@link PushEventConsumer} that wishes may
+	 * {@link #open(PushEventConsumer)} this source, and will receive subsequent
+	 * events.
+	 */
+	void endOfStream();
+
+	/**
+	 * Close this source for now, but potentially reopen it later. Calling this
+	 * method asynchronously sends an error event to all connected consumers.
+	 * After calling this method any {@link PushEventConsumer} that wishes may
+	 * {@link #open(PushEventConsumer)} this source, and will receive subsequent
+	 * events.
+	 *
+	 * @param e the error
+	 */
+	void error(Exception e);
+
+	/**
+	 * Determine whether there are any {@link PushEventConsumer}s for this
+	 * {@link PushEventSource}. This can be used to skip expensive event
+	 * creation logic when there are no listeners.
+	 * 
+	 * @return true if any consumers are currently connected
+	 */
+	boolean isConnected();
+
+	/**
+	 * This method can be used to delay event generation until a consumer has
+	 * connected. The returned promise will resolve as soon as one or more
+	 * {@link PushEventConsumer} instances have opened the
+	 * SimplePushEventSource.
+	 * <p>
+	 * The returned promise may already be resolved if this
+	 * {@link SimplePushEventSource} already has connected consumers. If the
+	 * {@link SimplePushEventSource} is closed before the returned Promise
+	 * resolves then it will be failed with an {@link IllegalStateException}.
+	 * <p>
+	 * Note that the connected consumers are able to asynchronously close their
+	 * connections to this {@link SimplePushEventSource}, and therefore it is
+	 * possible that once the promise resolves this
+	 * {@link SimplePushEventSource} may no longer be connected to any
+	 * consumers.
+	 * 
+	 * @return A promise representing the connection state of this EventSource
+	 */
+	Promise<Void> connectPromise();
+
+}

Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/package-info.java
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/package-info.java?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/package-info.java (added)
+++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/package-info.java Fri Oct 21 15:10:51 2016
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Push Stream Package Version 1.0.
+ * 
+ * <p>
+ * Bundles wishing to use this package must list the package in the
+ * Import-Package header of the bundle's manifest.
+ * 
+ * <p>
+ * Example import for consumers using the API in this package:
+ * <p>
+ * {@code Import-Package: org.osgi.util.pushstream; version="[1.0,2.0)"}
+ * <p>
+ * Example import for providers implementing the API in this package:
+ * <p>
+ * {@code Import-Package: org.osgi.util.pushstream; version="[1.0,1.1)"}
+ * 
+ * @author $Id: 6a28fa0b5c2036486a22a7ca1254729d7848ca43 $
+ */
+
+@Version("1.0")
+package org.osgi.util.pushstream;
+
+import org.osgi.annotation.versioning.Version;



Mime
View raw message