aries-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From timothyjw...@apache.org
Subject svn commit: r1766040 [1/2] - in /aries/trunk: ./ pushstream/ pushstream/pushstream/ pushstream/pushstream/src/ pushstream/pushstream/src/main/ pushstream/pushstream/src/main/java/ pushstream/pushstream/src/main/java/org/ pushstream/pushstream/src/main/...
Date Fri, 21 Oct 2016 15:10:51 GMT
Author: timothyjward
Date: Fri Oct 21 15:10:51 2016
New Revision: 1766040

URL: http://svn.apache.org/viewvc?rev=1766040&view=rev
Log:
[pushstream] Initial contribution of Push Streams

Added:
    aries/trunk/pushstream/
    aries/trunk/pushstream/README.md
    aries/trunk/pushstream/pom.xml
    aries/trunk/pushstream/pushstream/
    aries/trunk/pushstream/pushstream/bnd.bnd
    aries/trunk/pushstream/pushstream/pom.xml
    aries/trunk/pushstream/pushstream/src/
    aries/trunk/pushstream/pushstream/src/main/
    aries/trunk/pushstream/pushstream/src/main/java/
    aries/trunk/pushstream/pushstream/src/main/java/org/
    aries/trunk/pushstream/pushstream/src/main/java/org/apache/
    aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/
    aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/
    aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/AbstractPushStreamImpl.java
    aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/BufferedPushStreamImpl.java
    aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/IntermediatePushStreamImpl.java
    aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/SimplePushEventSourceImpl.java
    aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/UnbufferedPushStreamImpl.java
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/AbstractBufferBuilder.java
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/BufferBuilder.java
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEvent.java
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventConsumer.java
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventSource.java
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStream.java
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilder.java
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilderImpl.java
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamProvider.java
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicy.java
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicyOption.java
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicy.java
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicyOption.java
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/SimplePushEventSource.java
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/package-info.java
Modified:
    aries/trunk/pom.xml

Modified: aries/trunk/pom.xml
URL: http://svn.apache.org/viewvc/aries/trunk/pom.xml?rev=1766040&r1=1766039&r2=1766040&view=diff
==============================================================================
--- aries/trunk/pom.xml (original)
+++ aries/trunk/pom.xml Fri Oct 21 15:10:51 2016
@@ -59,6 +59,7 @@
         <module>esa-maven-plugin</module>
         <module>async</module>
         <module>tx-control</module>
+        <module>pushstream</module>
     </modules>
 
     <build>

