flink-commits mailing list archives

From trohrm...@apache.org
Subject [2/3] flink git commit: [FLINK-3216] [FLINK-3217] [cep] Adds CEP operator for pattern recognition
Date Tue, 02 Feb 2016 14:04:58 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
new file mode 100644
index 0000000..e1c0099
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -0,0 +1,858 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import com.google.common.collect.LinkedHashMultimap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.Stack;
+
+/**
+ * A shared buffer implementation which stores values under a key. Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in the buffer.
+ * <p>
+ * The implementation maintains a dedicated {@link SharedBufferPage} for each key. Each
+ * buffer page maintains a collection of the inserted values.
+ *
+ * The values are wrapped in a {@link SharedBufferEntry}. A shared buffer entry can store
+ * relations between different entries. A Dewey versioning scheme makes it possible to
+ * discriminate between different relations (e.g. the preceding element).
+ *
+ * The implementation is closely based on the paper "Efficient Pattern Matching over Event Streams".
+ *
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ *
+ * @param <K> Type of the keys
+ * @param <V> Type of the values
+ */
+public class SharedBuffer<K extends Serializable, V> implements Serializable {
+	private static final long serialVersionUID = 9213251042562206495L;
+
+	private final TypeSerializer<V> valueSerializer;
+
+	private transient Map<K, SharedBufferPage<K, V>> pages;
+
+	public SharedBuffer(final TypeSerializer<V> valueSerializer) {
+		this.valueSerializer = valueSerializer;
+		pages = new HashMap<>();
+	}
+
+	/**
+	 * Stores the given value (value + timestamp) under the given key. It assigns a preceding
+	 * element relation to the new entry, defined by the previous key, value and timestamp.
+	 *
+	 * @param key Key of the current value
+	 * @param value Current value
+	 * @param timestamp Timestamp of the current value (a value always requires a timestamp to make it uniquely referable)
+	 * @param previousKey Key of the value for the previous relation
+	 * @param previousValue Value for the previous relation
+	 * @param previousTimestamp Timestamp of the value for the previous relation
+	 * @param version Version of the previous relation
+	 */
+	public void put(
+			final K key,
+			final V value,
+			final long timestamp,
+			final K previousKey,
+			final V previousValue,
+			final long previousTimestamp,
+			final DeweyNumber version) {
+		SharedBufferPage<K, V> page;
+
+		if (!pages.containsKey(key)) {
+			page = new SharedBufferPage<K, V>(key);
+			pages.put(key, page);
+		} else {
+			page = pages.get(key);
+		}
+
+		final SharedBufferEntry<K, V> previousSharedBufferEntry = get(previousKey, previousValue, previousTimestamp);
+
+		page.add(
+			new ValueTimeWrapper<>(value, timestamp),
+			previousSharedBufferEntry,
+			version);
+	}
+
+	/**
+	 * Checks whether the given key, value, timestamp triple is contained in the shared buffer.
+	 *
+	 * @param key Key of the value
+	 * @param value Value
+	 * @param timestamp Timestamp of the value
+	 * @return Whether a value with the given timestamp is registered under the given key
+	 */
+	public boolean contains(
+		final K key,
+		final V value,
+		final long timestamp) {
+
+		return pages.containsKey(key) && pages.get(key).contains(new ValueTimeWrapper<>(value, timestamp));
+	}
+
+	public boolean isEmpty() {
+		for (SharedBufferPage<K, V> page: pages.values()) {
+			if (!page.isEmpty()) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	/**
+	 * Deletes all entries in each page which have expired with respect to the given pruning timestamp.
+	 *
+	 * @param pruningTimestamp The time which is used for pruning. All elements whose timestamp is
+	 *                         lower than the pruning timestamp will be removed.
+	 */
+	public void prune(long pruningTimestamp) {
+		Iterator<Map.Entry<K, SharedBufferPage<K, V>>> iter = pages.entrySet().iterator();
+
+		while (iter.hasNext()) {
+			SharedBufferPage<K, V> page = iter.next().getValue();
+
+			page.prune(pruningTimestamp);
+
+			if (page.isEmpty()) {
+				// delete page if it is empty
+				iter.remove();
+			}
+		}
+	}
+
+	/**
+	 * Returns all elements from the previous relation starting at the given value with the
+	 * given key and timestamp.
+	 *
+	 * @param key Key of the starting value
+	 * @param value Value of the starting element
+	 * @param timestamp Timestamp of the starting value
+	 * @param version Version of the previous relation which shall be extracted
+	 * @return Collection of previous relations starting with the given value
+	 */
+	public Collection<LinkedHashMultimap<K, V>> extractPatterns(
+		final K key,
+		final V value,
+		final long timestamp,
+		final DeweyNumber version) {
+		Collection<LinkedHashMultimap<K, V>> result = new ArrayList<>();
+
+		// stack to remember the current extraction states
+		Stack<ExtractionState<K, V>> extractionStates = new Stack<>();
+
+		// get the starting shared buffer entry for the previous relation
+		SharedBufferEntry<K, V> entry = get(key, value, timestamp);
+
+		if (entry != null) {
+			extractionStates.add(new ExtractionState<K, V>(entry, version, new Stack<SharedBufferEntry<K, V>>()));
+
+			// use a depth first search to reconstruct the previous relations
+			while (!extractionStates.isEmpty()) {
+				ExtractionState<K, V> extractionState = extractionStates.pop();
+				DeweyNumber currentVersion = extractionState.getVersion();
+				// current path of the depth first search
+				Stack<SharedBufferEntry<K, V>> currentPath = extractionState.getPath();
+
+				// termination criterion
+				if (currentVersion.length() == 1) {
+					LinkedHashMultimap<K, V> completePath = LinkedHashMultimap.create();
+
+					while (!currentPath.isEmpty()) {
+						SharedBufferEntry<K, V> currentEntry = currentPath.pop();
+
+						completePath.put(currentEntry.getKey(), currentEntry.getValueTime().getValue());
+					}
+
+					result.add(completePath);
+				} else {
+					SharedBufferEntry<K, V> currentEntry = extractionState.getEntry();
+
+					// append state to the path
+					currentPath.push(currentEntry);
+
+					boolean firstMatch = true;
+					for (SharedBufferEdge<K, V> edge : currentEntry.getEdges()) {
+						// we can only proceed if the current version is compatible with the version
+						// of this previous relation
+						if (currentVersion.isCompatibleWith(edge.getVersion())) {
+							if (firstMatch) {
+								// for the first match we don't have to copy the current path
+								extractionStates.push(new ExtractionState<K, V>(edge.getTarget(), edge.getVersion(), currentPath));
+								firstMatch = false;
+							} else {
+								Stack<SharedBufferEntry<K, V>> copy = new Stack<>();
+								copy.addAll(currentPath);
+
+								extractionStates.push(
+									new ExtractionState<K, V>(
+										edge.getTarget(),
+										edge.getVersion(),
+										copy));
+							}
+						}
+					}
+				}
+			}
+		}
+
+		return result;
+	}
+
+	/**
+	 * Increases the reference counter for the given value, key, timestamp entry so that it is not
+	 * accidentally removed.
+	 *
+	 * @param key Key of the value to lock
+	 * @param value Value to lock
+	 * @param timestamp Timestamp of the value to lock
+	 */
+	public void lock(final K key, final V value, final long timestamp) {
+		SharedBufferEntry<K, V> entry = get(key, value, timestamp);
+
+		if (entry != null) {
+			entry.increaseReferenceCounter();
+		}
+	}
+
+	/**
+	 * Decreases the reference counter for the given value, key, timestamp entry so that it can be
+	 * removed once the reference counter reaches 0.
+	 *
+	 * @param key Key of the value to release
+	 * @param value Value to release
+	 * @param timestamp Timestamp of the value to release
+	 */
+	public void release(final K key, final V value, final long timestamp) {
+		SharedBufferEntry<K, V> entry = get(key, value, timestamp);
+
+		if (entry != null ) {
+			entry.decreaseReferenceCounter();
+		}
+	}
+
+	/**
+	 * Removes the given value, key, timestamp entry if its reference counter is 0. It will also
+	 * release the next element in its previous relation and apply remove to this element
+	 * recursively.
+	 *
+	 * @param key Key of the value to remove
+	 * @param value Value to remove
+	 * @param timestamp Timestamp of the value to remove
+	 */
+	public void remove(final K key, final V value, final long timestamp) {
+		SharedBufferEntry<K, V> entry = get(key, value, timestamp);
+
+		if (entry != null) {
+			internalRemove(entry);
+		}
+	}
+
+	private void writeObject(ObjectOutputStream oos) throws IOException {
+		DataOutputViewStreamWrapper target = new DataOutputViewStreamWrapper(oos);
+		Map<SharedBufferEntry<K, V>, Integer> entryIDs = new HashMap<>();
+		int totalEdges = 0;
+		int entryCounter = 0;
+
+		oos.defaultWriteObject();
+
+		// number of pages
+		oos.writeInt(pages.size());
+
+		for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry: pages.entrySet()) {
+			SharedBufferPage<K, V> page = pageEntry.getValue();
+
+			// key for the current page
+			oos.writeObject(page.getKey());
+			// number of page entries
+			oos.writeInt(page.entries.size());
+
+			for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
+				// serialize the sharedBufferEntry
+				SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue();
+
+				// assign id to the sharedBufferEntry for the future serialization of the previous
+				// relation
+				entryIDs.put(sharedBuffer, entryCounter++);
+
+				ValueTimeWrapper<V> valueTimeWrapper = sharedBuffer.getValueTime();
+
+				valueSerializer.serialize(valueTimeWrapper.value, target);
+				oos.writeLong(valueTimeWrapper.getTimestamp());
+
+				int edges = sharedBuffer.edges.size();
+				totalEdges += edges;
+
+				oos.writeInt(sharedBuffer.referenceCounter);
+			}
+		}
+
+		// write the edges between the shared buffer entries
+		oos.writeInt(totalEdges);
+
+		for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry: pages.entrySet()) {
+			SharedBufferPage<K, V> page = pageEntry.getValue();
+
+			for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
+				SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue();
+
+				if (!entryIDs.containsKey(sharedBuffer)) {
+					throw new RuntimeException("Could not find id for entry: " + sharedBuffer);
+				} else {
+					int id = entryIDs.get(sharedBuffer);
+
+					for (SharedBufferEdge<K, V> edge: sharedBuffer.edges) {
+						// in order to serialize the previous relation we simply serialize the ids
+						// of the source and target SharedBufferEntry
+						if (edge.target != null) {
+							if (!entryIDs.containsKey(edge.getTarget())) {
+								throw new RuntimeException("Could not find id for entry: " + edge.getTarget());
+							} else {
+								int targetId = entryIDs.get(edge.getTarget());
+
+								oos.writeInt(id);
+								oos.writeInt(targetId);
+								oos.writeObject(edge.version);
+							}
+						} else {
+							oos.writeInt(id);
+							oos.writeInt(-1);
+							oos.writeObject(edge.version);
+						}
+					}
+				}
+			}
+		}
+	}
+
+	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
+		DataInputViewStreamWrapper source = new DataInputViewStreamWrapper(ois);
+		ArrayList<SharedBufferEntry<K, V>> entryList = new ArrayList<>();
+		ois.defaultReadObject();
+
+		this.pages = new HashMap<>();
+
+		int numberPages = ois.readInt();
+
+		for (int i = 0; i < numberPages; i++) {
+			// key of the page
+			@SuppressWarnings("unchecked")
+			K key = (K)ois.readObject();
+
+			SharedBufferPage<K, V> page = new SharedBufferPage<>(key);
+
+			pages.put(key, page);
+
+			int numberEntries = ois.readInt();
+
+			for (int j = 0; j < numberEntries; j++) {
+				// restore the SharedBufferEntries for the given page
+				V value = valueSerializer.deserialize(source);
+				long timestamp = ois.readLong();
+
+				ValueTimeWrapper<V> valueTimeWrapper = new ValueTimeWrapper<>(value, timestamp);
+				SharedBufferEntry<K, V> sharedBufferEntry = new SharedBufferEntry<K, V>(valueTimeWrapper, page);
+
+				sharedBufferEntry.referenceCounter = ois.readInt();
+
+				page.entries.put(valueTimeWrapper, sharedBufferEntry);
+
+				entryList.add(sharedBufferEntry);
+			}
+		}
+
+		// read the edges of the shared buffer entries
+		int numberEdges = ois.readInt();
+
+		for (int j = 0; j < numberEdges; j++) {
+			int sourceIndex = ois.readInt();
+			int targetIndex = ois.readInt();
+
+			if (sourceIndex >= entryList.size() || sourceIndex < 0) {
+				throw new RuntimeException("Could not find source entry with index " + sourceIndex +
+					". This indicates a corrupted state.");
+			} else {
+				// We've already deserialized the shared buffer entry. Simply read its ID and
+				// retrieve the buffer entry from the list of entries
+				SharedBufferEntry<K, V> sourceEntry = entryList.get(sourceIndex);
+
+				final DeweyNumber version = (DeweyNumber) ois.readObject();
+				final SharedBufferEntry<K, V> target;
+
+				if (targetIndex >= 0) {
+					if (targetIndex >= entryList.size()) {
+						throw new RuntimeException("Could not find target entry with index " + targetIndex +
+							". This indicates a corrupted state.");
+					} else {
+						target = entryList.get(targetIndex);
+					}
+				} else {
+					target = null;
+				}
+
+				sourceEntry.edges.add(new SharedBufferEdge<K, V>(target, version));
+			}
+		}
+	}
+
+	private SharedBufferEntry<K, V> get(
+		final K key,
+		final V value,
+		final long timestamp) {
+		if (pages.containsKey(key)) {
+			return pages
+				.get(key)
+				.get(new ValueTimeWrapper<V>(value, timestamp));
+		} else {
+			return null;
+		}
+	}
+
+	private void internalRemove(final SharedBufferEntry<K, V> entry) {
+		Stack<SharedBufferEntry<K, V>> entriesToRemove = new Stack<>();
+		entriesToRemove.add(entry);
+
+		while (!entriesToRemove.isEmpty()) {
+			SharedBufferEntry<K, V> currentEntry = entriesToRemove.pop();
+
+			if (currentEntry.getReferenceCounter() == 0) {
+				currentEntry.remove();
+
+				for (SharedBufferEdge<K, V> edge: currentEntry.getEdges()) {
+					if (edge.getTarget() != null) {
+						edge.getTarget().decreaseReferenceCounter();
+						entriesToRemove.push(edge.getTarget());
+					}
+				}
+			}
+		}
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder builder = new StringBuilder();
+
+		for (Map.Entry<K, SharedBufferPage<K, V>> entry: pages.entrySet()) {
+			builder.append("Key: ").append(entry.getKey()).append("\n");
+			builder.append("Value: ").append(entry.getValue()).append("\n");
+		}
+
+		return builder.toString();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof SharedBuffer) {
+			@SuppressWarnings("unchecked")
+			SharedBuffer<K, V> other = (SharedBuffer<K, V>) obj;
+
+			return pages.equals(other.pages) && valueSerializer.equals(other.valueSerializer);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(pages, valueSerializer);
+	}
+
+	/**
+	 * The SharedBufferPage represents a set of elements which have been stored under the same key.
+	 *
+	 * @param <K> Type of the key
+	 * @param <V> Type of the value
+	 */
+	private static class SharedBufferPage<K, V> {
+
+		// key of the page
+		private final K key;
+
+		// Map of entries which are stored in this page
+		private final HashMap<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> entries;
+
+		public SharedBufferPage(final K key) {
+			this.key = key;
+			entries = new HashMap<>();
+		}
+
+		public K getKey() {
+			return key;
+		}
+
+		/**
+		 * Adds a new value time pair to the page. The new entry is linked to the previous entry
+		 * with the given version.
+		 *
+		 * @param valueTime Value time pair to be stored
+		 * @param previous Previous shared buffer entry to which the new entry shall be linked
+		 * @param version Version of the relation between the new and the previous entry
+		 */
+		public void add(final ValueTimeWrapper<V> valueTime, final SharedBufferEntry<K, V> previous, final DeweyNumber version) {
+			SharedBufferEntry<K, V> sharedBufferEntry = entries.get(valueTime);
+
+			if (sharedBufferEntry == null) {
+				sharedBufferEntry = new SharedBufferEntry<K, V>(valueTime, this);
+
+				entries.put(valueTime, sharedBufferEntry);
+			}
+
+			SharedBufferEdge<K, V> newEdge;
+
+			if (previous != null) {
+				newEdge = new SharedBufferEdge<>(previous, version);
+				previous.increaseReferenceCounter();
+			} else {
+				newEdge = new SharedBufferEdge<>(null, version);
+			}
+
+			sharedBufferEntry.addEdge(newEdge);
+		}
+
+		public boolean contains(final ValueTimeWrapper<V> valueTime) {
+			return entries.containsKey(valueTime);
+		}
+
+		public SharedBufferEntry<K, V> get(final ValueTimeWrapper<V> valueTime) {
+			return entries.get(valueTime);
+		}
+
+		/**
+		 * Removes all entries from the map whose timestamp is smaller than the pruning timestamp.
+		 *
+		 * @param pruningTimestamp Timestamp for the pruning
+		 */
+		public void prune(long pruningTimestamp) {
+			Iterator<Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>>> iterator = entries.entrySet().iterator();
+			boolean continuePruning = true;
+
+			while (iterator.hasNext() && continuePruning) {
+				SharedBufferEntry<K, V> entry = iterator.next().getValue();
+
+				if (entry.getValueTime().getTimestamp() <= pruningTimestamp) {
+					iterator.remove();
+				} else {
+					continuePruning = false;
+				}
+			}
+		}
+
+		public boolean isEmpty() {
+			return entries.isEmpty();
+		}
+
+		public SharedBufferEntry<K, V> remove(final ValueTimeWrapper<V> valueTime) {
+			return entries.remove(valueTime);
+		}
+
+		@Override
+		public String toString() {
+			StringBuilder builder = new StringBuilder();
+
+			builder.append("SharedBufferPage(\n");
+
+			for (SharedBufferEntry<K, V> entry: entries.values()) {
+				builder.append(entry.toString()).append("\n");
+			}
+
+			builder.append(")");
+
+			return builder.toString();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof SharedBufferPage) {
+				@SuppressWarnings("unchecked")
+				SharedBufferPage<K, V> other = (SharedBufferPage<K, V>) obj;
+
+				return key.equals(other.key) && entries.equals(other.entries);
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(key, entries);
+		}
+	}
+
+	/**
+	 * Entry of a {@link SharedBufferPage}. The entry contains the value timestamp pair, a set of
+	 * edges to other shared buffer entries denoting a relation, a reference to the owning page and
+	 * a reference counter. The reference counter counts how many references are kept to this entry.
+	 *
+	 * @param <K> Type of the key
+	 * @param <V> Type of the value
+	 */
+	private static class SharedBufferEntry<K, V> {
+		private final ValueTimeWrapper<V> valueTime;
+		private final Set<SharedBufferEdge<K, V>> edges;
+		private final SharedBufferPage<K, V> page;
+		private int referenceCounter;
+
+		public SharedBufferEntry(
+			final ValueTimeWrapper<V> valueTime,
+			final SharedBufferPage<K, V> page) {
+			this(valueTime, null, page);
+		}
+
+		public SharedBufferEntry(
+			final ValueTimeWrapper<V> valueTime,
+			final SharedBufferEdge<K, V> edge,
+			final SharedBufferPage<K, V> page) {
+			this.valueTime = valueTime;
+			edges = new HashSet<>();
+
+			if (edge != null) {
+				edges.add(edge);
+			}
+
+			referenceCounter = 0;
+
+			this.page = page;
+		}
+
+		public ValueTimeWrapper<V> getValueTime() {
+			return valueTime;
+		}
+
+		public Collection<SharedBufferEdge<K, V>> getEdges() {
+			return edges;
+		}
+
+		public K getKey() {
+			return page.getKey();
+		}
+
+		public void addEdge(SharedBufferEdge<K, V> edge) {
+			edges.add(edge);
+		}
+
+		public boolean remove() {
+			if (page != null) {
+				page.remove(valueTime);
+
+				return true;
+			} else {
+				return false;
+			}
+		}
+
+		public void increaseReferenceCounter() {
+			referenceCounter++;
+		}
+
+		public void decreaseReferenceCounter() {
+			if (referenceCounter > 0) {
+				referenceCounter--;
+			}
+		}
+
+		public int getReferenceCounter() {
+			return referenceCounter;
+		}
+
+		@Override
+		public String toString() {
+			return "SharedBufferEntry(" + valueTime + ", [" + StringUtils.join(edges, ", ") + "], " + referenceCounter + ")";
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof SharedBufferEntry) {
+				@SuppressWarnings("unchecked")
+				SharedBufferEntry<K, V> other = (SharedBufferEntry<K, V>) obj;
+
+				return valueTime.equals(other.valueTime) &&
+					getKey().equals(other.getKey()) &&
+					referenceCounter == other.referenceCounter &&
+					edges.equals(other.edges);
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(valueTime, getKey(), referenceCounter, edges);
+		}
+	}
+
+	/**
+	 * Versioned edge between two shared buffer entries
+	 *
+	 * @param <K> Type of the key
+	 * @param <V> Type of the value
+	 */
+	public static class SharedBufferEdge<K, V> {
+		private final SharedBufferEntry<K, V> target;
+		private final DeweyNumber version;
+
+		public SharedBufferEdge(final SharedBufferEntry<K, V> target, final DeweyNumber version) {
+			this.target = target;
+			this.version = version;
+		}
+
+		public SharedBufferEntry<K, V> getTarget() {
+			return target;
+		}
+
+		public DeweyNumber getVersion() {
+			return version;
+		}
+
+		@Override
+		public String toString() {
+			return "SharedBufferEdge(" + target + ", " + version + ")";
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof SharedBufferEdge) {
+				@SuppressWarnings("unchecked")
+				SharedBufferEdge<K, V> other = (SharedBufferEdge<K, V>) obj;
+
+				if (version.equals(other.version)) {
+					if (target == null && other.target == null) {
+						return true;
+					} else if (target != null && other.target != null) {
+						return target.getKey().equals(other.target.getKey()) &&
+							target.getValueTime().equals(other.target.getValueTime());
+					} else {
+						return false;
+					}
+				} else {
+					return false;
+				}
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public int hashCode() {
+			if (target != null) {
+				return Objects.hash(target.getKey(), target.getValueTime(), version);
+			} else {
+				return version.hashCode();
+			}
+		}
+	}
+
+	/**
+	 * Wrapper for a value timestamp pair.
+	 *
+	 * @param <V> Type of the value
+	 */
+	static class ValueTimeWrapper<V> {
+		private final V value;
+		private final long timestamp;
+
+		public ValueTimeWrapper(final V value, final long timestamp) {
+			this.value = value;
+			this.timestamp = timestamp;
+		}
+
+		public V getValue() {
+			return value;
+		}
+
+		public long getTimestamp() {
+			return timestamp;
+		}
+
+		@Override
+		public String toString() {
+			return "ValueTimeWrapper(" + value + ", " + timestamp + ")";
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof ValueTimeWrapper) {
+				@SuppressWarnings("unchecked")
+				ValueTimeWrapper<V> other = (ValueTimeWrapper<V>)obj;
+
+				return timestamp == other.getTimestamp() && value.equals(other.getValue());
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public int hashCode() {
+			return (int) (this.timestamp ^ this.timestamp >>> 32) + 31 * value.hashCode();
+		}
+	}
+
+	/**
+	 * Helper class to store the extraction state while extracting a sequence of values following
+	 * the versioned entry edges.
+	 *
+	 * @param <K> Type of the key
+	 * @param <V> Type of the value
+	 */
+	private static class ExtractionState<K, V> {
+		private final SharedBufferEntry<K, V> entry;
+		private final DeweyNumber version;
+		private final Stack<SharedBufferEntry<K, V>> path;
+
+		public ExtractionState(
+			final SharedBufferEntry<K, V> entry,
+			final DeweyNumber version,
+			final Stack<SharedBufferEntry<K, V>> path) {
+
+			this.entry = entry;
+			this.version = version;
+			this.path = path;
+		}
+
+		public SharedBufferEntry<K, V> getEntry() {
+			return entry;
+		}
+
+		public DeweyNumber getVersion() {
+			return version;
+		}
+
+		public Stack<SharedBufferEntry<K, V>> getPath() {
+			return path;
+		}
+
+		@Override
+		public String toString() {
+			return "ExtractionState(" + entry + ", " + version + ", [" +  StringUtils.join(path, ", ") + "])";
+		}
+	}
+}
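
For illustration, a minimal usage sketch of the class above (not part of the patch). It assumes that DeweyNumber, which is part of this commit but not shown in this message, offers an int constructor and an addStage() method that appends a version digit, as its use in extractPatterns() suggests; StringSerializer merely stands in for a real value serializer.

import com.google.common.collect.LinkedHashMultimap;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.cep.nfa.DeweyNumber;
import org.apache.flink.cep.nfa.SharedBuffer;

import java.util.Collection;

public class SharedBufferSketch {
	public static void main(String[] args) {
		SharedBuffer<String, String> buffer = new SharedBuffer<>(StringSerializer.INSTANCE);

		DeweyNumber v1 = new DeweyNumber(1);	// run version "1" (assumed constructor)
		DeweyNumber v10 = v1.addStage();	// extended version "1.0" (assumed method)

		// the first element of a run has no predecessor
		buffer.put("start", "a", 1L, null, null, -1L, v1);
		// the second element points back to ("start", "a", 1)
		buffer.put("middle", "b", 2L, "start", "a", 1L, v10);

		// walk the previous relation back from ("middle", "b", 2);
		// yields a single path {start=[a], middle=[b]}
		Collection<LinkedHashMultimap<String, String>> paths =
			buffer.extractPatterns("middle", "b", 2L, v10);

		System.out.println(paths);
	}
}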

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
new file mode 100644
index 0000000..50b2cf3
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * Represents a state of the {@link NFA}.
+ * <p>
+ * Each state is identified by a name and a state type. Furthermore, it contains a collection of
+ * state transitions. The state transitions describe under which conditions it is possible to enter
+ * a new state.
+ *
+ * @param <T> Type of the input events
+ */
+public class State<T> implements Serializable {
+	private static final long serialVersionUID = 6658700025989097781L;
+
+	private final String name;
+	private final StateType stateType;
+	private final Collection<StateTransition<T>> stateTransitions;
+
+	public State(final String name, final StateType stateType) {
+		this.name = name;
+		this.stateType = stateType;
+
+		stateTransitions = new ArrayList<StateTransition<T>>();
+	}
+
+	public boolean isFinal() {
+		return stateType == StateType.Final;
+	}
+
+	public boolean isStart() {
+		return stateType == StateType.Start;
+	}
+
+	public String getName() {
+		return name;
+	}
+
+	public Collection<StateTransition<T>> getStateTransitions() {
+		return stateTransitions;
+	}
+
+	public void addStateTransition(final StateTransition<T> stateTransition) {
+		stateTransitions.add(stateTransition);
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof State) {
+			@SuppressWarnings("unchecked")
+			State<T> other = (State<T>)obj;
+
+			return name.equals(other.name) &&
+				stateType == other.stateType &&
+				stateTransitions.equals(other.stateTransitions);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder builder = new StringBuilder();
+
+		builder.append("State(").append(name).append(", ").append(stateType).append(", [\n");
+
+		for (StateTransition<T> stateTransition: stateTransitions) {
+			builder.append(stateTransition).append(",\n");
+		}
+
+		builder.append("])");
+
+		return builder.toString();
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(name, stateType, stateTransitions);
+	}
+
+	/**
+	 * Set of valid state types.
+	 */
+	public enum StateType {
+		Start, // the state is a starting state for the NFA
+		Final, // the state is a final state for the NFA
+		Normal // the state is neither a start nor a final state
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
new file mode 100644
index 0000000..479f28a
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class StateTransition<T> implements Serializable {
+	private static final long serialVersionUID = -4825345749997891838L;
+
+	private final StateTransitionAction action;
+	private final State<T> targetState;
+	private final FilterFunction<T> condition;
+
+	public StateTransition(final StateTransitionAction action, final State<T> targetState, final FilterFunction<T> condition) {
+		this.action = action;
+		this.targetState = targetState;
+		this.condition = condition;
+	}
+
+	public StateTransitionAction getAction() {
+		return action;
+	}
+
+	public State<T> getTargetState() {
+		return targetState;
+	}
+
+	public FilterFunction<T> getCondition() {
+		return condition;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof StateTransition) {
+			@SuppressWarnings("unchecked")
+			StateTransition<T> other = (StateTransition<T>) obj;
+
+			return action == other.action &&
+				targetState.getName().equals(other.targetState.getName());
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		// we have to take the name of targetState because the transition might be reflexive
+		return Objects.hash(action, targetState.getName());
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder builder = new StringBuilder();
+
+		builder.append("StateTransition(").append(action).append(", ").append(targetState.getName());
+
+		if (condition != null) {
+			builder.append(", with filter)");
+		} else {
+			builder.append(")");
+		}
+
+		return builder.toString();
+	}
+}
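
To make the two classes above concrete, here is a hand-wired sketch (not part of the patch) of a start state that takes a matching event into a final state; the NFACompiler below produces graphs of exactly this shape:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.nfa.State;
import org.apache.flink.cep.nfa.StateTransition;
import org.apache.flink.cep.nfa.StateTransitionAction;

public class StateSketch {
	public static void main(String[] args) {
		State<String> end = new State<>("end", State.StateType.Final);
		State<String> start = new State<>("start", State.StateType.Start);

		// TAKE: consume the matching event and move from start to end
		start.addStateTransition(new StateTransition<String>(
			StateTransitionAction.TAKE,
			end,
			new FilterFunction<String>() {
				@Override
				public boolean filter(String value) {
					return value.startsWith("a");
				}
			}));

		System.out.println(start);
	}
}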

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
new file mode 100644
index 0000000..70fc7fb
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+/**
+ * Set of actions when doing a state transition from one {@link State} to another.
+ */
+public enum StateTransitionAction {
+	TAKE, // take the current event and assign it to the new state
+	IGNORE, // ignore the current event and do the state transition
+	PROCEED // do the state transition and keep the current event for further processing (epsilon transition)
+}
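
How the NFA is meant to interpret these actions during a run, as an illustrative dispatch sketch (the actual logic lives in NFA.process, which is part of this commit but not shown in this message):

// illustrative only: how a consumer of the enum might branch on the action
static <T> void apply(StateTransition<T> transition) {
	switch (transition.getAction()) {
		case TAKE:
			// consume the current event, move to the target state and record the event
			break;
		case IGNORE:
			// drop the event for this run and stay in the current state (reflexive edge)
			break;
		case PROCEED:
			// move to the target state without consuming the event (epsilon transition)
			break;
	}
}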

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
new file mode 100644
index 0000000..f2561d4
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.compiler;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.State;
+import org.apache.flink.cep.nfa.StateTransition;
+import org.apache.flink.cep.nfa.StateTransitionAction;
+import org.apache.flink.cep.pattern.FollowedByPattern;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * Compiler class containing methods to compile a {@link Pattern} into a {@link NFA} or a
+ * {@link NFAFactory}.
+ */
+public class NFACompiler {
+
+	protected static final String BEGINNING_STATE_NAME = "$beginningState$";
+
+	/**
+	 * Compiles the given pattern into a {@link NFA}.
+	 *
+	 * @param pattern Definition of sequence pattern
+	 * @param inputTypeSerializer Serializer for the input type
+	 * @param <T> Type of the input events
+	 * @return Non-deterministic finite automaton representing the given pattern
+	 */
+	public static <T> NFA<T> compile(Pattern<T, ?> pattern, TypeSerializer<T> inputTypeSerializer) {
+		NFAFactory<T> factory = compileFactory(pattern, inputTypeSerializer);
+
+		return factory.createNFA();
+	}
+
+	/**
+	 * Compiles the given pattern into a {@link NFAFactory}. The NFA factory can be used to create
+	 * multiple NFAs.
+	 *
+	 * @param pattern Definition of sequence pattern
+	 * @param inputTypeSerializer Serializer for the input type
+	 * @param <T> Type of the input events
+	 * @return Factory for NFAs corresponding to the given pattern
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T> NFAFactory<T> compileFactory(Pattern<T, ?> pattern, TypeSerializer<T> inputTypeSerializer) {
+		if (pattern == null) {
+			// return a factory for empty NFAs
+			return new NFAFactoryImpl<T>(inputTypeSerializer, 0, Collections.<State<T>>emptyList());
+		} else {
+			// set of all generated states
+			Map<String, State<T>> states = new HashMap<>();
+			long windowTime;
+
+			Pattern<T, ?> succeedingPattern;
+			State<T> succeedingState;
+			Pattern<T, ?> currentPattern = pattern;
+
+			// we're traversing the pattern from the end to the beginning --> the first state is the final state
+			State<T> currentState = new State<>(currentPattern.getName(), State.StateType.Final);
+
+			states.put(currentPattern.getName(), currentState);
+
+			windowTime = currentPattern.getWindowTime() != null ? currentPattern.getWindowTime().toMilliseconds() : 0L;
+
+			while (currentPattern.getPrevious() != null) {
+				succeedingPattern = currentPattern;
+				succeedingState = currentState;
+				currentPattern = currentPattern.getPrevious();
+
+				Time currentWindowTime = currentPattern.getWindowTime();
+
+				if (currentWindowTime != null && currentWindowTime.toMilliseconds() < windowTime) {
+					// the window time is the global minimum of all window times of each state
+					windowTime = currentWindowTime.toMilliseconds();
+				}
+
+				if (states.containsKey(currentPattern.getName())) {
+					currentState = states.get(currentPattern.getName());
+				} else {
+					currentState = new State<>(currentPattern.getName(), State.StateType.Normal);
+					states.put(currentState.getName(), currentState);
+				}
+
+				currentState.addStateTransition(new StateTransition<T>(
+					StateTransitionAction.TAKE,
+					succeedingState,
+					(FilterFunction<T>) succeedingPattern.getFilterFunction()));
+
+				if (succeedingPattern instanceof FollowedByPattern) {
+					// the followed by pattern entails a reflexive ignore transition
+					currentState.addStateTransition(new StateTransition<T>(
+						StateTransitionAction.IGNORE,
+						currentState,
+						null
+					));
+				}
+			}
+
+			// add the beginning state
+			final State<T> beginningState;
+
+			if (states.containsKey(BEGINNING_STATE_NAME)) {
+				beginningState = states.get(BEGINNING_STATE_NAME);
+			} else {
+				beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
+				states.put(BEGINNING_STATE_NAME, beginningState);
+			}
+
+			beginningState.addStateTransition(new StateTransition<T>(
+				StateTransitionAction.TAKE,
+				currentState,
+				(FilterFunction<T>) currentPattern.getFilterFunction()
+			));
+
+			return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()));
+		}
+	}
+
+	/**
+	 * Factory interface for {@link NFA}.
+	 *
+	 * @param <T> Type of the input events which are processed by the NFA
+	 */
+	public interface NFAFactory<T> extends Serializable {
+		NFA<T> createNFA();
+	}
+
+	/**
+	 * Implementation of the {@link NFAFactory} interface.
+	 * <p>
+	 * The implementation takes the input type serializer, the window time and the set of
+	 * states and their transitions to be able to create an NFA from them.
+	 *
+	 * @param <T> Type of the input events which are processed by the NFA
+	 */
+	private static class NFAFactoryImpl<T> implements NFAFactory<T> {
+
+		private static final long serialVersionUID = 8939783698296714379L;
+
+		private final TypeSerializer<T> inputTypeSerializer;
+		private final long windowTime;
+		private final Collection<State<T>> states;
+
+		private NFAFactoryImpl(TypeSerializer<T> inputTypeSerializer, long windowTime, Collection<State<T>> states) {
+			this.inputTypeSerializer = inputTypeSerializer;
+			this.windowTime = windowTime;
+			this.states = states;
+		}
+
+		@Override
+		public NFA<T> createNFA() {
+			NFA<T> result =  new NFA<>(inputTypeSerializer.duplicate(), windowTime);
+
+			result.addStates(states);
+
+			return result;
+		}
+	}
+}
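
A usage sketch for the compiler (not part of the patch); the fluent builder names begin, followedBy and within are assumptions based on the Pattern and FollowedByPattern types referenced above:

import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.windowing.time.Time;

public class CompilerSketch {
	public static void main(String[] args) {
		// "start" followed (not necessarily directly) by "end", within 10 seconds
		Pattern<String, ?> pattern = Pattern.<String>begin("start")
			.followedBy("end")
			.within(Time.seconds(10));

		// the factory variant is what the operators below serialize and ship
		NFACompiler.NFAFactory<String> factory =
			NFACompiler.compileFactory(pattern, StringSerializer.INSTANCE);

		NFA<String> nfa = factory.createNFA();
		System.out.println(nfa);
	}
}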

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
new file mode 100644
index 0000000..a943f0d
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+/**
+ * Base class for CEP pattern operators. The operator uses a {@link NFA} to detect complex event
+ * patterns. The detected event patterns are then output to the downstream operators.
+ *
+ * @param <IN> Type of the input elements
+ */
+public abstract class AbstractCEPPatternOperator<IN>
+	extends AbstractStreamOperator<Map<String, IN>>
+	implements OneInputStreamOperator<IN, Map<String, IN>> {
+
+	private static final long serialVersionUID = -4166778210774160757L;
+
+	protected static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
+
+	private final TypeSerializer<IN> inputSerializer;
+	private final boolean isProcessingTime;
+
+	public AbstractCEPPatternOperator(
+			final TypeSerializer<IN> inputSerializer,
+			final boolean isProcessingTime) {
+		this.inputSerializer = inputSerializer;
+		this.isProcessingTime = isProcessingTime;
+	}
+
+	public TypeSerializer<IN> getInputSerializer() {
+		return inputSerializer;
+	}
+
+	protected abstract NFA<IN> getNFA() throws IOException;
+
+	protected abstract PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException;
+
+	@Override
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		if (isProcessingTime) {
+			// there can be no out of order elements in processing time
+			NFA<IN> nfa = getNFA();
+			processEvent(nfa, element.getValue(), element.getTimestamp());
+		} else {
+			PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+
+			// event time processing
+			// we have to buffer the elements until we receive the proper watermark
+			if (getExecutionConfig().isObjectReuseEnabled()) {
+				// copy the StreamRecord so that it cannot be changed
+				priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
+			} else {
+				priorityQueue.offer(element);
+			}
+		}
+	}
+
+	/**
+	 * Process the given event by giving it to the NFA and outputting the produced set of matched
+	 * event sequences.
+	 *
+	 * @param nfa NFA to be used for the event detection
+	 * @param event The current event to be processed
+	 * @param timestamp The timestamp of the event
+	 */
+	protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
+		Collection<Map<String, IN>> patterns = nfa.process(
+			event,
+			timestamp);
+
+		if (!patterns.isEmpty()) {
+			StreamRecord<Map<String, IN>> streamRecord = new StreamRecord<Map<String, IN>>(
+				null,
+				timestamp);
+
+			for (Map<String, IN> pattern : patterns) {
+				streamRecord.replace(pattern);
+				output.collect(streamRecord);
+			}
+		}
+	}
+}
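
The event-time branch of processElement only buffers; draining against a watermark happens in the subclasses below. The ordering idea in isolation, as a standalone sketch using a plain (timestamp, payload) pair in place of StreamRecord:

import java.util.Comparator;
import java.util.PriorityQueue;

public class WatermarkBufferSketch {
	public static void main(String[] args) {
		// min-heap on timestamp, analogous to the StreamRecordComparator used by the operators below
		PriorityQueue<long[]> queue =
			new PriorityQueue<>(11, Comparator.comparingLong((long[] record) -> record[0]));

		queue.offer(new long[]{5L, 'c'});	// arrives first but is logically later
		queue.offer(new long[]{2L, 'a'});
		queue.offer(new long[]{3L, 'b'});

		long watermark = 4L;

		// emit everything at or before the watermark, in timestamp order: 2 then 3;
		// timestamp 5 stays buffered until a later watermark covers it
		while (!queue.isEmpty() && queue.peek()[0] <= watermark) {
			long[] record = queue.poll();
			System.out.println(record[0] + " -> " + (char) record[1]);
		}
	}
}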

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
new file mode 100644
index 0000000..2ad152e
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.PriorityQueue;
+
+/**
+ * CEP pattern operator implementation which is used for non-keyed streams. Consequently,
+ * the operator state only includes a single {@link NFA} and a priority queue to order out-of-order
+ * elements in case of event time processing.
+ *
+ * @param <IN> Type of the input elements
+ */
+public class CEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN> {
+	private static final long serialVersionUID = 7487334510746595640L;
+
+	private final StreamRecordSerializer<IN> streamRecordSerializer;
+
+	// global nfa for all elements
+	private NFA<IN> nfa;
+
+	// queue to buffer out of order stream records
+	private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
+
+	public CEPPatternOperator(
+			TypeSerializer<IN> inputSerializer,
+			boolean isProcessingTime,
+			NFACompiler.NFAFactory<IN> nfaFactory) {
+		super(inputSerializer, isProcessingTime);
+
+		this.streamRecordSerializer = new StreamRecordSerializer<>(inputSerializer);
+		this.nfa = nfaFactory.createNFA();
+	}
+
+	@Override
+	public void open() {
+		if (priorityQueue == null) {
+			priorityQueue = new PriorityQueue<StreamRecord<IN>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<IN>());
+		}
+	}
+
+	@Override
+	protected NFA<IN> getNFA() throws IOException {
+		return nfa;
+	}
+
+	@Override
+	protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
+		return priorityQueue;
+	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
+			StreamRecord<IN> streamRecord = priorityQueue.poll();
+
+			processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
+		}
+	}
+
+	@Override
+	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+
+		final StateBackend.CheckpointStateOutputStream os = this.getStateBackend().createCheckpointStateOutputStream(
+			checkpointId,
+			timestamp);
+
+		final ObjectOutputStream oos = new ObjectOutputStream(os);
+		final StateBackend.CheckpointStateOutputView ov = new StateBackend.CheckpointStateOutputView(os);
+
+		oos.writeObject(nfa);
+
+		ov.writeInt(priorityQueue.size());
+
+		for (StreamRecord<IN> streamRecord: priorityQueue) {
+			streamRecordSerializer.serialize(streamRecord, ov);
+		}
+
+		taskState.setOperatorState(os.closeAndGetHandle());
+
+		return taskState;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
+		super.restoreState(state, recoveryTimestamp);
+
+		StreamStateHandle stream = (StreamStateHandle)state.getOperatorState();
+
+		final InputStream is = stream.getState(getUserCodeClassloader());
+		final ObjectInputStream ois = new ObjectInputStream(is);
+		final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(is);
+
+		nfa = (NFA<IN>)ois.readObject();
+
+		int numberPriorityQueueEntries = div.readInt();
+
+		priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>());
+
+		for (int i = 0; i < numberPriorityQueueEntries; i++) {
+			priorityQueue.offer(streamRecordSerializer.deserialize(div));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
new file mode 100644
index 0000000..03758c7
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * CEP pattern operator implementation for a keyed input stream. For each key, the operator creates
+ * a {@link NFA} and a priority queue to buffer out of order elements. Both data structures are
+ * stored using the key value state. Additionally, the set of all seen keys is kept as part of the
+ * operator state. This is necessary to trigger the execution for all keys upon receiving a new
+ * watermark.
+ *
+ * @param <IN> Type of the input elements
+ * @param <KEY> Type of the key on which the input stream is keyed
+ */
+public class KeyedCEPPatternOperator<IN, KEY> extends AbstractCEPPatternOperator<IN> {
+	private static final long serialVersionUID = -7234999752950159178L;
+
+	private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorState";
+	private static final String PRIORITY_QUEUE_STATE_NAME = "priorityQueueStateName";
+
+	// necessary to extract the key from the input elements
+	private final KeySelector<IN, KEY> keySelector;
+
+	// necessary to serialize the set of seen keys
+	private final TypeSerializer<KEY> keySerializer;
+
+	private final PriorityQueueFactory<StreamRecord<IN>> priorityQueueFactory = new PriorityQueueStreamRecordFactory<>();
+	private final NFACompiler.NFAFactory<IN> nfaFactory;
+
+	// stores the keys we've already seen to trigger execution upon receiving a watermark
+	// this can be problematic, since it is never cleared
+	// TODO: fix once the state refactoring is completed
+	private transient Set<KEY> keys;
+
+	private transient OperatorState<NFA<IN>> nfaOperatorState;
+	private transient OperatorState<PriorityQueue<StreamRecord<IN>>> priorityQueueOperatorState;
+
+	public KeyedCEPPatternOperator(
+			TypeSerializer<IN> inputSerializer,
+			boolean isProcessingTime,
+			KeySelector<IN, KEY> keySelector,
+			TypeSerializer<KEY> keySerializer,
+			NFACompiler.NFAFactory<IN> nfaFactory) {
+		super(inputSerializer, isProcessingTime);
+
+		this.keySelector = keySelector;
+		this.keySerializer = keySerializer;
+
+		this.nfaFactory = nfaFactory;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void open() throws Exception {
+		if (keys == null) {
+			keys = new HashSet<>();
+		}
+
+		if (nfaOperatorState == null) {
+			nfaOperatorState = this.createKeyValueState(
+				NFA_OPERATOR_STATE_NAME,
+				new KryoSerializer<NFA<IN>>((Class<NFA<IN>>) (Class<?>) NFA.class, getExecutionConfig()),
+				null);
+		}
+
+		if (priorityQueueOperatorState == null) {
+			priorityQueueOperatorState = this.createKeyValueState(
+				PRIORITY_QUEUE_STATE_NAME,
+				new PriorityQueueSerializer<StreamRecord<IN>>(
+					new StreamRecordSerializer<IN>(getInputSerializer()),
+					new PriorityQueueStreamRecordFactory<IN>()),
+				null);
+		}
+	}
+
+	@Override
+	protected NFA<IN> getNFA() throws IOException {
+		NFA<IN> nfa = nfaOperatorState.value();
+
+		if (nfa == null) {
+			nfa = nfaFactory.createNFA();
+
+			nfaOperatorState.update(nfa);
+		}
+
+		return nfa;
+	}
+
+	@Override
+	protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
+		PriorityQueue<StreamRecord<IN>> priorityQueue = priorityQueueOperatorState.value();
+
+		if (priorityQueue == null) {
+			priorityQueue = priorityQueueFactory.createPriorityQueue();
+
+			priorityQueueOperatorState.update(priorityQueue);
+		}
+
+		return priorityQueue;
+	}
+
+	@Override
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		keys.add(keySelector.getKey(element.getValue()));
+
+		super.processElement(element);
+	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		// iterate over all keys to trigger the execution of the buffered elements
+		for (KEY key: keys) {
+			setKeyContext(key);
+
+			PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+
+			NFA<IN> nfa = getNFA();
+
+			while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
+				StreamRecord<IN> streamRecord = priorityQueue.poll();
+
+				processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
+			}
+		}
+	}
+
+	@Override
+	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+
+		StateBackend.CheckpointStateOutputView ov = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+
+		ov.writeInt(keys.size());
+
+		for (KEY key: keys) {
+			keySerializer.serialize(key, ov);
+		}
+
+		taskState.setOperatorState(ov.closeAndGetHandle());
+
+		return taskState;
+	}
+
+	@Override
+	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
+		super.restoreState(state, recoveryTimestamp);
+
+		@SuppressWarnings("unchecked")
+		StateHandle<DataInputView> stateHandle = (StateHandle<DataInputView>) state.getOperatorState();
+
+		DataInputView inputView = stateHandle.getState(getUserCodeClassloader());
+
+		if (keys == null) {
+			keys = new HashSet<>();
+		}
+
+		int numberEntries = inputView.readInt();
+
+		for (int i = 0; i < numberEntries; i++) {
+			keys.add(keySerializer.deserialize(inputView));
+		}
+	}
+
+	/**
+	 * Custom type serializer implementation to serialize priority queues.
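+	 *
+	 * <p>A minimal usage sketch, mirroring the state registration in {@code open()};
+	 * {@code eventSerializer} is a placeholder for the input element serializer:
+	 * <pre>{@code
+	 * PriorityQueueSerializer<StreamRecord<Event>> serializer = new PriorityQueueSerializer<>(
+	 *     new StreamRecordSerializer<Event>(eventSerializer),
+	 *     new PriorityQueueStreamRecordFactory<Event>());
+	 * }</pre>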
+	 *
+	 * @param <T> Type of the priority queue's elements
+	 */
+	private static class PriorityQueueSerializer<T> extends TypeSerializer<PriorityQueue<T>> {
+
+		private static final long serialVersionUID = -231980397616187715L;
+
+		private final TypeSerializer<T> elementSerializer;
+		private final PriorityQueueFactory<T> factory;
+
+		public PriorityQueueSerializer(final TypeSerializer<T> elementSerializer, final PriorityQueueFactory<T> factory) {
+			this.elementSerializer = elementSerializer;
+			this.factory = factory;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public TypeSerializer<PriorityQueue<T>> duplicate() {
+			return new PriorityQueueSerializer<>(elementSerializer.duplicate(), factory);
+		}
+
+		@Override
+		public PriorityQueue<T> createInstance() {
+			return factory.createPriorityQueue();
+		}
+
+		@Override
+		public PriorityQueue<T> copy(PriorityQueue<T> from) {
+			PriorityQueue<T> result = factory.createPriorityQueue();
+
+			for (T element: from) {
+				result.offer(elementSerializer.copy(element));
+			}
+
+			return result;
+		}
+
+		@Override
+		public PriorityQueue<T> copy(PriorityQueue<T> from, PriorityQueue<T> reuse) {
+			reuse.clear();
+
+			for (T element: from) {
+				reuse.offer(elementSerializer.copy(element));
+			}
+
+			return reuse;
+		}
+
+		@Override
+		public int getLength() {
+			// the serialized representation has variable length
+			return -1;
+		}
+
+		@Override
+		public void serialize(PriorityQueue<T> record, DataOutputView target) throws IOException {
+			target.writeInt(record.size());
+
+			for (T element: record) {
+				elementSerializer.serialize(element, target);
+			}
+		}
+
+		@Override
+		public PriorityQueue<T> deserialize(DataInputView source) throws IOException {
+			PriorityQueue<T> result = factory.createPriorityQueue();
+
+			return deserialize(result, source);
+		}
+
+		@Override
+		public PriorityQueue<T> deserialize(PriorityQueue<T> reuse, DataInputView source) throws IOException {
+			reuse.clear();
+
+			int numberEntries = source.readInt();
+
+			for (int i = 0; i < numberEntries; i++) {
+				reuse.offer(elementSerializer.deserialize(source));
+			}
+
+			return reuse;
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			// copy the serialized form: the element count, then each serialized element
+			int numberEntries = source.readInt();
+			target.writeInt(numberEntries);
+
+			for (int i = 0; i < numberEntries; i++) {
+				elementSerializer.copy(source, target);
+			}
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof PriorityQueueSerializer) {
+				@SuppressWarnings("unchecked")
+				PriorityQueueSerializer<T> other = (PriorityQueueSerializer<T>) obj;
+
+				return factory.equals(other.factory) && elementSerializer.equals(other.elementSerializer);
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof PriorityQueueSerializer;
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(factory, elementSerializer);
+		}
+	}
+
+	private interface PriorityQueueFactory<T> extends Serializable {
+		PriorityQueue<T> createPriorityQueue();
+	}
+
+	private static class PriorityQueueStreamRecordFactory<T> implements PriorityQueueFactory<StreamRecord<T>> {
+
+		private static final long serialVersionUID = 1254766984454616593L;
+
+		@Override
+		public PriorityQueue<StreamRecord<T>> createPriorityQueue() {
+			return new PriorityQueue<StreamRecord<T>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<T>());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java
new file mode 100644
index 0000000..b290e7b
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ * Compares two {@link StreamRecord}s based on their timestamps.
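+ *
+ * <p>A usage sketch, as in the CEP operator's element buffer ({@code Event} is a placeholder
+ * type):
+ * <pre>{@code
+ * PriorityQueue<StreamRecord<Event>> queue =
+ *     new PriorityQueue<>(11, new StreamRecordComparator<Event>());
+ * }</pre>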
+ *
+ * @param <IN> Type of the value field of the StreamRecord
+ */
+public class StreamRecordComparator<IN> implements Comparator<StreamRecord<IN>>, Serializable {
+	private static final long serialVersionUID = 1581054988433915305L;
+
+	@Override
+	public int compare(StreamRecord<IN> o1, StreamRecord<IN> o2) {
+		return Long.compare(o1.getTimestamp(), o2.getTimestamp());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
new file mode 100644
index 0000000..d01643d
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * A filter function which combines two filter functions with a logical AND. Thus, the combined
+ * filter returns true iff both constituent filters return true.
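+ *
+ * <p>A sketch; {@code IsErrorEvent} and {@code HasHighSeverity} are hypothetical filter
+ * implementations:
+ * <pre>{@code
+ * FilterFunction<Event> combined =
+ *     new AndFilterFunction<Event>(new IsErrorEvent(), new HasHighSeverity());
+ * }</pre>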
+ *
+ * @param <T> Type of the element to filter
+ */
+public class AndFilterFunction<T> implements FilterFunction<T> {
+	private static final long serialVersionUID = -2109562093871155005L;
+
+	private final FilterFunction<T> left;
+	private final FilterFunction<T> right;
+
+	public AndFilterFunction(final FilterFunction<T> left, final FilterFunction<T> right) {
+		this.left = left;
+		this.right = right;
+	}
+
+	@Override
+	public boolean filter(T value) throws Exception {
+		return left.filter(value) && right.filter(value);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java
new file mode 100644
index 0000000..266451c
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+/**
+ * Pattern operator which signifies that there is a non-strict temporal contiguity between
+ * itself and its preceding pattern operator. This means that there might be events in between
+ * two matching events. These events are then simply ignored.
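+ *
+ * <p>A sketch of how such an operator is created ({@code Event} is a placeholder type):
+ * <pre>{@code
+ * // "middle" may be separated from the "start" match by arbitrary ignored events
+ * FollowedByPattern<Event, Event> pattern = Pattern.<Event>begin("start").followedBy("middle");
+ * }</pre>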
+ *
+ * @param <T> Base type of the events
+ * @param <F> Subtype of T to which the operator is currently constrained
+ */
+public class FollowedByPattern<T, F extends T> extends Pattern<T, F> {
+	FollowedByPattern(final String name, Pattern<T, ?> previous) {
+		super(name, previous);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
new file mode 100644
index 0000000..76f660a
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+/**
+ * Base class for a pattern definition.
+ * <p>
+ * A pattern definition is used by {@link org.apache.flink.cep.nfa.compiler.NFACompiler} to create
+ * a {@link NFA}.
+ *
+ * <pre>{@code
+ * Pattern<T, T> pattern = Pattern.<T>begin("start")
+ *   .next("middle").subtype(F.class)
+ *   .followedBy("end").where(new MyFilterFunction());
+ * }</pre>
+ *
+ * @param <T> Base type of the elements appearing in the pattern
+ * @param <F> Subtype of T to which the current pattern operator is constrained
+ */
+public class Pattern<T, F extends T> {
+
+	// name of the pattern operator
+	private final String name;
+
+	// previous pattern operator
+	private final Pattern<T, ?> previous;
+
+	// filter condition for an event to be matched
+	private FilterFunction<F> filterFunction;
+
+	// window length in which the pattern match has to occur
+	private Time windowTime;
+
+	protected Pattern(final String name, final Pattern<T, ?> previous) {
+		this.name = name;
+		this.previous = previous;
+	}
+
+	public String getName() {
+		return name;
+	}
+
+	public Pattern<T, ?> getPrevious() {
+		return previous;
+	}
+
+	public FilterFunction<F> getFilterFunction() {
+		return filterFunction;
+	}
+
+	public Time getWindowTime() {
+		return windowTime;
+	}
+
+	/**
+	 * Specifies a filter condition which has to be fulfilled by an event in order to be matched.
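+	 *
+	 * <p>Successive calls are combined into an {@link AndFilterFunction}. A sketch, where
+	 * {@code Event} and {@code getId()} are placeholders:
+	 * <pre>{@code
+	 * pattern.where(new FilterFunction<Event>() {
+	 *     @Override
+	 *     public boolean filter(Event event) {
+	 *         return event.getId() > 42;
+	 *     }
+	 * });
+	 * }</pre>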
+	 *
+	 * @param newFilterFunction Filter condition
+	 * @return The same pattern operator where the new filter condition is set
+	 */
+	public Pattern<T, F> where(FilterFunction<F> newFilterFunction) {
+		ClosureCleaner.clean(newFilterFunction, true);
+
+		if (this.filterFunction == null) {
+			this.filterFunction = newFilterFunction;
+		} else {
+			this.filterFunction = new AndFilterFunction<F>(this.filterFunction, newFilterFunction);
+		}
+
+		return this;
+	}
+
+	/**
+	 * Applies a subtype constraint on the current pattern operator. This means that an event has
+	 * to be of the given subtype in order to be matched.
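+	 *
+	 * <p>A sketch; {@code SubEvent extends Event} is a hypothetical subtype:
+	 * <pre>{@code
+	 * Pattern<Event, SubEvent> narrowed = pattern.subtype(SubEvent.class);
+	 * }</pre>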
+	 *
+	 * @param subtypeClass Class of the subtype
+	 * @param <S> Type of the subtype
+	 * @return The same pattern operator with the new subtype constraint
+	 */
+	public <S extends F> Pattern<T, S> subtype(final Class<S> subtypeClass) {
+		if (filterFunction == null) {
+			this.filterFunction = new SubtypeFilterFunction<F>(subtypeClass);
+		} else {
+			this.filterFunction = new AndFilterFunction<F>(this.filterFunction, new SubtypeFilterFunction<F>(subtypeClass));
+		}
+
+		@SuppressWarnings("unchecked")
+		Pattern<T, S> result = (Pattern<T, S>) this;
+
+		return result;
+	}
+
+	/**
+	 * Defines the maximum time interval for a matching pattern. This means that the time gap
+	 * between the first and the last event must not be longer than the window time.
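+	 *
+	 * <p>A sketch:
+	 * <pre>{@code
+	 * // the complete match must occur within 10 seconds of the first event
+	 * pattern.within(Time.seconds(10));
+	 * }</pre>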
+	 *
+	 * @param windowTime Time of the matching window
+	 * @return The same pattern operator with the new window length
+	 */
+	public Pattern<T, F> within(Time windowTime) {
+		if (windowTime != null) {
+			this.windowTime = windowTime;
+		}
+
+		return this;
+	}
+
+	/**
+	 * Appends a new pattern operator to the existing one. The new pattern operator enforces strict
+	 * temporal contiguity. This means that the whole pattern only matches if an event which matches
+	 * this operator directly follows the preceding matching event. Thus, there cannot be any
+	 * events in between two matching events.
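+	 *
+	 * <p>A sketch ({@code Event} is a placeholder type):
+	 * <pre>{@code
+	 * // "end" has to match the event directly following the "start" match
+	 * Pattern<Event, Event> pattern = Pattern.<Event>begin("start").next("end");
+	 * }</pre>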
+	 *
+	 * @param name Name of the new pattern operator
+	 * @return A new pattern operator which is appended to this pattern operator
+	 */
+	public Pattern<T, T> next(final String name) {
+		return new Pattern<T, T>(name, this);
+	}
+
+	/**
+	 * Appends a new pattern operator to the existing one. The new pattern operator enforces
+	 * non-strict temporal contiguity. This means that a matching event of this operator and the
+	 * preceding matching event might be interleaved with other events which are ignored.
+	 *
+	 * @param name Name of the new pattern operator
+	 * @return A new pattern operator which is appended to this pattern operator
+	 */
+	public FollowedByPattern<T, T> followedBy(final String name) {
+		return new FollowedByPattern<T, T>(name, this);
+	}
+
+	/**
+	 * Starts a new pattern with the initial pattern operator whose name is provided. Furthermore,
+	 * the base type of the event sequence is set.
+	 *
+	 * @param name Name of the new pattern operator
+	 * @param <X> Base type of the event pattern
+	 * @return The first pattern operator of a pattern
+	 */
+	public static <X> Pattern<X, X> begin(final String name) {
+		return new Pattern<X, X>(name, null);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java
new file mode 100644
index 0000000..f183f0f
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * A filter function which filters elements of the given type. An element is filtered out iff it
+ * is not assignable to the given subtype of T.
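+ *
+ * <p>A sketch; {@code SubEvent extends Event} is a hypothetical subtype:
+ * <pre>{@code
+ * FilterFunction<Event> filter = new SubtypeFilterFunction<Event>(SubEvent.class);
+ * filter.filter(new SubEvent()); // true
+ * filter.filter(new Event());    // false -> filtered out
+ * }</pre>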
+ *
+ * @param <T> Type of the elements to be filtered
+ */
+public class SubtypeFilterFunction<T> implements FilterFunction<T> {
+	private static final long serialVersionUID = -2990017519957561355L;
+
+	// subtype to filter for
+	private final Class<? extends T> subtype;
+
+	public SubtypeFilterFunction(final Class<? extends T> subtype) {
+		this.subtype = subtype;
+	}
+
+	@Override
+	public boolean filter(T value) throws Exception {
+		return subtype.isAssignableFrom(value.getClass());
+	}
+}

