flink-commits mailing list archives

From trohrm...@apache.org
Subject [1/3] flink git commit: [FLINK-3216] [FLINK-3217] [cep] Adds CEP operator for pattern recognition
Date Tue, 02 Feb 2016 14:04:57 GMT
Repository: flink
Updated Branches:
  refs/heads/master 682d8d5e2 -> 79058edb6
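
For orientation, here is a minimal usage sketch of the pattern API that this commit introduces, distilled from the CEPITCase tests in the diff below; it is not part of the patch itself, and the Event POJO and the event names are the illustrative ones used by those tests.

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	// a stream of test events (Event is the POJO added by this patch)
	DataStream<Event> input = env.fromElements(
		new Event(1, "start", 1.0),
		new Event(2, "middle", 2.0),
		new Event(3, "end", 3.0));

	// declare the pattern: a "start" event eventually followed by an "end" event
	Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
		.where(new FilterFunction<Event>() {
			@Override
			public boolean filter(Event value) {
				return value.getName().equals("start");
			}
		})
		.followedBy("end")
		.where(new FilterFunction<Event>() {
			@Override
			public boolean filter(Event value) {
				return value.getName().equals("end");
			}
		});

	// apply the pattern to the stream and map each match to a string of event ids
	DataStream<String> matches = CEP.pattern(input, pattern)
		.select(new PatternSelectFunction<Event, String>() {
			@Override
			public String select(Map<String, Event> match) {
				return match.get("start").getId() + "," + match.get("end").getId();
			}
		});

	env.execute();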


http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
new file mode 100644
index 0000000..7dcda4c
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Map;
+
+public class CEPITCase extends StreamingMultipleProgramsTestBase {
+
+	private String resultPath;
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+		expected = "";
+	}
+
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+
+	/**
+	 * Checks that a certain event sequence is recognized.
+	 * @throws Exception
+	 */
+	@Test
+	public void testSimplePatternCEP() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Event> input = env.fromElements(
+			new Event(1, "barfoo", 1.0),
+			new Event(2, "start", 2.0),
+			new Event(3, "foobar", 3.0),
+			new SubEvent(4, "foo", 4.0, 1.0),
+			new Event(5, "middle", 5.0),
+			new SubEvent(6, "middle", 6.0, 2.0),
+			new SubEvent(7, "bar", 3.0, 3.0),
+			new Event(42, "42", 42.0),
+			new Event(8, "end", 1.0)
+		);
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5681493970790509488L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		})
+		.followedBy("middle").subtype(SubEvent.class).where(
+				new FilterFunction<SubEvent>() {
+					private static final long serialVersionUID = 448591738315698540L;
+
+					@Override
+					public boolean filter(SubEvent value) throws Exception {
+						return value.getName().equals("middle");
+					}
+				}
+			)
+		.followedBy("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 6080276591060431966L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("end");
+			}
+		});
+
+		DataStream<String> result = CEP.pattern(input, pattern).select(new PatternSelectFunction<Event, String>() {
+			private static final long serialVersionUID = 1447462674590806097L;
+
+			@Override
+			public String select(Map<String, Event> pattern) {
+				StringBuilder builder = new StringBuilder();
+
+				builder.append(pattern.get("start").getId()).append(",")
+					.append(pattern.get("middle").getId()).append(",")
+					.append(pattern.get("end").getId());
+
+				return builder.toString();
+			}
+		});
+
+		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		// expected sequence of matching event ids
+		expected = "2,6,8";
+
+		env.execute();
+	}
+
+	@Test
+	public void testSimpleKeyedPatternCEP() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(2);
+
+		DataStream<Event> input = env.fromElements(
+			new Event(1, "barfoo", 1.0),
+			new Event(2, "start", 2.0),
+			new Event(3, "start", 2.1),
+			new Event(3, "foobar", 3.0),
+			new SubEvent(4, "foo", 4.0, 1.0),
+			new SubEvent(3, "middle", 3.2, 1.0),
+			new Event(42, "start", 3.1),
+			new SubEvent(42, "middle", 3.3, 1.2),
+			new Event(5, "middle", 5.0),
+			new SubEvent(2, "middle", 6.0, 2.0),
+			new SubEvent(7, "bar", 3.0, 3.0),
+			new Event(42, "42", 42.0),
+			new Event(3, "end", 2.0),
+			new Event(2, "end", 1.0),
+			new Event(42, "end", 42.0)
+		).keyBy(new KeySelector<Event, Integer>() {
+			private static final long serialVersionUID = -2112041392652797483L;
+
+			@Override
+			public Integer getKey(Event value) throws Exception {
+				return value.getId();
+			}
+		});
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5681493970790509488L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		})
+			.followedBy("middle").subtype(SubEvent.class).where(
+				new FilterFunction<SubEvent>() {
+					private static final long serialVersionUID = 448591738315698540L;
+
+					@Override
+					public boolean filter(SubEvent value) throws Exception {
+						return value.getName().equals("middle");
+					}
+				}
+			)
+			.followedBy("end").where(new FilterFunction<Event>() {
+				private static final long serialVersionUID = 6080276591060431966L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("end");
+				}
+			});
+
+		DataStream<String> result = CEP.pattern(input, pattern).select(new PatternSelectFunction<Event, String>() {
+			private static final long serialVersionUID = 1447462674590806097L;
+
+			@Override
+			public String select(Map<String, Event> pattern) {
+				StringBuilder builder = new StringBuilder();
+
+				builder.append(pattern.get("start").getId()).append(",")
+					.append(pattern.get("middle").getId()).append(",")
+					.append(pattern.get("end").getId());
+
+				return builder.toString();
+			}
+		});
+
+		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		// the expected sequences of matching event ids
+		expected = "2,2,2\n3,3,3\n42,42,42";
+
+		env.execute();
+	}
+
+	@Test
+	public void testSimplePatternEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		// (Event, timestamp)
+		DataStream<Event> input = env.fromElements(
+			Tuple2.of(new Event(1, "start", 1.0), 5L),
+			Tuple2.of(new Event(2, "middle", 2.0), 1L),
+			Tuple2.of(new Event(3, "end", 3.0), 3L),
+			Tuple2.of(new Event(4, "end", 4.0), 10L),
+			Tuple2.of(new Event(5, "middle", 5.0), 7L)
+		).assignTimestamps(new TimestampExtractor<Tuple2<Event, Long>>() {
+			private static final long serialVersionUID = 878281782188702293L;
+
+			private Long currentMaxTimestamp = Long.MIN_VALUE;
+
+			@Override
+			public long extractTimestamp(Tuple2<Event, Long> element, long currentTimestamp) {
+				if (currentMaxTimestamp < element.f1) {
+					currentMaxTimestamp = element.f1;
+				}
+
+				return element.f1;
+			}
+
+			@Override
+			public long extractWatermark(Tuple2<Event, Long> element, long currentTimestamp) {
+				return currentMaxTimestamp - 5;
+			}
+
+			@Override
+			public long getCurrentWatermark() {
+				return Long.MIN_VALUE;
+			}
+		}).map(new MapFunction<Tuple2<Event, Long>, Event>() {
+			private static final long serialVersionUID = -5288731103938665328L;
+
+			@Override
+			public Event map(Tuple2<Event, Long> value) throws Exception {
+				return value.f0;
+			}
+		});
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 2601494641888389648L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = -3133506934766766660L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("middle");
+			}
+		}).followedBy("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = -8528031731858936269L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("end");
+			}
+		});
+
+		DataStream<String> result = CEP.pattern(input, pattern).select(
+			new PatternSelectFunction<Event, String>() {
+				private static final long serialVersionUID = 1447462674590806097L;
+
+				@Override
+				public String select(Map<String, Event> pattern) {
+					StringBuilder builder = new StringBuilder();
+
+					builder.append(pattern.get("start").getId()).append(",")
+						.append(pattern.get("middle").getId()).append(",")
+						.append(pattern.get("end").getId());
+
+					return builder.toString();
+				}
+			}
+		);
+
+		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		// the expected sequence of matching event ids
+		expected = "1,5,4";
+
+		env.execute();
+	}
+
+	@Test
+	public void testSimpleKeyedPatternEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(2);
+
+		// (Event, timestamp)
+		DataStream<Event> input = env.fromElements(
+			Tuple2.of(new Event(1, "start", 1.0), 5L),
+			Tuple2.of(new Event(1, "middle", 2.0), 1L),
+			Tuple2.of(new Event(2, "middle", 2.0), 4L),
+			Tuple2.of(new Event(2, "start", 2.0), 3L),
+			Tuple2.of(new Event(1, "end", 3.0), 3L),
+			Tuple2.of(new Event(3, "start", 4.1), 5L),
+			Tuple2.of(new Event(1, "end", 4.0), 10L),
+			Tuple2.of(new Event(2, "end", 2.0), 8L),
+			Tuple2.of(new Event(1, "middle", 5.0), 7L),
+			Tuple2.of(new Event(3, "middle", 6.0), 9L),
+			Tuple2.of(new Event(3, "end", 7.0), 7L)
+		).assignTimestamps(new TimestampExtractor<Tuple2<Event, Long>>() {
+			private static final long serialVersionUID = 878281782188702293L;
+
+			private Long currentMaxTimestamp = Long.MIN_VALUE;
+
+			@Override
+			public long extractTimestamp(Tuple2<Event, Long> element, long currentTimestamp) {
+				if (currentMaxTimestamp < element.f1) {
+					currentMaxTimestamp = element.f1;
+				}
+
+				return element.f1;
+			}
+
+			@Override
+			public long extractWatermark(Tuple2<Event, Long> element, long currentTimestamp) {
+				return currentMaxTimestamp - 5;
+			}
+
+			@Override
+			public long getCurrentWatermark() {
+				return Long.MIN_VALUE;
+			}
+		}).map(new MapFunction<Tuple2<Event, Long>, Event>() {
+			private static final long serialVersionUID = -5288731103938665328L;
+
+			@Override
+			public Event map(Tuple2<Event, Long> value) throws Exception {
+				return value.f0;
+			}
+		}).keyBy(new KeySelector<Event, Integer>() {
+			private static final long serialVersionUID = -3282946957177720879L;
+
+			@Override
+			public Integer getKey(Event value) throws Exception {
+				return value.getId();
+			}
+		});
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 2601494641888389648L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = -3133506934766766660L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("middle");
+			}
+		}).followedBy("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = -8528031731858936269L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("end");
+			}
+		});
+
+		DataStream<String> result = CEP.pattern(input, pattern).select(
+			new PatternSelectFunction<Event, String>() {
+				private static final long serialVersionUID = 1447462674590806097L;
+
+				@Override
+				public String select(Map<String, Event> pattern) {
+					StringBuilder builder = new StringBuilder();
+
+					builder.append(pattern.get("start").getId()).append(",")
+						.append(pattern.get("middle").getId()).append(",")
+						.append(pattern.get("end").getId());
+
+					return builder.toString();
+				}
+			}
+		);
+
+		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		// the expected sequences of matching event ids
+		expected = "1,1,1\n2,2,2";
+
+		env.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java
new file mode 100644
index 0000000..efe56b7
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import java.util.Objects;
+
+public class Event {
+	private String name;
+	private double price;
+	private int id;
+
+	public Event(int id, String name, double price) {
+		this.id = id;
+		this.name = name;
+		this.price = price;
+	}
+
+	public double getPrice() {
+		return price;
+	}
+
+	public int getId() {
+		return id;
+	}
+
+	public String getName() {
+		return name;
+	}
+
+	@Override
+	public String toString() {
+		return "Event(" + id + ", " + name + ", " + price + ")";
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof Event) {
+			Event other = (Event) obj;
+
+			return name.equals(other.name) && price == other.price && id == other.id;
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(name, price, id);
+	}
+
+	public static TypeSerializer<Event> createTypeSerializer() {
+		TypeInformation<Event> typeInformation = (TypeInformation<Event>) TypeExtractor.createTypeInfo(Event.class);
+
+		return typeInformation.createSerializer(new ExecutionConfig());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/StreamEvent.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/StreamEvent.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/StreamEvent.java
new file mode 100644
index 0000000..adb9446
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/StreamEvent.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+public class StreamEvent<T> {
+	private final T event;
+	private final long timestamp;
+
+	public StreamEvent(T event, long timestamp) {
+		this.event = event;
+		this.timestamp = timestamp;
+	}
+
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	public T getEvent() {
+		return event;
+	}
+
+	public static <V> StreamEvent<V> of(V event, long timestamp) {
+		return new StreamEvent<>(event, timestamp);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java
new file mode 100644
index 0000000..31eff28
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+public class SubEvent extends Event {
+	private final double volume;
+
+	public SubEvent(int id, String name, double price, double volume) {
+		super(id, name, price);
+		this.volume = volume;
+	}
+
+	public double getVolume() {
+		return volume;
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder builder = new StringBuilder();
+
+		builder.append("SubEvent(")
+			.append(getId())
+			.append(", ")
+			.append(getName())
+			.append(", ")
+			.append(getPrice())
+			.append(", ")
+			.append(getVolume())
+			.append(")");
+
+		return builder.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java
new file mode 100644
index 0000000..8bc010a
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class DeweyNumberTest extends TestLogger {
+
+	@Test
+	public void testDeweyNumberGeneration() {
+		DeweyNumber start = new DeweyNumber(1);
+		DeweyNumber increased = start.increase();
+		DeweyNumber increaseAddStage = increased.addStage();
+		DeweyNumber startAddStage = start.addStage();
+		DeweyNumber startAddStageIncreased = startAddStage.increase();
+		DeweyNumber startAddStageIncreasedAddStage = startAddStageIncreased.addStage();
+
+		assertEquals(DeweyNumber.fromString("1"), start);
+		assertEquals(DeweyNumber.fromString("2"), increased);
+		assertEquals(DeweyNumber.fromString("2.0"), increaseAddStage);
+		assertEquals(DeweyNumber.fromString("1.0"), startAddStage);
+		assertEquals(DeweyNumber.fromString("1.1"), startAddStageIncreased);
+		assertEquals(DeweyNumber.fromString("1.1.0"), startAddStageIncreasedAddStage);
+
+		assertTrue(startAddStage.isCompatibleWith(start));
+		assertTrue(startAddStageIncreased.isCompatibleWith(startAddStage));
+		assertTrue(startAddStageIncreasedAddStage.isCompatibleWith(startAddStageIncreased));
+		assertFalse(startAddStageIncreasedAddStage.isCompatibleWith(startAddStage));
+		assertFalse(increaseAddStage.isCompatibleWith(startAddStage));
+		assertFalse(startAddStage.isCompatibleWith(increaseAddStage));
+		assertFalse(startAddStageIncreased.isCompatibleWith(startAddStageIncreasedAddStage));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
new file mode 100644
index 0000000..a46a81e
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.StreamEvent;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class NFAITCase extends TestLogger {
+
+	@Test
+	public void testSimplePatternNFA() {
+		List<StreamEvent<Event>> inputEvents = new ArrayList<StreamEvent<Event>>();
+
+		Event startEvent = new Event(42, "start", 1.0);
+		SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
+		Event endEvent = new Event(43, "end", 1.0);
+
+		inputEvents.add(new StreamEvent<Event>(startEvent, 1));
+		inputEvents.add(new StreamEvent<Event>(new Event(43, "foobar", 1.0), 2));
+		inputEvents.add(new StreamEvent<Event>(new SubEvent(41, "barfoo", 1.0, 5.0), 3));
+		inputEvents.add(new StreamEvent<Event>(middleEvent, 3));
+		inputEvents.add(new StreamEvent<Event>(new Event(43, "start", 1.0), 4));
+		inputEvents.add(new StreamEvent<Event>(endEvent, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		})
+		.followedBy("middle").subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
+				private static final long serialVersionUID = 6215754202506583964L;
+
+				@Override
+				public boolean filter(SubEvent value) throws Exception {
+					return value.getVolume() > 5.0;
+				}
+			})
+		.followedBy("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 7056763917392056548L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("end");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer());
+		List<Map<String, Event>> resultingPatterns = new ArrayList<>();
+
+		for (StreamEvent<Event> inputEvent: inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getEvent(),
+				inputEvent.getTimestamp());
+
+			resultingPatterns.addAll(patterns);
+		}
+
+		assertEquals(1, resultingPatterns.size());
+		Map<String, Event> patternMap = resultingPatterns.get(0);
+
+		assertEquals(startEvent, patternMap.get("start"));
+		assertEquals(middleEvent, patternMap.get("middle"));
+		assertEquals(endEvent, patternMap.get("end"));
+	}
+
+	/**
+	 * Tests that the NFA successfully filters out expired elements with respect to the window
+	 * length.
+	 */
+	@Test
+	public void testSimplePatternWithTimeWindowNFA() {
+		List<StreamEvent<Event>> events = new ArrayList<>();
+		List<Map<String, Event>> resultingPatterns = new ArrayList<>();
+
+		final Event startEvent;
+		final Event middleEvent;
+		final Event endEvent;
+
+		events.add(new StreamEvent<Event>(new Event(1, "start", 1.0), 1));
+		events.add(new StreamEvent<Event>(startEvent = new Event(2, "start", 1.0), 2));
+		events.add(new StreamEvent<Event>(middleEvent = new Event(3, "middle", 1.0), 3));
+		events.add(new StreamEvent<Event>(new Event(4, "foobar", 1.0), 4));
+		events.add(new StreamEvent<Event>(endEvent = new Event(5, "end", 1.0), 12));
+		events.add(new StreamEvent<Event>(new Event(6, "end", 1.0), 13));
+
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 7907391379273505897L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = -3268741540234334074L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("middle");
+			}
+		}).followedBy("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = -8995174172182138608L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("end");
+			}
+		}).within(Time.milliseconds(10));
+
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer());
+
+		for (StreamEvent<Event> event: events) {
+			Collection<Map<String, Event>> patterns = nfa.process(event.getEvent(), event.getTimestamp());
+
+			resultingPatterns.addAll(patterns);
+		}
+
+		assertEquals(1, resultingPatterns.size());
+
+		Map<String, Event> patternMap = resultingPatterns.get(0);
+
+		assertEquals(startEvent, patternMap.get("start"));
+		assertEquals(middleEvent, patternMap.get("middle"));
+		assertEquals(endEvent, patternMap.get("end"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
new file mode 100644
index 0000000..3face76
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.StreamEvent;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class NFATest extends TestLogger {
+	@Test
+	public void testSimpleNFA() {
+		NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 0);
+		List<StreamEvent<Event>> streamEvents = new ArrayList<>();
+
+		streamEvents.add(StreamEvent.of(new Event(1, "start", 1.0), 1L));
+		streamEvents.add(StreamEvent.of(new Event(2, "bar", 2.0), 2L));
+		streamEvents.add(StreamEvent.of(new Event(3, "start", 3.0), 3L));
+		streamEvents.add(StreamEvent.of(new Event(4, "end", 4.0), 4L));
+
+		State<Event> startingState = new State<>("", State.StateType.Start);
+		State<Event> startState = new State<>("start", State.StateType.Normal);
+		State<Event> endState = new State<>("end", State.StateType.Final);
+		StateTransition<Event> starting2Start = new StateTransition<>(
+			StateTransitionAction.TAKE,
+			startState,
+			new FilterFunction<Event>() {
+				private static final long serialVersionUID = -4869589195918650396L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("start");
+				}
+			}
+		);
+
+		StateTransition<Event> start2End = new StateTransition<>(
+			StateTransitionAction.TAKE,
+			endState,
+			new FilterFunction<Event>() {
+				private static final long serialVersionUID = 2979804163709590673L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("end");
+				}
+			}
+		);
+
+		StateTransition<Event> start2Start = new StateTransition<>(StateTransitionAction.IGNORE, startState, null);
+
+		startingState.addStateTransition(starting2Start);
+		startState.addStateTransition(start2End);
+		startState.addStateTransition(start2Start);
+
+		nfa.addState(startingState);
+		nfa.addState(startState);
+		nfa.addState(endState);
+
+		Set<Map<String, Event>> expectedPatterns = new HashSet<>();
+
+		Map<String, Event> firstPattern = new HashMap<>();
+		firstPattern.put("start", new Event(1, "start", 1.0));
+		firstPattern.put("end", new Event(4, "end", 4.0));
+
+		Map<String, Event> secondPattern = new HashMap<>();
+		secondPattern.put("start", new Event(3, "start", 3.0));
+		secondPattern.put("end", new Event(4, "end", 4.0));
+
+		expectedPatterns.add(firstPattern);
+		expectedPatterns.add(secondPattern);
+
+		Collection<Map<String, Event>> actualPatterns = runNFA(nfa, streamEvents);
+
+		assertEquals(expectedPatterns, actualPatterns);
+	}
+
+	@Test
+	public void testTimeoutWindowPruning() {
+		NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 2);
+		List<StreamEvent<Event>> streamEvents = new ArrayList<>();
+
+		streamEvents.add(StreamEvent.of(new Event(1, "start", 1.0), 1L));
+		streamEvents.add(StreamEvent.of(new Event(2, "bar", 2.0), 2L));
+		streamEvents.add(StreamEvent.of(new Event(3, "start", 3.0), 3L));
+		streamEvents.add(StreamEvent.of(new Event(4, "end", 4.0), 4L));
+
+		State<Event> startingState = new State<>("", State.StateType.Start);
+		State<Event> startState = new State<>("start", State.StateType.Normal);
+		State<Event> endState = new State<>("end", State.StateType.Final);
+		StateTransition<Event> starting2Start = new StateTransition<>(
+			StateTransitionAction.TAKE,
+			startState,
+			new FilterFunction<Event>() {
+				private static final long serialVersionUID = -4869589195918650396L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("start");
+				}
+		});
+
+		StateTransition<Event> start2End = new StateTransition<>(
+			StateTransitionAction.TAKE,
+			endState,
+			new FilterFunction<Event>() {
+				private static final long serialVersionUID = 2979804163709590673L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("end");
+				}
+		});
+
+		StateTransition<Event> start2Start = new StateTransition<>(
+			StateTransitionAction.IGNORE,
+			startState,
+			null);
+
+		startingState.addStateTransition(starting2Start);
+		startState.addStateTransition(start2End);
+		startState.addStateTransition(start2Start);
+
+		nfa.addState(startingState);
+		nfa.addState(startState);
+		nfa.addState(endState);
+
+		Set<Map<String, Event>> expectedPatterns = new HashSet<>();
+
+		Map<String, Event> secondPattern = new HashMap<>();
+		secondPattern.put("start", new Event(3, "start", 3.0));
+		secondPattern.put("end", new Event(4, "end", 4.0));
+
+		expectedPatterns.add(secondPattern);
+
+		Collection<Map<String, Event>> actualPatterns = runNFA(nfa, streamEvents);
+
+		assertEquals(expectedPatterns, actualPatterns);
+	}
+
+	@Test
+	public void testStateNameGeneration() {
+		String expectedName1 = "a[2]";
+		String expectedName2 = "a_3";
+		String expectedName3 = "a[][42]";
+
+		String generatedName1 = NFA.generateStateName("a[]", 2);
+		String generatedName2 = NFA.generateStateName("a", 3);
+		String generatedName3 = NFA.generateStateName("a[][]", 42);
+
+
+		assertEquals(expectedName1, generatedName1);
+		assertEquals(expectedName2, generatedName2);
+		assertEquals(expectedName3, generatedName3);
+	}
+
+	public <T> Collection<Map<String, T>> runNFA(NFA<T> nfa, List<StreamEvent<T>> inputs) {
+		Set<Map<String, T>> actualPatterns = new HashSet<>();
+
+		for (StreamEvent<T> streamEvent: inputs) {
+			Collection<Map<String, T>> matchedPatterns = nfa.process(streamEvent.getEvent(), streamEvent.getTimestamp());
+
+			actualPatterns.addAll(matchedPatterns);
+		}
+
+		return actualPatterns;
+	}
+
+	@Test
+	public void testNFASerialization() throws IOException, ClassNotFoundException {
+		NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 0);
+
+		State<Event> startingState = new State<>("", State.StateType.Start);
+		State<Event> startState = new State<>("start", State.StateType.Normal);
+		State<Event> endState = new State<>("end", State.StateType.Final);
+
+		StateTransition<Event> starting2Start = new StateTransition<>(
+			StateTransitionAction.TAKE,
+			startState,
+			new NameFilter("start"));
+
+		StateTransition<Event> start2End = new StateTransition<>(
+			StateTransitionAction.TAKE,
+			endState,
+			new NameFilter("end"));
+
+		StateTransition<Event> start2Start = new StateTransition<>(
+			StateTransitionAction.IGNORE,
+			startState,
+			null);
+
+		startingState.addStateTransition(starting2Start);
+		startState.addStateTransition(start2End);
+		startState.addStateTransition(start2Start);
+
+		nfa.addState(startingState);
+		nfa.addState(startState);
+		nfa.addState(endState);
+
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		ObjectOutputStream oos = new ObjectOutputStream(baos);
+
+		oos.writeObject(nfa);
+
+		ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+		ObjectInputStream ois = new ObjectInputStream(bais);
+
+		@SuppressWarnings("unchecked")
+		NFA<Event> copy = (NFA<Event>) ois.readObject();
+
+		assertEquals(nfa, copy);
+	}
+
+	private static class NameFilter implements FilterFunction<Event> {
+
+		private static final long serialVersionUID = 7472112494752423802L;
+
+		private final String name;
+
+		public NameFilter(final String name) {
+			this.name = name;
+		}
+
+		@Override
+		public boolean filter(Event value) throws Exception {
+			return value.getName().equals(name);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
new file mode 100644
index 0000000..25618d5
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import com.google.common.collect.LinkedHashMultimap;
+import org.apache.flink.cep.Event;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SharedBufferTest extends TestLogger {
+
+	@Test
+	public void testSharedBuffer() {
+		SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>(Event.createTypeSerializer());
+		int numberEvents = 8;
+		Event[] events = new Event[numberEvents];
+		final long timestamp = 1L;
+
+		for (int i = 0; i < numberEvents; i++) {
+			events[i] = new Event(i + 1, "e" + (i + 1), i);
+		}
+
+		LinkedHashMultimap<String, Event> expectedPattern1 = LinkedHashMultimap.create();
+		expectedPattern1.put("a1", events[2]);
+		expectedPattern1.put("a[]", events[3]);
+		expectedPattern1.put("b", events[5]);
+
+		LinkedHashMultimap<String, Event> expectedPattern2 = LinkedHashMultimap.create();
+		expectedPattern2.put("a1", events[0]);
+		expectedPattern2.put("a[]", events[1]);
+		expectedPattern2.put("a[]", events[2]);
+		expectedPattern2.put("a[]", events[3]);
+		expectedPattern2.put("a[]", events[4]);
+		expectedPattern2.put("b", events[5]);
+
+		LinkedHashMultimap<String, Event> expectedPattern3 = LinkedHashMultimap.create();
+		expectedPattern3.put("a1", events[0]);
+		expectedPattern3.put("a[]", events[1]);
+		expectedPattern3.put("a[]", events[2]);
+		expectedPattern3.put("a[]", events[3]);
+		expectedPattern3.put("a[]", events[4]);
+		expectedPattern3.put("a[]", events[5]);
+		expectedPattern3.put("a[]", events[6]);
+		expectedPattern3.put("b", events[7]);
+
+		sharedBuffer.put("a1", events[0], timestamp, null, null, 0, DeweyNumber.fromString("1"));
+		sharedBuffer.put("a[]", events[1], timestamp, "a1", events[0], timestamp, DeweyNumber.fromString("1.0"));
+		sharedBuffer.put("a1", events[2], timestamp, null, null, 0, DeweyNumber.fromString("2"));
+		sharedBuffer.put("a[]", events[2], timestamp, "a[]", events[1], timestamp, DeweyNumber.fromString("1.0"));
+		sharedBuffer.put("a[]", events[3], timestamp, "a[]", events[2], timestamp, DeweyNumber.fromString("1.0"));
+		sharedBuffer.put("a[]", events[3], timestamp, "a1", events[2], timestamp, DeweyNumber.fromString("2.0"));
+		sharedBuffer.put("a[]", events[4], timestamp, "a[]", events[3], timestamp, DeweyNumber.fromString("1.0"));
+		sharedBuffer.put("a[]", events[5], timestamp, "a[]", events[4], timestamp, DeweyNumber.fromString("1.1"));
+		sharedBuffer.put("b", events[5], timestamp, "a[]", events[3], timestamp, DeweyNumber.fromString("2.0.0"));
+		sharedBuffer.put("b", events[5], timestamp, "a[]", events[4], timestamp, DeweyNumber.fromString("1.0.0"));
+		sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, DeweyNumber.fromString("1.1"));
+		sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, DeweyNumber.fromString("1.1.0"));
+
+		Collection<LinkedHashMultimap<String, Event>> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0"));
+		sharedBuffer.remove("b", events[7], timestamp);
+		Collection<LinkedHashMultimap<String, Event>> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0"));
+		Collection<LinkedHashMultimap<String, Event>> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("2.0.0"));
+		Collection<LinkedHashMultimap<String, Event>> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("1.0.0"));
+		sharedBuffer.remove("b", events[5], timestamp);
+
+		assertTrue(sharedBuffer.isEmpty());
+		assertTrue(patterns4.isEmpty());
+		assertEquals(Collections.singletonList(expectedPattern1), patterns1);
+		assertEquals(Collections.singletonList(expectedPattern2), patterns2);
+		assertEquals(Collections.singletonList(expectedPattern3), patterns3);
+	}
+
+	@Test
+	public void testSharedBufferSerialization() throws IOException, ClassNotFoundException {
+		SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>(Event.createTypeSerializer());
+		int numberEvents = 8;
+		Event[] events = new Event[numberEvents];
+		final long timestamp = 1L;
+
+		for (int i = 0; i < numberEvents; i++) {
+			events[i] = new Event(i + 1, "e" + (i + 1), i);
+		}
+
+		sharedBuffer.put("a1", events[0], timestamp, null, null, 0, DeweyNumber.fromString("1"));
+		sharedBuffer.put("a[]", events[1], timestamp, "a1", events[0], timestamp, DeweyNumber.fromString("1.0"));
+		sharedBuffer.put("a1", events[2], timestamp, null, null, 0, DeweyNumber.fromString("2"));
+		sharedBuffer.put("a[]", events[2], timestamp, "a[]", events[1], timestamp, DeweyNumber.fromString("1.0"));
+		sharedBuffer.put("a[]", events[3], timestamp, "a[]", events[2], timestamp, DeweyNumber.fromString("1.0"));
+		sharedBuffer.put("a[]", events[3], timestamp, "a1", events[2], timestamp, DeweyNumber.fromString("2.0"));
+		sharedBuffer.put("a[]", events[4], timestamp, "a[]", events[3], timestamp, DeweyNumber.fromString("1.0"));
+		sharedBuffer.put("a[]", events[5], timestamp, "a[]", events[4], timestamp, DeweyNumber.fromString("1.1"));
+		sharedBuffer.put("b", events[5], timestamp, "a[]", events[3], timestamp, DeweyNumber.fromString("2.0.0"));
+		sharedBuffer.put("b", events[5], timestamp, "a[]", events[4], timestamp, DeweyNumber.fromString("1.0.0"));
+		sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, DeweyNumber.fromString("1.1"));
+		sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, DeweyNumber.fromString("1.1.0"));
+
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		ObjectOutputStream oos = new ObjectOutputStream(baos);
+
+		oos.writeObject(sharedBuffer);
+
+		ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+		ObjectInputStream ois = new ObjectInputStream(bais);
+
+		SharedBuffer<String, Event> copy = (SharedBuffer<String, Event>) ois.readObject();
+
+		assertEquals(sharedBuffer, copy);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
new file mode 100644
index 0000000..eb3ead1
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.compiler;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.State;
+import org.apache.flink.cep.nfa.StateTransition;
+import org.apache.flink.cep.nfa.StateTransitionAction;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class NFACompilerTest extends TestLogger {
+
+	/**
+	 * Tests that the NFACompiler generates the correct NFA from a given Pattern.
+	 */
+	@Test
+	public void testNFACompilerWithSimplePattern() {
+		Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 3314714776170474221L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getPrice() > 2;
+			}
+		})
+		.followedBy("middle").subtype(SubEvent.class)
+		.next("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 3990995859716364087L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("end");
+			}
+		});
+
+		TypeInformation<Event> typeInformation = (TypeInformation<Event>) TypeExtractor.createTypeInfo(Event.class);
+
+		NFA<Event> nfa = NFACompiler.<Event>compile(pattern, typeInformation.createSerializer(new ExecutionConfig()));
+
+		Set<State<Event>> states = nfa.getStates();
+
+		assertEquals(4, states.size());
+
+		Map<String, State<Event>> stateMap = new HashMap<>();
+
+		for (State<Event> state: states) {
+			stateMap.put(state.getName(), state);
+		}
+
+		assertTrue(stateMap.containsKey(NFACompiler.BEGINNING_STATE_NAME));
+		State<Event> beginningState = stateMap.get(NFACompiler.BEGINNING_STATE_NAME);
+
+		assertTrue(beginningState.isStart());
+
+		assertTrue(stateMap.containsKey("start"));
+		State<Event> startState = stateMap.get("start");
+
+		Collection<StateTransition<Event>> startTransitions = startState.getStateTransitions();
+		Map<String, StateTransition<Event>> startTransitionMap = new HashMap<>();
+
+		for (StateTransition<Event> transition: startTransitions) {
+			startTransitionMap.put(transition.getTargetState().getName(), transition);
+		}
+
+		assertEquals(2, startTransitionMap.size());
+		assertTrue(startTransitionMap.containsKey("start"));
+
+		StateTransition<Event> reflexiveTransition = startTransitionMap.get("start");
+		assertEquals(StateTransitionAction.IGNORE, reflexiveTransition.getAction());
+
+		assertTrue(startTransitionMap.containsKey("middle"));
+		StateTransition<Event> startMiddleTransition = startTransitionMap.get("middle");
+		assertEquals(StateTransitionAction.TAKE, startMiddleTransition.getAction());
+
+		assertTrue(stateMap.containsKey("middle"));
+		State<Event> middleState = stateMap.get("middle");
+
+		Map<String, StateTransition<Event>> middleTransitionMap = new HashMap<>();
+
+		for (StateTransition<Event> transition: middleState.getStateTransitions()) {
+			middleTransitionMap.put(transition.getTargetState().getName(), transition);
+		}
+
+		assertEquals(1, middleTransitionMap.size());
+
+		assertTrue(middleTransitionMap.containsKey("end"));
+		StateTransition<Event> middleEndTransition = middleTransitionMap.get("end");
+
+		assertEquals(StateTransitionAction.TAKE, middleEndTransition.getAction());
+
+		assertTrue(stateMap.containsKey("end"));
+		State<Event> endState = stateMap.get("end");
+
+		assertTrue(endState.isFinal());
+		assertEquals(0, endState.getStateTransitions().size());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
new file mode 100644
index 0000000..2edf005
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class PatternTest extends TestLogger {
+	/**
+	 * These tests simply verify that pattern construction completes without failure
+	 */
+
+	@Test
+	public void testStrictContiguity() {
+		Pattern<Object, ?> pattern = Pattern.begin("start").next("next").next("end");
+		Pattern<Object, ?> previous;
+		Pattern<Object, ?> previous2;
+
+		assertNotNull(previous = pattern.getPrevious());
+		assertNotNull(previous2 = previous.getPrevious());
+		assertNull(previous2.getPrevious());
+
+		assertEquals(pattern.getName(), "end");
+		assertEquals(previous.getName(), "next");
+		assertEquals(previous2.getName(), "start");
+	}
+
+	@Test
+	public void testNonStrictContiguity() {
+		Pattern<Object, ?> pattern = Pattern.begin("start").followedBy("next").followedBy("end");
+		Pattern<Object, ?> previous;
+		Pattern<Object, ?> previous2;
+
+		assertNotNull(previous = pattern.getPrevious());
+		assertNotNull(previous2 = previous.getPrevious());
+		assertNull(previous2.getPrevious());
+
+		assertTrue(pattern instanceof FollowedByPattern);
+		assertTrue(previous instanceof FollowedByPattern);
+
+		assertEquals(pattern.getName(), "end");
+		assertEquals(previous.getName(), "next");
+		assertEquals(previous2.getName(), "start");
+	}
+
+	@Test
+	public void testStrictContiguityWithCondition() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").next("next").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = -7657256242101104925L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("foobar");
+			}
+		}).next("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = -7597452389191504189L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getId() == 42;
+			}
+		});
+
+		Pattern<Event, ?> previous;
+		Pattern<Event, ?> previous2;
+
+		assertNotNull(previous = pattern.getPrevious());
+		assertNotNull(previous2 = previous.getPrevious());
+		assertNull(previous2.getPrevious());
+
+		assertNotNull(pattern.getFilterFunction());
+		assertNotNull(previous.getFilterFunction());
+		assertNull(previous2.getFilterFunction());
+
+		assertEquals(pattern.getName(), "end");
+		assertEquals(previous.getName(), "next");
+		assertEquals(previous2.getName(), "start");
+	}
+
+	@Test
+	public void testPatternWithSubtyping() {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").next("subevent").subtype(SubEvent.class).followedBy("end");
+
+		Pattern<Event, ?> previous;
+		Pattern<Event, ?> previous2;
+
+		assertNotNull(previous = pattern.getPrevious());
+		assertNotNull(previous2 = previous.getPrevious());
+		assertNull(previous2.getPrevious());
+
+		assertNotNull(previous.getFilterFunction());
+		assertTrue(previous.getFilterFunction() instanceof SubtypeFilterFunction);
+
+		assertEquals(pattern.getName(), "end");
+		assertEquals(previous.getName(), "subevent");
+		assertEquals(previous2.getName(), "start");
+	}
+
+	@Test
+	public void testPatternWithSubtypingAndFilter() {
+		Pattern<Event, Event> pattern = Pattern.<Event>begin("start").next("subevent").subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
+			private static final long serialVersionUID = -4118591291880230304L;
+
+			@Override
+			public boolean filter(SubEvent value) throws Exception {
+				return false;
+			}
+		}).followedBy("end");
+
+		Pattern<Event, ?> previous;
+		Pattern<Event, ?> previous2;
+
+		assertNotNull(previous = pattern.getPrevious());
+		assertNotNull(previous2 = previous.getPrevious());
+		assertNull(previous2.getPrevious());
+
+		assertTrue(pattern instanceof FollowedByPattern);
+		assertNotNull(previous.getFilterFunction());
+
+		assertEquals(pattern.getName(), "end");
+		assertEquals(previous.getName(), "subevent");
+		assertEquals(previous2.getName(), "start");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/pom.xml b/flink-libraries/pom.xml
index e0fbd49..34153c9 100644
--- a/flink-libraries/pom.xml
+++ b/flink-libraries/pom.xml
@@ -39,5 +39,6 @@ under the License.
 		<module>flink-python</module>
 		<module>flink-table</module>
 		<module>flink-ml</module>
+		<module>flink-cep</module>
 	</modules>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 1ef3298..96ddda1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -105,7 +105,7 @@ public interface StreamOperator<OUT> extends Serializable {
 	/**
 	 * Restores the operator state, if this operator's execution is recovering from a checkpoint.
 	 * This method restores the operator state (if the operator is stateful) and the key/value state
-	 * (if it had been used and was initialized when the snapshot ocurred).
+	 * (if it had been used and was initialized when the snapshot occurred).
 	 *
 	 * <p>This method is called after {@link #setup(StreamTask, StreamConfig, Output)}
 	 * and before {@link #open()}.