Added: aries/trunk/pushstream/README.md
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/README.md?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/README.md (added)
+++ aries/trunk/pushstream/README.md Fri Oct 21 15:10:51 2016
@@ -0,0 +1,9 @@
+Sample OSGi Pushstream implementation
+-------------------------------------
+
+This project is a prototype implementation of the OSGi Pushstream specification.
+
+OSGi Push Streams (RFC-216 https://github.com/osgi/design/tree/master/rfcs/rfc0216) are an in-progress RFC publicly available from the OSGi Alliance. They are also described in chapter 706 of the OSGi R7 Early Draft https://osgi.org/download/osgi.cmpn-7.0.0-earlydraft1.pdf
+
+Given that the RFC is non-final, the OSGi API declared in this project is subject to change at any time up to its official release. Also, the behaviour of this implementation may not always be up-to-date with the latest wording in the RFC. The project maintainers will, however, try to keep pace with the RFC, and to ensure that the implementations are compliant with any OSGi specifications that result from the RFC.
+

Added: aries/trunk/pushstream/pom.xml
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pom.xml?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pom.xml (added)
+++ aries/trunk/pushstream/pom.xml Fri Oct 21 15:10:51 2016
@@ -0,0 +1,131 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+    
+     http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache</groupId>
+		<artifactId>apache</artifactId>
+		<version>17</version>
+		<relativePath />
+	</parent>
+
+	<groupId>org.apache.aries.pushstream</groupId>
+	<artifactId>parent</artifactId>
+	<version>0.0.1-SNAPSHOT</version>
+	<packaging>pom</packaging>
+	<description>Apache Aries Push Stream Parent</description>
+	<scm>
+		<connection>
+			scm:svn:http://svn.apache.org/repos/asf/aries/trunk/pushstream
+		</connection>
+		<developerConnection>
+			scm:svn:https://svn.apache.org/repos/asf/aries/trunk/pushstream
+		</developerConnection>
+		<url>
+			http://svn.apache.org/viewvc/aries/trunk/pushstream
+		</url>
+	</scm>
+
+	<profiles>
+		<profile>
+			<id>jdk18</id>
+			<activation>
+				<jdk>1.8</jdk>
+			</activation>
+			<modules>
+				<module>pushstream</module>
+			</modules>
+		</profile>
+	</profiles>
+
+	<dependencyManagement>
+		<dependencies>
+			<dependency>
+				<groupId>org.osgi</groupId>
+				<artifactId>osgi.annotation</artifactId>
+				<version>6.0.1</version>
+				<scope>provided</scope>
+			</dependency>
+			<dependency>
+				<groupId>org.apache.aries.async</groupId>
+				<artifactId>org.apache.aries.async.promise.api</artifactId>
+				<version>1.0.1</version>
+			</dependency>
+			<dependency>
+				<groupId>org.slf4j</groupId>
+				<artifactId>slf4j-api</artifactId>
+				<version>1.7.0</version>
+			</dependency>
+			<dependency>
+				<groupId>junit</groupId>
+				<artifactId>junit</artifactId>
+				<version>4.11</version>
+				<scope>test</scope>
+			</dependency>
+			<dependency>
+				<groupId>org.mockito</groupId>
+				<artifactId>mockito-all</artifactId>
+				<version>1.9.5</version>
+				<scope>test</scope>
+			</dependency>
+		</dependencies>
+	</dependencyManagement>
+
+	<build>
+		<pluginManagement>
+			<plugins>
+				<plugin>
+					<artifactId>maven-compiler-plugin</artifactId>
+					<configuration>
+						<source>1.8</source>
+						<target>1.8</target>
+					</configuration>
+				</plugin>
+				<plugin>
+					<groupId>org.apache.maven.plugins</groupId>
+					<artifactId>maven-jar-plugin</artifactId>
+					<configuration>
+						<useDefaultManifestFile>true</useDefaultManifestFile>
+					</configuration>
+				</plugin>
+				<plugin>
+					<groupId>org.apache.maven.plugins</groupId>
+					<artifactId>maven-javadoc-plugin</artifactId>
+					<configuration>
+						<source>1.8</source>
+					</configuration>
+				</plugin>
+				<plugin>
+					<groupId>biz.aQute.bnd</groupId>
+					<artifactId>bnd-maven-plugin</artifactId>
+					<version>3.2.0</version>
+					<executions>
+						<execution>
+							<id>default-bnd-process</id>
+							<goals>
+								<goal>bnd-process</goal>
+							</goals>
+						</execution>
+					</executions>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
+</project>

Added: aries/trunk/pushstream/pushstream/bnd.bnd
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/bnd.bnd?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/bnd.bnd (added)
+++ aries/trunk/pushstream/pushstream/bnd.bnd Fri Oct 21 15:10:51 2016
@@ -0,0 +1 @@
+Export-Package: ${packages;VERSIONED}

Added: aries/trunk/pushstream/pushstream/pom.xml
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/pom.xml?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/pom.xml (added)
+++ aries/trunk/pushstream/pushstream/pom.xml Fri Oct 21 15:10:51 2016
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+    
+     http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.aries.pushstream</groupId>
+		<artifactId>parent</artifactId>
+		<version>0.0.1-SNAPSHOT</version>
+		<relativePath>../pom.xml</relativePath>
+	</parent>
+	<groupId>org.apache.aries.pushstream</groupId>
+	<artifactId>pushstream</artifactId>
+	<name>Apache Aries Push Streams</name>
+	<version>0.0.1-SNAPSHOT</version>
+
+
+	<description>
+        This bundle contains the Apache Aries OSGi Push Stream implementation.
+    </description>
+
+	<scm>
+		<connection>
+            scm:svn:http://svn.apache.org/repos/asf/aries/trunk/pushstream/pushstream
+        </connection>
+		<developerConnection>
+            scm:svn:https://svn.apache.org/repos/asf/aries/trunk/pushstream/pushstream
+        </developerConnection>
+		<url>
+            http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream
+        </url>
+	</scm>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.osgi</groupId>
+			<artifactId>osgi.annotation</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.aries.async</groupId>
+			<artifactId>org.apache.aries.async.promise.api</artifactId>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>biz.aQute.bnd</groupId>
+				<artifactId>bnd-maven-plugin</artifactId>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

Added: aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/AbstractPushStreamImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/AbstractPushStreamImpl.java?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/AbstractPushStreamImpl.java (added)
+++ aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/AbstractPushStreamImpl.java Fri Oct 21 15:10:51 2016
@@ -0,0 +1,1374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.pushstream;
+
+import static java.util.Collections.emptyList;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.aries.pushstream.AbstractPushStreamImpl.State.*;
+import static org.osgi.util.pushstream.PushEventConsumer.*;
+
+import java.time.Duration;
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.IntSupplier;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
+import org.osgi.util.promise.Deferred;
+import org.osgi.util.promise.Promise;
+import org.osgi.util.pushstream.PushEvent;
+import org.osgi.util.pushstream.PushEventConsumer;
+import org.osgi.util.pushstream.PushEventSource;
+import org.osgi.util.pushstream.PushStream;
+import org.osgi.util.pushstream.PushStreamBuilder;
+import org.osgi.util.pushstream.PushStreamProvider;
+import org.osgi.util.pushstream.PushEvent.EventType;
+
+public abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
+	
+	public static enum State {
+		BUILDING, STARTED, CLOSED
+	}
+	
+	protected final PushStreamProvider								psp;
+	
+	protected final Executor										defaultExecutor;
+	protected final ScheduledExecutorService						scheduler;
+
+	protected final AtomicReference<State> closed = new AtomicReference<>(BUILDING);
+	
+	protected final AtomicReference<PushEventConsumer<T>>			next			= new AtomicReference<>();
+	
+	protected final AtomicReference<Runnable> onCloseCallback = new AtomicReference<>();
+	protected final AtomicReference<Consumer<? super Throwable>> onErrorCallback = new AtomicReference<>();
+
+	protected abstract boolean begin();
+	
+	protected AbstractPushStreamImpl(PushStreamProvider psp,
+			Executor executor, ScheduledExecutorService scheduler) {
+		this.psp = psp;
+		this.defaultExecutor = executor;
+		this.scheduler = scheduler;
+	}
+
+	protected long handleEvent(PushEvent< ? extends T> event) {
+		if(closed.get() != CLOSED) {
+			try {
+				if(event.isTerminal()) {
+					close(event.nodata());
+					return ABORT;
+				} else {
+					PushEventConsumer<T> consumer = next.get();
+					long val;
+					if(consumer == null) {
+						//TODO log a warning
+						val = CONTINUE;
+					} else {
+						val = consumer.accept(event);
+					}
+					if(val < 0) {
+						close();
+					}
+					return val;
+				}
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		}
+		return ABORT;
+	}
+	
+	@Override
+	public void close() {
+		close(PushEvent.close());
+	}
+	
+	protected boolean close(PushEvent<T> event) {
+		if(!event.isTerminal()) {
+			throw new IllegalArgumentException("The event " + event  + " is not a close event.");
+		}
+		if(closed.getAndSet(CLOSED) != CLOSED) {
+			PushEventConsumer<T> aec = next.getAndSet(null);
+			if(aec != null) {
+				try {
+					aec.accept(event);
+				} catch (Exception e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+				}
+			}
+			Runnable handler = onCloseCallback.getAndSet(null);
+			if(handler != null) {
+				try {
+					handler.run();
+				} catch (Exception e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+				}
+			}
+			if (event.getType() == EventType.ERROR) {
+				Consumer<? super Throwable> errorHandler = onErrorCallback.getAndSet(null);
+				if(errorHandler != null) {
+					try {
+						errorHandler.accept(event.getFailure());
+					} catch (Exception e) {
+						// TODO Auto-generated catch block
+						e.printStackTrace();
+					}
+				}
+			}
+			return true;
+		}
+		return false;
+	}
+	
+	@Override
+	public PushStream<T> onClose(Runnable closeHandler) {
+		if(onCloseCallback.compareAndSet(null, closeHandler)) {
+			if(closed.get() == State.CLOSED && onCloseCallback.compareAndSet(closeHandler, null)) {
+				closeHandler.run();
+			}
+		} else {
+			throw new IllegalStateException("A close handler has already been defined for this stream object");
+		}
+		return this;
+	}
+
+	@Override
+	public PushStream<T> onError(Consumer< ? super Throwable> closeHandler) {
+		if(onErrorCallback.compareAndSet(null, closeHandler)) {
+			if(closed.get() == State.CLOSED) { 
+				//TODO log already closed
+				onErrorCallback.set(null);
+			}
+		} else {
+			throw new IllegalStateException("A close handler has already been defined for this stream object");
+		}
+		return this;
+	}
+
+	private void updateNext(PushEventConsumer<T> consumer) {
+		if(!next.compareAndSet(null, consumer)) {
+			throw new IllegalStateException("This stream has already been chained");
+		} else if(closed.get() == CLOSED && next.compareAndSet(consumer, null)) {
+			try {
+				consumer.accept(PushEvent.close());
+			} catch (Exception e) {
+				//TODO log
+				e.printStackTrace();
+			}
+		}
+	}
+
+	@Override
+	public PushStream<T> filter(Predicate< ? super T> predicate) {
+		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+				psp, defaultExecutor, scheduler, this);
+		updateNext((event) -> {
+			try {
+				if (!event.isTerminal()) {
+					if (predicate.test(event.getData())) {
+						return eventStream.handleEvent(event);
+					} else {
+						return CONTINUE;
+					}
+				}
+				return eventStream.handleEvent(event);
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+
+	@Override
+	public <R> PushStream<R> map(Function< ? super T, ? extends R> mapper) {
+		
+		AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
+				psp, defaultExecutor, scheduler, this);
+		updateNext(event -> {
+			try {
+				if (!event.isTerminal()) {
+					return eventStream.handleEvent(
+							PushEvent.data(mapper.apply(event.getData())));
+				} else {
+					return eventStream.handleEvent(event.nodata());
+				}
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+
+	@Override
+	public <R> PushStream<R> flatMap(
+			Function< ? super T, ? extends PushStream< ? extends R>> mapper) {
+		AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
+				psp, defaultExecutor, scheduler, this);
+
+		PushEventConsumer<R> consumer = e -> {
+			switch (e.getType()) {
+				case ERROR :
+					close(e.nodata());
+					return ABORT;
+				case CLOSE :
+					// Close should allow the next flat mapped entry
+					// without closing the stream;
+					return ABORT;
+				case DATA :
+					long returnValue = eventStream.handleEvent(e);
+					if (returnValue < 0) {
+						close();
+						return ABORT;
+					}
+					return returnValue;
+				default :
+					throw new IllegalArgumentException(
+							"The event type " + e.getType() + " is unknown");
+			}
+		};
+
+		updateNext(event -> {
+			try {
+				if (!event.isTerminal()) {
+					PushStream< ? extends R> mappedStream = mapper
+							.apply(event.getData());
+
+					return mappedStream.forEachEvent(consumer)
+							.getValue()
+							.longValue();
+				} else {
+					return eventStream.handleEvent(event.nodata());
+				}
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+
+	@Override
+	public PushStream<T> distinct() {
+		Set<T> set = Collections.<T>newSetFromMap(new ConcurrentHashMap<>());
+		return filter(set::add);
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	public PushStream<T> sorted() {
+		return sorted((Comparator)Comparator.naturalOrder());
+	}
+
+	/** Buffers all data events and, once the stream closes, emits them in comparator order before the close event. */
+	@Override
+	public PushStream<T> sorted(Comparator< ? super T> comparator) {
+		List<T> list = Collections.synchronizedList(new ArrayList<>());
+		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(psp, defaultExecutor, scheduler, this);
+		updateNext(event -> {
+			try {
+				switch(event.getType()) {
+					case DATA : 
+						list.add(event.getData());
+						return CONTINUE;
+					case CLOSE :
+						list.sort(comparator);
+						for(T t : list) {
+							eventStream.handleEvent(PushEvent.data(t));
+						}
+						// Fall through so the downstream stream also receives the close event
+					case ERROR :
+						return eventStream.handleEvent(event.nodata());
+				}
+				return eventStream.handleEvent(event.nodata());
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+
+	@Override
+	public PushStream<T> limit(long maxSize) {
+		if(maxSize <= 0) {
+			throw new IllegalArgumentException("The limit must be greater than zero");
+		}
+		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+				psp, defaultExecutor, scheduler, this);
+		AtomicLong counter = new AtomicLong(maxSize);
+		updateNext(event -> {
+			try {
+				if (!event.isTerminal()) {
+					long count = counter.decrementAndGet();
+					if (count > 0) {
+						return eventStream.handleEvent(event);
+					} else if (count == 0) {
+						eventStream.handleEvent(event);
+					}
+					return ABORT;
+				} else {
+					return eventStream.handleEvent(event.nodata());
+				}
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+
+	@Override
+	public PushStream<T> skip(long n) {
+		if(n <= 0) {
+			throw new IllegalArgumentException("The number to skip must be greater than zero");
+		}
+		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+				psp, defaultExecutor, scheduler, this);
+		AtomicLong counter = new AtomicLong(n);
+		updateNext(event -> {
+			try {
+				if (!event.isTerminal()) {
+					if (counter.get() > 0 && counter.decrementAndGet() >= 0) {
+						return CONTINUE;
+					} else {
+						return eventStream.handleEvent(event);
+					} 				
+				} else {
+					return eventStream.handleEvent(event.nodata());
+				}
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+
+	@Override
+	public PushStream<T> fork(int n, int delay, Executor ex) {
+		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+				psp, ex, scheduler, this);
+		Semaphore s = new Semaphore(n);
+		updateNext(event -> {
+			try {
+				if (event.isTerminal()) {
+					s.acquire(n);
+					eventStream.close(event.nodata());
+					return ABORT;
+				}
+	
+				s.acquire(1);
+	
+				ex.execute(() -> {
+					try {
+						if (eventStream.handleEvent(event) < 0) {
+							eventStream.close(PushEvent.close());
+						}
+					} catch (Exception e1) {
+						close(PushEvent.error(e1));
+					} finally {
+						s.release(1);
+					}
+				});
+	
+				return s.getQueueLength() * delay;
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+
+		return eventStream;
+	}
+	
+	@Override
+	public PushStream<T> buffer() {
+		return psp.createStream(c -> {
+			forEachEvent(c);
+			return this;
+		});
+	}
+
+	@Override
+	public <U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildBuffer() {
+		return psp.buildStream(c -> {
+			forEachEvent(c);
+			return this;
+		});
+	}
+
+	@Override
+	public PushStream<T> merge(
+			PushEventSource< ? extends T> source) {
+		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+				psp, defaultExecutor, scheduler, this);
+		AtomicInteger count = new AtomicInteger(2);
+		PushEventConsumer<T> consumer = event -> {
+			try {
+				if (!event.isTerminal()) {
+					return eventStream.handleEvent(event);
+				}
+	
+				if (count.decrementAndGet() == 0) {
+					eventStream.handleEvent(event.nodata());
+					return ABORT;
+				}
+				return CONTINUE;
+			} catch (Exception e) {
+				PushEvent<T> error = PushEvent.error(e);
+				close(error);
+				eventStream.close(event.nodata());
+				return ABORT;
+			}
+		};
+		updateNext(consumer);
+		AutoCloseable second;
+		try {
+			second = source.open((PushEvent< ? extends T> event) -> {
+				return consumer.accept(event);
+			});
+		} catch (Exception e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+			throw new IllegalStateException(
+					"Unable to merge events as the event source could not be opened.",
+					e);
+		}
+		
+		return eventStream.onClose(() -> {
+			try {
+				second.close();
+			} catch (Exception e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			} 
+		}).map(Function.identity());
+	}
+
+	@Override
+	public PushStream<T> merge(PushStream< ? extends T> source) {
+		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+				psp, defaultExecutor, scheduler, this);
+		AtomicInteger count = new AtomicInteger(2);
+		PushEventConsumer<T> consumer = event -> {
+			try {
+				if (!event.isTerminal()) {
+					return eventStream.handleEvent(event);
+				}
+				
+				if (count.decrementAndGet() == 0) {
+					eventStream.handleEvent(event.nodata());
+					return ABORT;
+				}
+				return CONTINUE;
+			} catch (Exception e) {
+				PushEvent<T> error = PushEvent.error(e);
+				close(error);
+				eventStream.close(event.nodata());
+				return ABORT;
+			}
+		};
+		updateNext(consumer);
+		try {
+			source.forEachEvent(event -> {
+				return consumer.accept(event);
+			}).then(p -> {
+				count.decrementAndGet();
+				consumer.accept(PushEvent.close());
+				return null;
+			}, p -> {
+				count.decrementAndGet();
+				consumer.accept(PushEvent.error((Exception) p.getFailure()));
+			});
+		} catch (Exception e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+			throw new IllegalStateException(
+					"Unable to merge events as the event source could not be opened.",
+					e);
+		}
+		
+		return eventStream.onClose(() -> {
+			try {
+				source.close();
+			} catch (Exception e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			} 
+		}).map(Function.identity());
+	}
+
+	/** Routes each data event to every output stream whose predicate accepts it; terminal events go to all outputs. */
+	@SuppressWarnings("unchecked")
+	@Override
+	public PushStream<T>[] split(Predicate< ? super T>... predicates) {
+		Predicate<? super T>[] tests = Arrays.copyOf(predicates, predicates.length);
+		AbstractPushStreamImpl<T>[] rsult = new AbstractPushStreamImpl[tests.length];
+		for(int i = 0; i < tests.length; i++) {
+			rsult[i] = new IntermediatePushStreamImpl<>(psp, defaultExecutor, scheduler, this);
+		}
+		// A null slot means the output is still live; TRUE means it has stopped accepting events.
+		AtomicReferenceArray<Boolean> off = new AtomicReferenceArray<>(tests.length);
+		AtomicInteger count = new AtomicInteger(tests.length);
+		updateNext(event -> {
+			if (!event.isTerminal()) {
+				long delay = CONTINUE;
+				for (int i = 0; i < tests.length; i++) {
+					try {
+						if (off.get(i) == null && tests[i].test(event.getData())) {
+							long accept = rsult[i].handleEvent(event);
+							if (accept < 0) {
+								if (off.compareAndSet(i, null, Boolean.TRUE)) {
+									count.decrementAndGet();
+								}
+							} else if (accept > delay) {
+								delay = accept; // report the largest back-pressure requested
+							}
+						}
+					} catch (Exception e) {
+						try {
+							rsult[i].close(PushEvent.error(e));
+						} catch (Exception e2) {
+							//TODO log
+						}
+						if (off.compareAndSet(i, null, Boolean.TRUE)) {
+							count.decrementAndGet();
+						}
+					}
+				}
+				return count.get() == 0 ? ABORT : delay;
+			}
+			for (AbstractPushStreamImpl<T> as : rsult) {
+				try {
+					as.handleEvent(event.nodata());
+				} catch (Exception e) {
+					try {
+						as.close(PushEvent.error(e));
+					} catch (Exception e2) {
+						//TODO log
+					}
+				}
+			}
+			return ABORT;
+		});
+		return Arrays.copyOf(rsult, tests.length);
+	}
+
+	@Override
+	public PushStream<T> sequential() {
+		AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
+				psp, defaultExecutor, scheduler, this);
+		Lock lock = new ReentrantLock();
+		updateNext((event) -> {
+			try {
+				lock.lock();
+				try {
+					return eventStream.handleEvent(event);
+				} finally {
+					lock.unlock();
+				}
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+
+	/**
+	 * Creates a downstream stream fed by the supplied accumulator. Each data
+	 * event is passed to the accumulator; when it returns a non-empty
+	 * Optional the contained value is forwarded as a data event, otherwise
+	 * nothing is forwarded. Terminal events are forwarded without data.
+	 *
+	 * @param accumulator called with the payload of every data event
+	 * @return the downstream stream
+	 */
+	@Override
+	public <R> PushStream<R> coalesce(
+			Function< ? super T,Optional<R>> accumulator) {
+		AbstractPushStreamImpl<R> downstream = new IntermediatePushStreamImpl<>(
+				psp, defaultExecutor, scheduler, this);
+		updateNext(incoming -> {
+			try {
+				if (incoming.isTerminal()) {
+					return downstream.handleEvent(incoming.nodata());
+				}
+				Optional<PushEvent<R>> ready = accumulator
+						.apply(incoming.getData()).map(PushEvent::data);
+				if (!ready.isPresent()) {
+					// Nothing to emit yet, keep accumulating
+					return CONTINUE;
+				}
+				return downstream.handleEvent(ready.get());
+			} catch (Exception failure) {
+				close(PushEvent.error(failure));
+				return ABORT;
+			}
+		});
+		return downstream;
+	}
+
+	@Override
+	public <R> PushStream<R> coalesce(int count, Function<Collection<T>,R> f) {
+		if (count <= 0)
+			throw new IllegalArgumentException(
+					"A coalesce operation must collect a positive number of events");
+		// This could be optimised to only use a single collection queue.
+		// It would save some GC, but is it worth it?
+		return coalesce(() -> count, f);
+	}
+
+	/**
+	 * Coalesces events into batches whose size is obtained from the supplier
+	 * each time a new batch buffer is created. Data events are buffered until
+	 * the buffer refuses an offer; the refused event is then added to that
+	 * batch and the result of applying f to the batch is forwarded. On a
+	 * terminal event any buffered events are forwarded as a final batch
+	 * before the terminal event itself.
+	 *
+	 * @param count supplies the size used for each new batch buffer
+	 * @param f turns each collected batch into a single value
+	 * @return the downstream stream
+	 */
+	@Override
+	public <R> PushStream<R> coalesce(IntSupplier count,
+			Function<Collection<T>,R> f) {
+		// Holds the buffer currently being filled; null until the downstream
+		// stream begins, while a full buffer is being swapped, and after a
+		// terminal event
+		AtomicReference<Queue<T>> queueRef = new AtomicReference<Queue<T>>(
+				null);
+
+		Runnable init = () -> queueRef
+				.set(getQueueForInternalBuffering(count.getAsInt()));
+
+		@SuppressWarnings("resource")
+		AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>(
+				psp, defaultExecutor, scheduler, this) {
+			@Override
+			protected void beginning() {
+				// The first buffer is created when the stream begins
+				init.run();
+			}
+		};
+
+		// Set once a terminal event has been seen
+		AtomicBoolean endPending = new AtomicBoolean();
+		Object lock = new Object();
+		updateNext((event) -> {
+			try {
+				Queue<T> queue;
+				if (!event.isTerminal()) {
+					synchronized (lock) {
+						for (;;) {
+							queue = queueRef.get();
+							if (queue == null) {
+								if (endPending.get()) {
+									return ABORT;
+								} else {
+									// NOTE(review): this retries while still
+									// holding the lock until a buffer appears;
+									// the replacement buffer is installed
+									// outside the lock below - confirm the
+									// spin is intended
+									continue;
+								}
+							} else if (queue.offer(event.getData())) {
+								return CONTINUE;
+							} else {
+								// The buffer is full: detach it so that this
+								// thread owns it, then leave the lock
+								queueRef.lazySet(null);
+								break;
+							}
+						}
+					}
+
+					// Install the buffer for the next batch
+					queueRef.set(
+							getQueueForInternalBuffering(count.getAsInt()));
+
+					// This call is on the same thread and so must happen
+					// outside
+					// the synchronized block.
+					return aggregateAndForward(f, eventStream, event,
+							queue);
+				} else {
+					synchronized (lock) {
+						queue = queueRef.get();
+						queueRef.lazySet(null);
+						endPending.set(true);
+					}
+					if (queue != null) {
+						// Flush what was buffered; the value returned by the
+						// downstream stream is not inspected here
+						eventStream.handleEvent(
+								PushEvent.data(f.apply(queue)));
+					}
+				}
+				return eventStream.handleEvent(event.nodata());
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+
+	/**
+	 * Adds the event's payload to the batch, using the queue's reserved slot
+	 * if a normal offer is refused, then forwards the aggregated batch.
+	 *
+	 * @param f turns the batch into a single value
+	 * @param eventStream the stream that receives the aggregated value
+	 * @param event the data event that completes the batch
+	 * @param queue the batch collected so far
+	 * @return the value returned by the receiving stream
+	 */
+	private <R> long aggregateAndForward(Function<Collection<T>,R> f,
+			AbstractPushStreamImpl<R> eventStream,
+			PushEvent< ? extends T> event, Queue<T> queue) {
+		boolean accepted = queue.offer(event.getData());
+		if (!accepted) {
+			ArrayQueue<T> bounded = (ArrayQueue<T>) queue;
+			bounded.forcePush(event.getData());
+		}
+		R aggregated = f.apply(queue);
+		return eventStream.handleEvent(PushEvent.data(aggregated));
+	}
+	
+	
+	/**
+	 * Windows events by time only, delivering results on the default
+	 * executor of this stream.
+	 *
+	 * @param time the window length
+	 * @param f turns the events collected in a window into a single value
+	 * @return the downstream stream
+	 */
+	@Override
+	public <R> PushStream<R> window(Duration time,
+			Function<Collection<T>,R> f) {
+		return window(time, defaultExecutor, f);
+	}
+
+	/**
+	 * Windows events by time only, delivering results on the given executor.
+	 * Delegates with a constant duration and a maximum event count of zero,
+	 * for which the internal buffer is an unbounded list; the elapsed time
+	 * passed by the general form is discarded.
+	 *
+	 * @param time the window length
+	 * @param executor runs the deliveries to the downstream stream
+	 * @param f turns the events collected in a window into a single value
+	 * @return the downstream stream
+	 */
+	@Override
+	public <R> PushStream<R> window(Duration time, Executor executor,
+			Function<Collection<T>,R> f) {
+		return window(() -> time, () -> 0, executor, (t, c) -> f.apply(c));
+	}
+
+	/**
+	 * Windows events by time and by count, delivering results on the default
+	 * executor of this stream.
+	 *
+	 * @param time supplies the length of each window
+	 * @param maxEvents supplies the size used for each window's buffer
+	 * @param f receives the elapsed time and the collected events
+	 * @return the downstream stream
+	 */
+	@Override
+	public <R> PushStream<R> window(Supplier<Duration> time,
+			IntSupplier maxEvents,
+			BiFunction<Long,Collection<T>,R> f) {
+		return window(time, maxEvents, defaultExecutor, f);
+	}
+
+	/**
+	 * Windows events by time and by count. A window ends either when its
+	 * scheduled task fires or when its buffer refuses an offer; in both cases
+	 * f is called, on the supplied executor, with the elapsed time converted
+	 * to milliseconds and the events collected, and the result is forwarded
+	 * as a data event.
+	 *
+	 * @param time supplies the length of each window
+	 * @param maxEvents supplies the size used for each window's buffer
+	 * @param ex runs the calls to f and the deliveries downstream
+	 * @param f receives the elapsed milliseconds and the collected events
+	 * @return the downstream stream
+	 */
+	@Override
+	public <R> PushStream<R> window(Supplier<Duration> time,
+			IntSupplier maxEvents, Executor ex,
+			BiFunction<Long,Collection<T>,R> f) {
+
+		// System.nanoTime() at which the current window started
+		AtomicLong timestamp = new AtomicLong();
+		// Incremented each time a window is closed; a timer task only acts
+		// if the value still equals the one it was created with
+		AtomicLong counter = new AtomicLong();
+		Object lock = new Object();
+		// The buffer of the current window, or null while none is installed
+		AtomicReference<Queue<T>> queueRef = new AtomicReference<Queue<T>>(
+				null);
+
+		// This code is declared as a separate block to avoid any confusion
+		// about which instance's methods and variables are in scope
+		Consumer<AbstractPushStreamImpl<R>> begin = p -> {
+
+			synchronized (lock) {
+				timestamp.lazySet(System.nanoTime());
+				long count = counter.get();
+
+
+				// Schedule the task that closes the first window on timeout
+				scheduler.schedule(
+						getWindowTask(p, f, time, maxEvents, lock, count,
+								queueRef, timestamp, counter, ex),
+						time.get().toNanos(), NANOSECONDS);
+			}
+
+			queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt()));
+		};
+
+		@SuppressWarnings("resource")
+		AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>(
+				psp, ex, scheduler, this) {
+			@Override
+			protected void beginning() {
+				begin.accept(this);
+			}
+		};
+
+		// Set once a terminal event has been seen
+		AtomicBoolean endPending = new AtomicBoolean(false);
+		updateNext((event) -> {
+			try {
+				if (eventStream.closed.get() == CLOSED) {
+					return ABORT;
+				}
+				Queue<T> queue;
+				if (!event.isTerminal()) {
+					long elapsed;
+					long newCount;
+					synchronized (lock) {
+						for (;;) {
+							queue = queueRef.get();
+							if (queue == null) {
+								if (endPending.get()) {
+									return ABORT;
+								} else {
+									// NOTE(review): retries while holding the
+									// lock until a buffer is installed -
+									// confirm the spin is intended
+									continue;
+								}
+							} else if (queue.offer(event.getData())) {
+								return CONTINUE;
+							} else {
+								// The buffer is full, so this event closes
+								// the window
+								queueRef.lazySet(null);
+								break;
+							}
+						}
+
+						long now = System.nanoTime();
+						elapsed = now - timestamp.get();
+						timestamp.lazySet(now);
+						// Advancing the counter makes the timer task of the
+						// window just closed return without acting
+						newCount = counter.get() + 1;
+						counter.lazySet(newCount);
+
+						// This is a non-blocking call, and must happen in the
+						// synchronized block to avoid re-ordering the executor
+						// enqueue with a subsequent incoming close operation
+						aggregateAndForward(f, eventStream, event, queue,
+								ex, elapsed);
+					}
+					// These must happen outside the synchronized block as we
+					// call out to user code
+					queueRef.set(
+							getQueueForInternalBuffering(maxEvents.getAsInt()));
+					scheduler.schedule(
+							getWindowTask(eventStream, f, time, maxEvents, lock,
+									newCount, queueRef, timestamp, counter, ex),
+							time.get().toNanos(), NANOSECONDS);
+
+					return CONTINUE;
+				} else {
+					long elapsed;
+					synchronized (lock) {
+						queue = queueRef.get();
+						queueRef.lazySet(null);
+						endPending.set(true);
+						long now = System.nanoTime();
+						elapsed = now - timestamp.get();
+						counter.lazySet(counter.get() + 1);
+					}
+					// Flush the final window, which may be empty
+					Collection<T> collected = queue == null ? emptyList()
+							: queue;
+					ex.execute(() -> {
+						try {
+							eventStream
+									.handleEvent(PushEvent.data(f.apply(
+											Long.valueOf(NANOSECONDS
+													.toMillis(elapsed)),
+											collected)));
+						} catch (Exception e) {
+							close(PushEvent.error(e));
+						}
+					});
+				}
+				// The terminal event is enqueued on the executor after the
+				// final window
+				ex.execute(() -> eventStream.handleEvent(event.nodata()));
+				return ABORT;
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		return eventStream;
+	}
+
+	/**
+	 * Creates the buffer used to collect events for coalescing and windowing.
+	 *
+	 * @param size the number of events to collect; zero selects an unbounded
+	 *            list, any other value a fixed-size queue that holds
+	 *            size - 1 events plus one reserved slot
+	 * @return a new, empty queue
+	 */
+	protected Queue<T> getQueueForInternalBuffering(int size) {
+		if (size != 0) {
+			return new ArrayQueue<>(size - 1);
+		}
+		return new LinkedList<T>();
+	}
+	
+	@SuppressWarnings("unchecked")
+	/**
+	 * A special queue that keeps one element in reserve and can have that last
+	 * element set using forcePush. After the element is set the capacity is
+	 * permanently increased by one and cannot grow further.
+	 * 
+	 * @param <E> The element type
+	 */
+	private static class ArrayQueue<E> extends AbstractQueue<E>
+			implements Queue<E> {
+
+		final Object[]	store;
+
+		int				normalLength;
+
+		int				nextIndex;
+
+		int				size;
+
+		ArrayQueue(int capacity) {
+			store = new Object[capacity + 1];
+			normalLength = store.length - 1;
+		}
+
+		@Override
+		public boolean offer(E e) {
+			if (e == null)
+				throw new NullPointerException("Null values are not supported");
+			if (size < normalLength) {
+				store[nextIndex] = e;
+				size++;
+				nextIndex++;
+				nextIndex = nextIndex % normalLength;
+				return true;
+			}
+			return false;
+		}
+
+		public void forcePush(E e) {
+			store[normalLength] = e;
+			normalLength++;
+			size++;
+		}
+
+		@Override
+		public E poll() {
+			if (size == 0) {
+				return null;
+			} else {
+				int idx = nextIndex - size;
+				if (idx < 0) {
+					idx += normalLength;
+				}
+				E value = (E) store[idx];
+				store[idx] = null;
+				size--;
+				return value;
+			}
+		}
+
+		@Override
+		public E peek() {
+			if (size == 0) {
+				return null;
+			} else {
+				int idx = nextIndex - size;
+				if (idx < 0) {
+					idx += normalLength;
+				}
+				return (E) store[idx];
+			}
+		}
+
+		@Override
+		public Iterator<E> iterator() {
+			final int previousNext = nextIndex;
+			return new Iterator<E>() {
+
+				int idx;
+
+				int	remaining	= size;
+
+				{
+					idx = nextIndex - size;
+					if (idx < 0) {
+						idx += normalLength;
+					}
+				}
+
+				@Override
+				public boolean hasNext() {
+					if (nextIndex != previousNext) {
+						throw new ConcurrentModificationException(
+								"The queue was concurrently modified");
+					}
+					return remaining > 0;
+				}
+
+				@Override
+				public E next() {
+					if (!hasNext()) {
+						throw new NoSuchElementException(
+								"The iterator has no more values");
+					}
+					E value = (E) store[idx];
+					idx++;
+					remaining--;
+					if (idx == normalLength) {
+						idx = 0;
+					}
+					return value;
+				}
+
+			};
+		}
+
+		@Override
+		public int size() {
+			return size;
+		}
+
+	}
+
+	/**
+	 * Creates the task that closes a window when its time runs out. The task
+	 * does nothing if the window counter no longer equals expectedCounter.
+	 * Otherwise it advances the counter, hands the collected events to the
+	 * executor for aggregation and delivery, installs a fresh buffer and
+	 * schedules the task for the following window.
+	 *
+	 * @param eventStream the stream that receives the aggregated values
+	 * @param f receives the elapsed milliseconds and the collected events
+	 * @param time supplies the length of the next window
+	 * @param maxEvents supplies the size used for the next window's buffer
+	 * @param lock the monitor guarding the window state
+	 * @param expectedCounter the counter value for which this task is valid
+	 * @param queueRef holds the buffer of the current window
+	 * @param timestamp holds the System.nanoTime() start of the window
+	 * @param counter the window counter
+	 * @param executor runs the call to f and the delivery downstream
+	 * @return the task to schedule
+	 */
+	private <R> Runnable getWindowTask(AbstractPushStreamImpl<R> eventStream,
+			BiFunction<Long,Collection<T>,R> f, Supplier<Duration> time,
+			IntSupplier maxEvents, Object lock, long expectedCounter,
+			AtomicReference<Queue<T>> queueRef, AtomicLong timestamp,
+			AtomicLong counter, Executor executor) {
+		return () -> {
+
+			Queue<T> queue = null;
+			long elapsed;
+			synchronized (lock) {
+				
+				// The counter has moved on, so this task is out of date
+				if (counter.get() != expectedCounter) {
+					return;
+				}
+				counter.lazySet(expectedCounter + 1);
+
+				long now = System.nanoTime();
+				elapsed = now - timestamp.get();
+				timestamp.lazySet(now);
+
+				queue = queueRef.get();
+				queueRef.lazySet(null);
+
+				// This is a non-blocking call, and must happen in the
+				// synchronized block to avoid re-ordering the executor
+				// enqueue with a subsequent incoming close operation
+
+				Collection<T> collected = queue == null ? emptyList() : queue;
+				executor.execute(() -> {
+					try {
+						eventStream.handleEvent(PushEvent.data(f.apply(
+								Long.valueOf(NANOSECONDS.toMillis(elapsed)),
+								collected)));
+					} catch (Exception e) {
+						close(PushEvent.error(e));
+					}
+				});
+			}
+
+			// These must happen outside the synchronized block as we
+			// call out to user code
+			queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt()));
+			scheduler.schedule(
+					getWindowTask(eventStream, f, time, maxEvents, lock,
+							expectedCounter + 1, queueRef, timestamp, counter,
+							executor),
+					time.get().toNanos(), NANOSECONDS);
+		};
+	}
+
+	/**
+	 * Enqueues, on the executor, a task that completes a full window: the
+	 * event's payload is added to the queue (through the reserved slot if a
+	 * normal offer is refused), f is applied, and the result is forwarded.
+	 * This stream is closed if the receiving stream returns a negative value
+	 * or if anything in the task throws.
+	 *
+	 * @param f receives the elapsed milliseconds and the collected events
+	 * @param eventStream the stream that receives the aggregated value
+	 * @param event the data event that filled the window
+	 * @param queue the events collected so far
+	 * @param executor runs the task
+	 * @param elapsed the length of the window in nanoseconds
+	 */
+	private <R> void aggregateAndForward(BiFunction<Long,Collection<T>,R> f,
+			AbstractPushStreamImpl<R> eventStream,
+			PushEvent< ? extends T> event, Queue<T> queue, Executor executor,
+			long elapsed) {
+		Runnable forward = () -> {
+			try {
+				boolean accepted = queue.offer(event.getData());
+				if (!accepted) {
+					((ArrayQueue<T>) queue).forcePush(event.getData());
+				}
+				Long elapsedMillis = Long.valueOf(NANOSECONDS.toMillis(elapsed));
+				R windowResult = f.apply(elapsedMillis, queue);
+				if (eventStream.handleEvent(PushEvent.data(windowResult)) < 0) {
+					close();
+				}
+			} catch (Exception failure) {
+				close(PushEvent.error(failure));
+			}
+		};
+		executor.execute(forward);
+	}
+
+	/**
+	 * Terminal operation that passes the payload of every data event to the
+	 * action. The returned promise resolves with null on a close event and
+	 * fails on an error event or when the action throws.
+	 *
+	 * @param action called with the payload of each data event
+	 * @return a promise completed when the stream terminates
+	 */
+	@Override
+	public Promise<Void> forEach(Consumer< ? super T> action) {
+		final Deferred<Void> completion = new Deferred<>();
+		updateNext(incoming -> {
+			try {
+				switch (incoming.getType()) {
+					case ERROR:
+						completion.fail(incoming.getFailure());
+						break;
+					case CLOSE:
+						completion.resolve(null);
+						break;
+					case DATA:
+						action.accept(incoming.getData());
+						return CONTINUE;
+				}
+				// Only terminal events reach this point
+				close(incoming.nodata());
+				return ABORT;
+			} catch (Exception failure) {
+				completion.fail(failure);
+				return ABORT;
+			}
+		});
+		begin();
+		return completion.getPromise();
+	}
+
+	/**
+	 * Terminal operation that collects every data event into a list and
+	 * maps the result to an Object array.
+	 *
+	 * @return a promise for the array of collected payloads
+	 */
+	@Override
+	public Promise<Object[]> toArray() {
+		return collect(Collectors.toList())
+				.map(List::toArray);
+	}
+
+	/**
+	 * Terminal operation that collects every data event into a list and
+	 * copies it into an array obtained from the generator, which is called
+	 * with the number of collected elements.
+	 *
+	 * @param generator creates the array for the given length
+	 * @return a promise for the array of collected payloads
+	 */
+	@Override
+	public <A extends T> Promise<A[]> toArray(IntFunction<A[]> generator) {
+		return collect(Collectors.toList())
+				.map(l -> l.toArray(generator.apply(l.size())));
+	}
+
+	@Override
+	public Promise<T> reduce(T identity, BinaryOperator<T> accumulator) {
+		Deferred<T> d = new Deferred<>();
+		AtomicReference<T> iden = new AtomicReference<T>(identity);
+
+		updateNext(event -> {
+			try {
+				switch(event.getType()) {
+					case DATA:
+						iden.accumulateAndGet(event.getData(), accumulator);
+						return CONTINUE;
+					case CLOSE:
+						d.resolve(iden.get());
+						break;
+					case ERROR:
+						d.fail(event.getFailure());
+						break;
+				}
+				close(event.nodata());
+				return ABORT;
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		begin();
+		return d.getPromise();
+	}
+
+	@Override
+	public Promise<Optional<T>> reduce(BinaryOperator<T> accumulator) {
+		Deferred<Optional<T>> d = new Deferred<>();
+		AtomicReference<T> iden = new AtomicReference<T>(null);
+
+		updateNext(event -> {
+			try {
+				switch(event.getType()) {
+					case DATA:
+						if (!iden.compareAndSet(null, event.getData()))
+							iden.accumulateAndGet(event.getData(), accumulator);
+						return CONTINUE;
+					case CLOSE:
+						d.resolve(Optional.ofNullable(iden.get()));
+						break;
+					case ERROR:
+						d.fail(event.getFailure());
+						break;
+				}
+				close(event.nodata());
+				return ABORT;
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		begin();
+		return d.getPromise();
+	}
+
+	@Override
+	public <U> Promise<U> reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
+		Deferred<U> d = new Deferred<>();
+		AtomicReference<U> iden = new AtomicReference<>(identity);
+
+		updateNext(event -> {
+			try {
+				switch(event.getType()) {
+					case DATA:
+						iden.updateAndGet((e) -> accumulator.apply(e, event.getData()));
+						return CONTINUE;
+					case CLOSE:
+						d.resolve(iden.get());
+						break;
+					case ERROR:
+						d.fail(event.getFailure());
+						break;
+				}
+				close(event.nodata());
+				return ABORT;
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		begin();
+		return d.getPromise();
+	}
+
+	@Override
+	public <R, A> Promise<R> collect(Collector<? super T, A, R> collector) {
+		A result = collector.supplier().get();
+		Deferred<R> d = new Deferred<>();
+		updateNext(event -> {
+			try {
+				switch(event.getType()) {
+					case DATA:
+						collector.accumulator().accept(result, event.getData());
+						return CONTINUE;
+					case CLOSE:
+						d.resolve(collector.finisher().apply(result));
+						break;
+					case ERROR:
+						d.fail(event.getFailure());
+						break;
+				}
+				close(event.nodata());
+				return ABORT;
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		begin();
+		return d.getPromise();
+	}
+
+	/**
+	 * Terminal operation that finds the smallest payload according to the
+	 * comparator. When two payloads compare as equal the first argument of
+	 * the reduction is kept.
+	 *
+	 * @param comparator orders the payloads
+	 * @return a promise for the smallest payload, if any
+	 */
+	@Override
+	public Promise<Optional<T>> min(Comparator<? super T> comparator)  {
+		return reduce((a, b) -> comparator.compare(a, b) <= 0 ? a : b);
+	}
+
+	/**
+	 * Terminal operation that finds the largest payload according to the
+	 * comparator. When two payloads compare as equal the second argument of
+	 * the reduction is kept.
+	 *
+	 * @param comparator orders the payloads
+	 * @return a promise for the largest payload, if any
+	 */
+	@Override
+	public Promise<Optional<T>> max(Comparator<? super T> comparator) {
+		return reduce((a, b) -> comparator.compare(a, b) > 0 ? a : b);
+	}
+
+	@Override
+	public Promise<Long> count() {
+		Deferred<Long> d = new Deferred<>();
+		LongAdder counter = new LongAdder();
+		updateNext((event) -> {
+				try {
+					switch(event.getType()) {
+						case DATA:
+						counter.add(1);
+							return CONTINUE;
+						case CLOSE:
+						d.resolve(Long.valueOf(counter.sum()));
+							break;
+						case ERROR:
+							d.fail(event.getFailure());
+							break;
+					}
+					close(event.nodata());
+					return ABORT;
+				} catch (Exception e) {
+				close(PushEvent.error(e));
+					return ABORT;
+				}
+			});
+		begin();
+		return d.getPromise();
+	}
+
+	/**
+	 * Terminal operation that reports whether a payload matching the
+	 * predicate was found.
+	 *
+	 * @param predicate the test applied to the payloads
+	 * @return a promise for true if a matching payload was found
+	 */
+	@Override
+	public Promise<Boolean> anyMatch(Predicate<? super T> predicate) {
+		return filter(predicate).findAny()
+			.map(Optional::isPresent);
+	}
+
+	/**
+	 * Terminal operation that reports whether no payload failing the
+	 * predicate was found; it searches for a payload that fails the test.
+	 *
+	 * @param predicate the test applied to the payloads
+	 * @return a promise for true if no failing payload was found
+	 */
+	@Override
+	public Promise<Boolean> allMatch(Predicate<? super T> predicate) {
+		return filter(x -> !predicate.test(x)).findAny()
+				.map(o -> Boolean.valueOf(!o.isPresent()));
+	}
+
+	/**
+	 * Terminal operation that reports whether no payload matching the
+	 * predicate was found.
+	 *
+	 * @param predicate the test applied to the payloads
+	 * @return a promise for true if no matching payload was found
+	 */
+	@Override
+	public Promise<Boolean> noneMatch(Predicate<? super T> predicate) {
+		return filter(predicate).findAny()
+				.map(o -> Boolean.valueOf(!o.isPresent()));
+	}
+
+	@Override
+	public Promise<Optional<T>> findFirst() {
+		Deferred<Optional<T>> d = new Deferred<>();
+		updateNext((event) -> {
+				try {
+					Optional<T> o = null;
+					switch(event.getType()) {
+						case DATA:
+							o = Optional.of(event.getData());
+							break;
+						case CLOSE:
+							o = Optional.empty();
+							break;
+						case ERROR:
+							d.fail(event.getFailure());
+							return ABORT;
+					}
+					if(!d.getPromise().isDone())
+						d.resolve(o);
+					return ABORT;
+				} catch (Exception e) {
+				close(PushEvent.error(e));
+					return ABORT;
+				}
+			});
+		begin();
+		return d.getPromise();
+	}
+
+	/**
+	 * Terminal operation that finds any payload; this implementation simply
+	 * delegates to findFirst.
+	 *
+	 * @return a promise for a payload, if any
+	 */
+	@Override
+	public Promise<Optional<T>> findAny() {
+		return findFirst();
+	}
+
+	@Override
+	public Promise<Long> forEachEvent(PushEventConsumer< ? super T> action) {
+		Deferred<Long> d = new Deferred<>();
+		LongAdder la = new LongAdder();
+		updateNext((event) -> {
+			try {
+				switch(event.getType()) {
+					case DATA:
+						long value = action.accept(event);
+						la.add(value);
+						return value;
+					case CLOSE:
+						try {
+							action.accept(event);
+						} finally {
+							d.resolve(Long.valueOf(la.sum()));
+						}
+						break;
+					case ERROR:
+						try {
+							action.accept(event);
+						} finally {
+							d.fail(event.getFailure());
+						}
+						break;
+				}
+				return ABORT;
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+				return ABORT;
+			}
+		});
+		begin();
+		return d.getPromise();
+	}
+
+}

Added: aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/BufferedPushStreamImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/BufferedPushStreamImpl.java?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/BufferedPushStreamImpl.java (added)
+++ aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/BufferedPushStreamImpl.java Fri Oct 21 15:10:51 2016
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.pushstream;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.aries.pushstream.AbstractPushStreamImpl.State.CLOSED;
+import static org.osgi.util.pushstream.PushEventConsumer.ABORT;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+import org.osgi.util.pushstream.PushEvent;
+import org.osgi.util.pushstream.PushEventConsumer;
+import org.osgi.util.pushstream.PushStream;
+import org.osgi.util.pushstream.PushStreamProvider;
+import org.osgi.util.pushstream.PushbackPolicy;
+import org.osgi.util.pushstream.QueuePolicy;
+
+/**
+ * A push stream that places incoming events on a queue and delivers them to
+ * the rest of the pipeline from worker tasks. The number of concurrently
+ * running workers is limited by a semaphore created with
+ * <code>parallelism</code> permits.
+ *
+ * @param <T> the type of the event payloads
+ * @param <U> the type of the queue used to buffer events
+ */
+public class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
+		extends UnbufferedPushStreamImpl<T,U> implements PushStream<T> {
+	
+	/** Events waiting to be delivered downstream */
+	private final U eventQueue;
+	
+	/** Holds one permit per worker that may run at the same time */
+	private final Semaphore semaphore;
+	
+	/** Runs the worker tasks */
+	private final Executor worker;
+	
+	/** Decides how an event is added to the queue */
+	private final QueuePolicy<T, U> queuePolicy;
+
+	/** Computes the back pressure reported to the event source */
+	private final PushbackPolicy<T, U> pushbackPolicy;
+	
+	/**
+	 * Indicates that a terminal event has been received, that we should stop
+	 * collecting new events, and that we must drain the buffer before
+	 * continuing
+	 */
+	private final AtomicBoolean			softClose	= new AtomicBoolean();
+
+	private final int					parallelism;
+
+	/**
+	 * @param psp the provider passed to the superclass
+	 * @param scheduler used to resume delivery after back pressure
+	 * @param eventQueue the buffer for incoming events
+	 * @param parallelism the maximum number of concurrent workers
+	 * @param worker runs the worker tasks, and is also passed to the
+	 *            superclass as its executor
+	 * @param queuePolicy decides how an event is added to the queue
+	 * @param pushbackPolicy computes the back pressure to report
+	 * @param connector passed to the superclass
+	 */
+	public BufferedPushStreamImpl(PushStreamProvider psp,
+			ScheduledExecutorService scheduler, U eventQueue,
+			int parallelism, Executor worker, QueuePolicy<T,U> queuePolicy,
+			PushbackPolicy<T,U> pushbackPolicy,
+			Function<PushEventConsumer<T>,AutoCloseable> connector) {
+		super(psp, worker, scheduler, connector);
+		this.eventQueue = eventQueue;
+		this.parallelism = parallelism;
+		this.semaphore = new Semaphore(parallelism);
+		this.worker = worker;
+		this.queuePolicy = queuePolicy;
+		this.pushbackPolicy = pushbackPolicy;
+	}
+
+	/**
+	 * Buffers the event and starts a worker if a permit is available.
+	 *
+	 * @return the back pressure computed by the pushback policy, or ABORT if
+	 *         the stream is closed, a terminal event was already received,
+	 *         the policy returns a negative value, or buffering fails
+	 */
+	@Override
+	protected long handleEvent(PushEvent< ? extends T> event) {
+
+		// If we have already been soft closed, or hard closed then abort.
+		// The compareAndSet also records a terminal event as a soft close.
+		if (!softClose.compareAndSet(false, event.isTerminal())
+				|| closed.get() == CLOSED) {
+			return ABORT;
+		}
+
+		try {
+			queuePolicy.doOffer(eventQueue, event);
+			long backPressure = pushbackPolicy.pushback(eventQueue);
+			if(backPressure < 0) {
+				close();
+				return ABORT;
+			}
+			if(semaphore.tryAcquire()) {
+				startWorker();
+			}
+			return backPressure;
+		} catch (Exception e) {
+			close(PushEvent.error(e));
+			return ABORT;
+		}
+	}
+
+	/**
+	 * Submits a task that drains the queue into the rest of the pipeline.
+	 * The caller must already hold a permit; the task releases it once the
+	 * queue has been drained. When the pipeline asks for a delay the permit
+	 * is kept and delivery is resumed by the scheduler.
+	 */
+	private void startWorker() {
+		worker.execute(() -> {
+			try {
+				PushEvent< ? extends T> event;
+				while ((event = eventQueue.poll()) != null) {
+					if (event.isTerminal()) {
+						// Wait for the other threads to finish
+						semaphore.acquire(parallelism - 1);
+					}
+
+					long backpressure = super.handleEvent(event);
+					if(backpressure < 0) {
+						close();
+						return;
+					} else if(backpressure > 0) {
+						scheduler.schedule(this::startWorker, backpressure,
+								MILLISECONDS);
+						return;
+					}
+				}
+
+				semaphore.release();
+			} catch (InterruptedException ie) {
+				close(PushEvent.error(ie));
+				// Restore the interrupt status so that the owner of the
+				// worker thread can still observe the interruption
+				Thread.currentThread().interrupt();
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+			}
+			// An event may have been queued after the final poll, at a time
+			// when no permit was available to its producer
+			if(eventQueue.peek() != null && semaphore.tryAcquire()) {
+				try {
+					startWorker();
+				} catch (Exception e) {
+					close(PushEvent.error(e));
+				}
+			}
+		});
+		
+	}
+}

Added: aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/IntermediatePushStreamImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/IntermediatePushStreamImpl.java?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/IntermediatePushStreamImpl.java (added)
+++ aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/IntermediatePushStreamImpl.java Fri Oct 21 15:10:51 2016
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.pushstream;
+
+import static org.apache.aries.pushstream.AbstractPushStreamImpl.State.*;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.osgi.util.pushstream.PushStream;
+import org.osgi.util.pushstream.PushStreamProvider;
+
+/**
+ * A push stream stage that sits downstream of another stage and starts that
+ * stage when it is itself started.
+ *
+ * @param <T> the type of the event payloads
+ */
+public class IntermediatePushStreamImpl<T> extends AbstractPushStreamImpl<T>
+		implements PushStream<T> {
+	
+	/** The stage immediately upstream of this one */
+	private final AbstractPushStreamImpl< ? > previous;
+	
+	protected IntermediatePushStreamImpl(PushStreamProvider psp,
+			Executor executor, ScheduledExecutorService scheduler,
+			AbstractPushStreamImpl< ? > previous) {
+		super(psp, executor, scheduler);
+		this.previous = previous;
+	}
+
+	/**
+	 * Moves this stage from BUILDING to STARTED and then starts the upstream
+	 * stage.
+	 *
+	 * @return true if this call performed the transition, false if the stage
+	 *         was not in the BUILDING state
+	 */
+	@Override
+	protected boolean begin() {
+		if (!closed.compareAndSet(BUILDING, STARTED)) {
+			return false;
+		}
+		beginning();
+		previous.begin();
+		return true;
+	}
+
+	/**
+	 * Hook called once, before the upstream stage is started.
+	 */
+	protected void beginning() {
+		// The base implementation has nothing to do, but
+		// this method is used in windowing
+	}
+	
+}

Added: aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/SimplePushEventSourceImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/SimplePushEventSourceImpl.java?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/SimplePushEventSourceImpl.java (added)
+++ aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/SimplePushEventSourceImpl.java Fri Oct 21 15:10:51 2016
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.pushstream;
+
+import static java.util.Collections.emptyList;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.stream.Collectors.toList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+
+import org.osgi.util.promise.Deferred;
+import org.osgi.util.promise.Promise;
+import org.osgi.util.promise.Promises;
+import org.osgi.util.pushstream.PushEvent;
+import org.osgi.util.pushstream.PushEventConsumer;
+import org.osgi.util.pushstream.QueuePolicy;
+import org.osgi.util.pushstream.SimplePushEventSource;
+
+public class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
+		implements SimplePushEventSource<T> {
+
+	private final Object								lock		= new Object();
+
+	private final Executor								worker;
+
+	private final ScheduledExecutorService				scheduler;
+
+	private final QueuePolicy<T,U>						queuePolicy;
+
+	private final U										queue;
+
+	private final int									parallelism;
+
+	private final Semaphore								semaphore;
+
+	private final List<PushEventConsumer< ? super T>>	connected	= new ArrayList<>();
+
+	private final Runnable								onClose;
+
+	private boolean										closed;
+	
+	private Deferred<Void>								connectPromise;
+
+	private boolean										waitForFinishes;
+
+
+	public SimplePushEventSourceImpl(Executor worker,
+			ScheduledExecutorService scheduler, QueuePolicy<T,U> queuePolicy,
+			U queue, int parallelism, Runnable onClose) {
+		this.worker = worker;
+		this.scheduler = scheduler;
+		this.queuePolicy = queuePolicy;
+		this.queue = queue;
+		this.parallelism = parallelism;
+		this.semaphore = new Semaphore(parallelism);
+		this.onClose = onClose;
+		this.closed = false;
+		this.connectPromise = null;
+	}
+
+	@Override
+	public AutoCloseable open(PushEventConsumer< ? super T> pec)
+			throws Exception {
+		Deferred<Void> toResolve = null;
+		synchronized (lock) {
+			if (closed) {
+				throw new IllegalStateException(
+						"This PushEventConsumer is closed");
+			}
+
+			toResolve = connectPromise;
+			connectPromise = null;
+
+			connected.add(pec);
+		}
+
+		if (toResolve != null) {
+			toResolve.resolve(null);
+		}
+
+		return () -> {
+			closeConsumer(pec, PushEvent.close());
+		};
+	}
+
+	private void closeConsumer(PushEventConsumer< ? super T> pec,
+			PushEvent<T> event) {
+		boolean sendClose;
+		synchronized (lock) {
+			sendClose = connected.remove(pec);
+		}
+		if (sendClose) {
+			doSend(pec, event);
+		}
+	}
+
+	private void doSend(PushEventConsumer< ? super T> pec, PushEvent<T> event) {
+		try {
+			worker.execute(() -> safePush(pec, event));
+		} catch (RejectedExecutionException ree) {
+			// TODO log?
+			if (!event.isTerminal()) {
+				close(PushEvent.error(ree));
+			} else {
+				safePush(pec, event);
+			}
+		}
+	}
+
+	@SuppressWarnings("boxing")
+	private Promise<Long> doSendWithBackPressure(
+			PushEventConsumer< ? super T> pec, PushEvent<T> event) {
+		Deferred<Long> d = new Deferred<>();
+		try {
+			worker.execute(
+					() -> d.resolve(System.nanoTime() + safePush(pec, event)));
+		} catch (RejectedExecutionException ree) {
+			// TODO log?
+			if (!event.isTerminal()) {
+				close(PushEvent.error(ree));
+				return Promises.resolved(System.nanoTime());
+			} else {
+				return Promises
+						.resolved(System.nanoTime() + safePush(pec, event));
+			}
+		}
+		return d.getPromise();
+	}
+
+	private long safePush(PushEventConsumer< ? super T> pec,
+			PushEvent<T> event) {
+		try {
+			long backpressure = pec.accept(event) * 1000000;
+			if (backpressure < 0 && !event.isTerminal()) {
+				closeConsumer(pec, PushEvent.close());
+				return -1;
+			}
+			return backpressure;
+		} catch (Exception e) {
+			// TODO log?
+			if (!event.isTerminal()) {
+				closeConsumer(pec, PushEvent.error(e));
+			}
+			return -1;
+		}
+	}
+
+	@Override
+	public void close() {
+		close(PushEvent.close());
+	}
+
+	private void close(PushEvent<T> event) {
+		List<PushEventConsumer< ? super T>> toClose;
+		Deferred<Void> toFail = null;
+		synchronized (lock) {
+			if(!closed) {
+				closed = true;
+				
+				toClose = new ArrayList<>(connected);
+				connected.clear();
+				queue.clear();
+
+				if(connectPromise != null) {
+					toFail = connectPromise;
+					connectPromise = null;
+				}
+			} else {
+				toClose = emptyList();
+			}
+		}
+
+		toClose.stream().forEach(pec -> doSend(pec, event));
+
+		if (toFail != null) {
+			toFail.resolveWith(closedConnectPromise());
+		}
+
+		onClose.run();
+	}
+
+	@Override
+	public void publish(T t) {
+		enqueueEvent(PushEvent.data(t));
+	}
+
+	@Override
+	public void endOfStream() {
+		enqueueEvent(PushEvent.close());
+	}
+
+	@Override
+	public void error(Exception e) {
+		enqueueEvent(PushEvent.error(e));
+	}
+
+	private void enqueueEvent(PushEvent<T> event) {
+		synchronized (lock) {
+			if (closed || connected.isEmpty()) {
+				return;
+			}
+		}
+
+		try {
+			queuePolicy.doOffer(queue, event);
+			boolean start;
+			synchronized (lock) {
+				start = !waitForFinishes && semaphore.tryAcquire();
+			}
+			if (start) {
+				startWorker();
+			}
+		} catch (Exception e) {
+			close(PushEvent.error(e));
+			throw new IllegalStateException(
+					"The queue policy threw an exception", e);
+		}
+	}
+
+	@SuppressWarnings({
+			"unchecked", "boxing"
+	})
+	private void startWorker() {
+		worker.execute(() -> {
+			try {
+				
+				for(;;) {
+					PushEvent<T> event;
+					List<PushEventConsumer< ? super T>> toCall;
+					boolean resetWait = false;
+					synchronized (lock) {
+						if(waitForFinishes) {
+							semaphore.release();
+							while(waitForFinishes) {
+								lock.notifyAll();
+								lock.wait();
+							}
+							semaphore.acquire();
+						}
+
+						event = (PushEvent<T>) queue.poll();
+						
+						if(event == null) {
+							break;
+						}
+
+						toCall = new ArrayList<>(connected);
+						if (event.isTerminal()) {
+							waitForFinishes = true;
+							resetWait = true;
+							connected.clear();
+							while (!semaphore.tryAcquire(parallelism - 1)) {
+								lock.wait();
+							}
+						}
+					}
+					
+					List<Promise<Long>> calls = toCall.stream().map(pec -> {
+						if (semaphore.tryAcquire()) {
+							try {
+								return doSendWithBackPressure(pec, event);
+							} finally {
+								semaphore.release();
+							}
+						} else {
+							return Promises.resolved(
+									System.nanoTime() + safePush(pec, event));
+						}
+					}).collect(toList());
+
+					long toWait = Promises.<Long,Long>all(calls)
+							.map(l -> l.stream()
+									.max((a,b) -> a.compareTo(b))
+										.orElseGet(() -> System.nanoTime()))
+							.getValue() - System.nanoTime();
+					
+					
+					if (toWait > 0) {
+						scheduler.schedule(this::startWorker, toWait,
+								NANOSECONDS);
+						return;
+					}
+
+					if (resetWait == true) {
+						synchronized (lock) {
+							waitForFinishes = false;
+							lock.notifyAll();
+						}
+					}
+				}
+
+				semaphore.release();
+			} catch (Exception e) {
+				close(PushEvent.error(e));
+			}
+			if (queue.peek() != null && semaphore.tryAcquire()) {
+				try {
+					startWorker();
+				} catch (Exception e) {
+					close(PushEvent.error(e));
+				}
+			}
+		});
+
+	}
+
+	@Override
+	public boolean isConnected() {
+		synchronized (lock) {
+			return !connected.isEmpty();
+		}
+	}
+
+	@Override
+	public Promise<Void> connectPromise() {
+		synchronized (lock) {
+			if (closed) {
+				return closedConnectPromise();
+			}
+
+			if (connected.isEmpty()) {
+				if (connectPromise == null) {
+					connectPromise = new Deferred<>();
+				}
+				return connectPromise.getPromise();
+			} else {
+				return Promises.resolved(null);
+			}
+		}
+	}
+
+	private Promise<Void> closedConnectPromise() {
+		return Promises.failed(new IllegalStateException(
+				"This SimplePushEventSource is closed"));
+	}
+
+}

Added: aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/UnbufferedPushStreamImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/UnbufferedPushStreamImpl.java?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/UnbufferedPushStreamImpl.java (added)
+++ aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/UnbufferedPushStreamImpl.java Fri Oct 21 15:10:51 2016
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.pushstream;
+
+import static java.util.Optional.ofNullable;
+import static org.apache.aries.pushstream.AbstractPushStreamImpl.State.*;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import org.osgi.util.pushstream.PushEvent;
+import org.osgi.util.pushstream.PushEventConsumer;
+import org.osgi.util.pushstream.PushStream;
+import org.osgi.util.pushstream.PushStreamProvider;
+
+public class UnbufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
+	extends AbstractPushStreamImpl<T> implements PushStream<T> {
+	
+	protected final Function<PushEventConsumer<T>,AutoCloseable>	connector;
+	
+	protected final AtomicReference<AutoCloseable>					upstream	= new AtomicReference<AutoCloseable>();
+	
+	public UnbufferedPushStreamImpl(PushStreamProvider psp,
+			Executor executor, ScheduledExecutorService scheduler,
+			Function<PushEventConsumer<T>,AutoCloseable> connector) {
+		super(psp, executor, scheduler);
+		this.connector = connector;
+	}
+
+	@Override
+	protected boolean close(PushEvent<T> event) {
+		if(super.close(event)) {
+			ofNullable(upstream.getAndSet(() -> {
+				// This block doesn't need to do anything, but the presence
+				// of the AutoCloseable is needed to prevent duplicate begins
+			})).ifPresent(c -> {
+					try {
+						c.close();
+					} catch (Exception e) {
+						// TODO Auto-generated catch block
+						e.printStackTrace();
+					}
+				});
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	protected boolean begin() {
+		if(closed.compareAndSet(BUILDING, STARTED)) {
+			AutoCloseable toClose = connector.apply(this::handleEvent);
+			if(!upstream.compareAndSet(null,toClose)) {
+				//TODO log that we tried to connect twice...
+				try {
+					toClose.close();
+				} catch (Exception e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+				}
+			}
+
+			if (closed.get() == CLOSED
+					&& upstream.compareAndSet(toClose, null)) {
+				// We closed before setting the upstream - close it now
+				try {
+					toClose.close();
+				} catch (Exception e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+				}
+			}
+			return true;
+		}
+		return false;
+	}
+}

Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/AbstractBufferBuilder.java
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/AbstractBufferBuilder.java?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/AbstractBufferBuilder.java (added)
+++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/AbstractBufferBuilder.java Fri Oct 21 15:10:51 2016
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+
+abstract class AbstractBufferBuilder<R, T, U extends BlockingQueue<PushEvent< ? extends T>>>
+		implements BufferBuilder<R,T,U> {
+
+	protected Executor				worker;
+	protected int					concurrency;
+	protected PushbackPolicy<T,U>	backPressure;
+	protected QueuePolicy<T,U>		bufferingPolicy;
+	protected U						buffer;
+
+	@Override
+	public BufferBuilder<R,T,U> withBuffer(U queue) {
+		this.buffer = queue;
+		return this;
+	}
+
+	@Override
+	public BufferBuilder<R,T,U> withQueuePolicy(
+			QueuePolicy<T,U> queuePolicy) {
+		this.bufferingPolicy = queuePolicy;
+		return this;
+	}
+
+	@Override
+	public BufferBuilder<R,T,U> withQueuePolicy(
+			QueuePolicyOption queuePolicyOption) {
+		this.bufferingPolicy = queuePolicyOption.getPolicy();
+		return this;
+	}
+
+	@Override
+	public BufferBuilder<R,T,U> withPushbackPolicy(
+			PushbackPolicy<T,U> pushbackPolicy) {
+		this.backPressure = pushbackPolicy;
+		return this;
+	}
+
+	@Override
+	public BufferBuilder<R,T,U> withPushbackPolicy(
+			PushbackPolicyOption pushbackPolicyOption, long time) {
+		this.backPressure = pushbackPolicyOption.getPolicy(time);
+		return this;
+	}
+
+	@Override
+	public BufferBuilder<R,T,U> withParallelism(int parallelism) {
+		this.concurrency = parallelism;
+		return this;
+	}
+
+	@Override
+	public BufferBuilder<R,T,U> withExecutor(Executor executor) {
+		this.worker = executor;
+		return this;
+	}
+}

Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/BufferBuilder.java
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/BufferBuilder.java?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/BufferBuilder.java (added)
+++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/BufferBuilder.java Fri Oct 21 15:10:51 2016
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+
+/**
+ * Create a buffered section of a Push-based stream
+ *
+ * @param <R> The type of object being built
+ * @param <T> The type of objects in the {@link PushEvent}
+ * @param <U> The type of the Queue used in the user specified buffer
+ */
+public interface BufferBuilder<R, T, U extends BlockingQueue<PushEvent<? extends T>>> {
+
+	/**
+	 * The BlockingQueue implementation to use as a buffer
+	 * 
+	 * @param queue
+	 * @return this builder
+	 */
+	BufferBuilder<R, T, U> withBuffer(U queue);
+
+	/**
+	 * Set the {@link QueuePolicy} of this Builder
+	 * 
+	 * @param queuePolicy
+	 * @return this builder
+	 */
+	BufferBuilder<R,T,U> withQueuePolicy(QueuePolicy<T,U> queuePolicy);
+
+	/**
+	 * Set the {@link QueuePolicy} of this Builder
+	 * 
+	 * @param queuePolicyOption
+	 * @return this builder
+	 */
+	BufferBuilder<R, T, U> withQueuePolicy(QueuePolicyOption queuePolicyOption);
+
+	/**
+	 * Set the {@link PushbackPolicy} of this builder
+	 * 
+	 * @param pushbackPolicy
+	 * @return this builder
+	 */
+	BufferBuilder<R, T, U> withPushbackPolicy(PushbackPolicy<T, U> pushbackPolicy);
+
+	/**
+	 * Set the {@link PushbackPolicy} of this builder
+	 * 
+	 * @param pushbackPolicyOption
+	 * @param time
+	 * @return this builder
+	 */
+	BufferBuilder<R, T, U> withPushbackPolicy(PushbackPolicyOption pushbackPolicyOption, long time);
+
+	/**
+	 * Set the maximum permitted number of concurrent event deliveries allowed
+	 * from this buffer
+	 * 
+	 * @param parallelism
+	 * @return this builder
+	 */
+	BufferBuilder<R, T, U> withParallelism(int parallelism);
+
+	/**
+	 * Set the {@link Executor} that should be used to deliver events from this
+	 * buffer
+	 * 
+	 * @param executor
+	 * @return this builder
+	 */
+	BufferBuilder<R, T, U> withExecutor(Executor executor);
+	
+	/**
+	 * @return the object being built
+	 */
+	R create();
+
+}

Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEvent.java
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEvent.java?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEvent.java (added)
+++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEvent.java Fri Oct 21 15:10:51 2016
@@ -0,0 +1,205 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import static org.osgi.util.pushstream.PushEvent.EventType.*;
+
+/**
+ * A PushEvent is an immutable object that is transferred through a
+ * communication channel to push information to a downstream consumer. The event
+ * has three different types:
+ * <ul>
+ * <li>{@link EventType#DATA} – Provides access to a typed data element in the
+ * stream.
+ * <li>{@link EventType#CLOSE} – The stream is closed. After receiving this
+ * event, no more events will follow.
+ * <li>{@link EventType#ERROR} – The stream ran into an unrecoverable problem
+ * and is sending the reason downstream. The stream is closed and no more events
+ * will follow after this event.
+ * </ul>
+ *
+ * @param <T> The payload type of the event.
+ * @Immutable
+ */
+public abstract class PushEvent<T> {
+
+	/**
+	 * The type of a {@link PushEvent}.
+	 */
+	public static enum EventType {
+		/**
+		 * A data event forming part of the stream
+		 */
+		DATA,
+		/**
+		 * An error event that indicates streaming has failed and that no more
+		 * events will arrive
+		 */
+		ERROR,
+		/**
+		 * An event that indicates that the stream has terminated normally
+		 */
+		CLOSE
+	}
+
+	/**
+	 * Package private default constructor.
+	 */
+	PushEvent() {}
+
+	/**
+	 * Get the type of this event.
+	 * 
+	 * @return The type of this event.
+	 */
+	public abstract EventType getType();
+
+	/**
+	 * Return the data for this event.
+	 * 
+	 * @return The data payload.
+	 * @throws IllegalStateException if this event is not a
+	 *             {@link EventType#DATA} event.
+	 */
+	public T getData() throws IllegalStateException {
+		throw new IllegalStateException(
+				"Not a DATA event, the event type is " + getType());
+	}
+
+	/**
+	 * Return the error that terminated the stream.
+	 * 
+	 * @return The error that terminated the stream.
+	 * @throws IllegalStateException if this event is not an
+	 *             {@link EventType#ERROR} event.
+	 */
+	public Exception getFailure() throws IllegalStateException {
+		throw new IllegalStateException(
+				"Not an ERROR event, the event type is " + getType());
+	}
+
+	/**
+	 * Answer if no more events will follow after this event.
+	 * 
+	 * @return {@code false} if this is a data event, otherwise {@code true}.
+	 */
+	public boolean isTerminal() {
+		return true;
+	}
+
+	/**
+	 * Create a new data event.
+	 * 
+	 * @param <T> The payload type.
+	 * @param payload The payload.
+	 * @return A new data event wrapping the specified payload.
+	 */
+	public static <T> PushEvent<T> data(T payload) {
+		return new DataEvent<T>(payload);
+	}
+
+	/**
+	 * Create a new error event.
+	 * 
+	 * @param <T> The payload type.
+	 * @param e The error.
+	 * @return A new error event with the specified error.
+	 */
+	public static <T> PushEvent<T> error(Exception e) {
+		return new ErrorEvent<T>(e);
+	}
+
+	/**
+	 * Create a new close event.
+	 * 
+	 * @param <T> The payload type.
+	 * @return A new close event.
+	 */
+	public static <T> PushEvent<T> close() {
+		return new CloseEvent<T>();
+	}
+
+	/**
+	 * Convenience to cast a close/error event to another payload type. Since
+	 * the payload type is not needed for these events this is harmless. This
+	 * therefore allows you to forward the close/error event downstream without
+	 * creating a new event.
+	 * 
+	 * @param <X> The new payload type.
+	 * @return The current error or close event mapped to a new payload type.
+	 * @throws IllegalStateException if the event is a {@link EventType#DATA}
+	 *             event.
+	 */
+	public <X> PushEvent<X> nodata() throws IllegalStateException {
+		@SuppressWarnings("unchecked")
+		PushEvent<X> result = (PushEvent<X>) this;
+		return result;
+	}
+
+	static final class DataEvent<T> extends PushEvent<T> {
+		private final T data;
+
+		DataEvent(T data) {
+			this.data = data;
+		}
+
+		@Override
+		public T getData() throws IllegalStateException {
+			return data;
+		}
+
+		@Override
+		public EventType getType() {
+			return DATA;
+		}
+
+		@Override
+		public boolean isTerminal() {
+			return false;
+		}
+
+		@Override
+		public <X> PushEvent<X> nodata() throws IllegalStateException {
+			throw new IllegalStateException("This event is a DATA event");
+		}
+	}
+
+	static final class ErrorEvent<T> extends PushEvent<T> {
+		private final Exception error;
+
+		ErrorEvent(Exception error) {
+			this.error = error;
+		}
+
+		@Override
+		public Exception getFailure() {
+			return error;
+		}
+
+		@Override
+		public EventType getType() {
+			return ERROR;
+		}
+	}
+
+	static final class CloseEvent<T> extends PushEvent<T> {
+		@Override
+		public EventType getType() {
+			return CLOSE;
+		}
+	}
+}

Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventConsumer.java
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventConsumer.java?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventConsumer.java (added)
+++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventConsumer.java Fri Oct 21 15:10:51 2016
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import org.osgi.annotation.versioning.ConsumerType;
+
+/**
+ * An Async Event Consumer asynchronously receives Data events until it receives
+ * either a Close or Error event.
+ * 
+ * @param <T>
+ *            The type for the event payload
+ */
+@ConsumerType
+@FunctionalInterface
+public interface PushEventConsumer<T> {
+
+	/**
+	 * If ABORT is used as return value, the sender should close the channel all
+	 * the way to the upstream source. The ABORT will not guarantee that no
+	 * more events are delivered since this is impossible in a concurrent
+	 * environment. The consumer should accept subsequent events and close/clean
+	 * up when the Close or Error event is received.
+	 * 
+	 * Though ABORT has the value -1, any value less than 0 will act as an
+	 * abort.
+	 */
+	long	ABORT		= -1;
+
+	/**
+	 * A 0 indicates that the consumer is willing to receive subsequent events
+	 * at full speed.
+	 * 
+	 * Any value more than 0 will indicate that the consumer is becoming
+	 * overloaded and wants a delay of the given milliseconds before the next
+	 * event is sent. This allows the consumer to pushback the event delivery
+	 * speed.
+	 */
+	long	CONTINUE	= 0;
+
+	/**
+	 * Accept an event from a source. Events can be delivered on multiple
+	 * threads simultaneously. However, Close and Error events are the last
+	 * events received, no more events must be sent after them.
+	 * 
+	 * @param event The event
+	 * @return less than 0 means abort, 0 means continue, more than 0 means
+	 *         delay ms
+	 * @throws Exception to indicate that an error has occurred and that no
+	 *         further events should be delivered to this
+	 *         {@link PushEventConsumer}
+	 */
+	long accept(PushEvent<? extends T> event) throws Exception;
+
+}

Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventSource.java
URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventSource.java?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventSource.java (added)
+++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventSource.java Fri Oct 21 15:10:51 2016
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import org.osgi.annotation.versioning.ConsumerType;
+
+/**
+ * An event source. An event source can open a channel between a source and a
+ * consumer. Once the channel is opened (even before it returns) the source can
+ * send events to the consumer.
+ *
+ * A source should stop sending and automatically close the channel when sending
+ * an event returns a negative value, see {@link PushEventConsumer#ABORT}.
+ * Values that are larger than 0 should be treated as a request to delay the
+ * next events by that number of milliseconds.
+ * 
+ * @param <T>
+ *            The payload type
+ */
+@ConsumerType
+@FunctionalInterface
+public interface PushEventSource<T> {
+
+	/**
+	 * Open the asynchronous channel between the source and the consumer. The
+	 * call returns an {@link AutoCloseable}. This can be closed, and should
+	 * close the channel, including sending a Close event if the channel was not
+	 * already closed. The returned object must be able to be closed multiple
+	 * times without sending more than one Close event.
+	 * 
+	 * @param aec the consumer (not null)
+	 * @return a {@link AutoCloseable} that can be used to close the stream
+	 * @throws Exception
+	 */
+	AutoCloseable open(PushEventConsumer< ? super T> aec) throws Exception;
+}




Mime
View raw message