flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [hotfix] [cep] Fix serialization problem and single state NFAs
Date Thu, 25 Feb 2016 16:33:34 GMT
Repository: flink
Updated Branches:
  refs/heads/master 56e660d35 -> a5ecb1886


[hotfix] [cep] Fix serialization problem and single state NFAs

The start computation states have a null event associated with them. When serializing these
states, one has to check whether the event is null, because not all serializers can handle
null values.

A single-state NFA failed to compute a matching pattern because it was regarded as a
terminal state in the pattern extraction algorithm. This problem is fixed by letting the
first states start with a two-level Dewey number instead of a one-level one.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a5ecb188
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a5ecb188
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a5ecb188

Branch: refs/heads/master
Commit: a5ecb1886e79c9f7215a40b7da71804c07f5da73
Parents: 56e660d
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Feb 25 17:26:46 2016 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Feb 25 17:33:02 2016 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 38 ++++++++++++++------
 .../java/org/apache/flink/cep/CEPITCase.java    | 32 +++++++++++++++++
 .../src/test/resources/log4j-test.properties    | 27 ++++++++++++++
 .../src/test/resources/logback-test.xml         | 34 ++++++++++++++++++
 4 files changed, 120 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a5ecb188/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index a68d6eb..1b3024c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -227,25 +227,28 @@ public class NFA<T> implements Serializable {
 								break;
 							case TAKE:
 								final State<T> newState = stateTransition.getTargetState();
-								final DeweyNumber newVersion;
+								final DeweyNumber oldVersion;
+								final DeweyNumber newComputationStateVersion;
 								final State<T> previousState = computationState.getState();
 								final T previousEvent = computationState.getEvent();
 								final long previousTimestamp;
 								final long startTimestamp;
 
 								if (computationState.isStartState()) {
-									newVersion = new DeweyNumber(startEventCounter++);
+									oldVersion = new DeweyNumber(startEventCounter++);
+									newComputationStateVersion = oldVersion.addStage();
 									startTimestamp = timestamp;
 									previousTimestamp = -1L;
 
 								} else {
 									startTimestamp = computationState.getStartTimestamp();
 									previousTimestamp = computationState.getTimestamp();
+									oldVersion = computationState.getVersion();
 
 									if (newState.equals(computationState.getState())) {
-										newVersion = computationState.getVersion().increase();
+										newComputationStateVersion = oldVersion.increase();
 									} else {
-										newVersion = computationState.getVersion().addStage();
+										newComputationStateVersion = oldVersion.addStage();
 									}
 								}
 
@@ -256,7 +259,7 @@ public class NFA<T> implements Serializable {
 									previousState,
 									previousEvent,
 									previousTimestamp,
-									newVersion);
+									oldVersion);
 
 								// a new computation state is referring to the shared entry
 								sharedBuffer.lock(newState, event, timestamp);
@@ -265,7 +268,7 @@ public class NFA<T> implements Serializable {
 									newState,
 									event,
 									timestamp,
-									newVersion,
+									newComputationStateVersion,
 									startTimestamp));
 								break;
 						}
@@ -366,9 +369,15 @@ public class NFA<T> implements Serializable {
 		oos.writeObject(computationState.getVersion());
 		oos.writeLong(computationState.getStartTimestamp());
 
-		DataOutputViewStreamWrapper output = new DataOutputViewStreamWrapper(oos);
-
-		nonDuplicatingTypeSerializer.serialize(computationState.getEvent(), output);
+		if (computationState.getEvent() == null) {
+			// write that we don't have an event associated
+			oos.writeBoolean(false);
+		} else {
+			// write that we have an event associated
+			oos.writeBoolean(true);
+			DataOutputViewStreamWrapper output = new DataOutputViewStreamWrapper(oos);
+			nonDuplicatingTypeSerializer.serialize(computationState.getEvent(), output);
+		}
 	}
 
 	@SuppressWarnings("unchecked")
@@ -378,8 +387,15 @@ public class NFA<T> implements Serializable {
 		final DeweyNumber version = (DeweyNumber)ois.readObject();
 		final long startTimestamp = ois.readLong();
 
-		DataInputViewStreamWrapper input = new DataInputViewStreamWrapper(ois);
-		final T event = nonDuplicatingTypeSerializer.deserialize(input);
+		final boolean hasEvent = ois.readBoolean();
+		final T event;
+
+		if (hasEvent) {
+			DataInputViewStreamWrapper input = new DataInputViewStreamWrapper(ois);
+			event = nonDuplicatingTypeSerializer.deserialize(input);
+		} else {
+			event = null;
+		}
 
 		return new ComputationState<>(state, event, timestamp, version, startTimestamp);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a5ecb188/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 37fa9d9..4129e34 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -368,4 +368,36 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 
 		env.execute();
 	}
+
+	@Test
+	public void testSimplePatternWithSingleState() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStream<Tuple2<Integer, Integer>> input = env.fromElements(
+			new Tuple2<>(0, 1),
+			new Tuple2<>(0, 2));
+
+		Pattern<Tuple2<Integer, Integer>, ?> pattern =
+			Pattern.<Tuple2<Integer, Integer>>begin("start")
+				.where(new FilterFunction<Tuple2<Integer, Integer>>() {
+					@Override
+					public boolean filter(Tuple2<Integer, Integer> rec) throws Exception {
+						return rec.f1 == 1;
+					}
+				});
+
+		PatternStream<Tuple2<Integer, Integer>> pStream = CEP.pattern(input, pattern);
+
+		DataStream<Tuple2<Integer, Integer>> result = pStream.select(new PatternSelectFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
+			@Override
+			public Tuple2<Integer, Integer> select(Map<String, Tuple2<Integer, Integer>> pattern) throws Exception {
+				return pattern.get("start");
+			}
+		});
+
+		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		expected = "(0,1)";
+
+		env.execute();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a5ecb188/flink-libraries/flink-cep/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/log4j-test.properties b/flink-libraries/flink-cep/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2226f68
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/a5ecb188/flink-libraries/flink-cep/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/logback-test.xml b/flink-libraries/flink-cep/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..1c4ea08
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    
+    <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
+    <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
+    <logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
+    <logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
+</configuration>
\ No newline at end of file


Mime
View raw message