Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AA290200C40 for ; Thu, 23 Mar 2017 13:47:28 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id A7F68160B95; Thu, 23 Mar 2017 12:47:28 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 953E9160B75 for ; Thu, 23 Mar 2017 13:47:26 +0100 (CET) Received: (qmail 74759 invoked by uid 500); 23 Mar 2017 12:47:25 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 74726 invoked by uid 99); 23 Mar 2017 12:47:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Mar 2017 12:47:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B032FDFE34; Thu, 23 Mar 2017 12:47:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kkloudas@apache.org To: commits@flink.apache.org Date: Thu, 23 Mar 2017 12:47:24 -0000 Message-Id: <54d1e69d4962458fb288b95594e4d00f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] flink git commit: [FLINK-3318] Backward compatibility of CEP NFA archived-at: Thu, 23 Mar 2017 12:47:28 -0000 Repository: flink Updated Branches: refs/heads/master d0695c054 -> d20fb090c [FLINK-3318] Backward compatibility of CEP NFA Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d20fb090 Tree: 
http://git-wip-us.apache.org/repos/asf/flink/tree/d20fb090 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d20fb090 Branch: refs/heads/master Commit: d20fb090c31858bc0372a8c84228d796558d56b0 Parents: 9001c4e Author: Dawid Wysakowicz Authored: Tue Mar 21 10:53:07 2017 +0100 Committer: kl0u Committed: Thu Mar 23 10:47:55 2017 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/flink/cep/nfa/NFA.java | 162 ++++++- .../org/apache/flink/cep/nfa/SharedBuffer.java | 49 ++ .../java/org/apache/flink/cep/nfa/State.java | 19 +- .../flink/cep/nfa/compiler/NFACompiler.java | 116 +++++ .../org/apache/flink/cep/nfa/NFAITCase.java | 194 +++++++- .../cep/operator/CEPMigration12to13Test.java | 477 +++++++++++++++++++ .../test/resources/cep-branching-snapshot-1.2 | Bin 0 -> 6736 bytes .../resources/cep-single-pattern-snapshot-1.2 | Bin 0 -> 3311 bytes .../test/resources/cep-starting-snapshot-1.2 | Bin 0 -> 6526 bytes 9 files changed, 986 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index 3d42248..ab03566 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -18,6 +18,8 @@ package org.apache.flink.cep.nfa; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterators; import com.google.common.collect.LinkedHashMultimap; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -25,18 +27,22 @@ import 
org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream; import org.apache.flink.cep.NonDuplicatingTypeSerializer; +import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.streaming.api.windowing.time.Time; +import javax.annotation.Nullable; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.io.OptionalDataException; import java.io.Serializable; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -85,9 +91,9 @@ public class NFA implements Serializable { private final NonDuplicatingTypeSerializer nonDuplicatingTypeSerializer; /** - * Buffer used to store the matched events. + * Used only for backward compatibility. Buffer used to store the matched events. */ - private final SharedBuffer sharedBuffer; + private final SharedBuffer, T> sharedBuffer = null; /** * A set of all the valid NFA states, as returned by the @@ -110,12 +116,22 @@ public class NFA implements Serializable { private final boolean handleTimeout; /** + * Used only for backward compatibility. + */ + private int startEventCounter; + + /** * Current set of {@link ComputationState computation states} within the state machine. * These are the "active" intermediate states that are waiting for new matching * events to transition to new valid states. */ private transient Queue> computationStates; + /** + * Buffer used to store the matched events. 
+ */ + private final SharedBuffer stringSharedBuffer; + public NFA( final TypeSerializer eventSerializer, final long windowTime, @@ -124,7 +140,7 @@ public class NFA implements Serializable { this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer); this.windowTime = windowTime; this.handleTimeout = handleTimeout; - sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer); + stringSharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer); computationStates = new LinkedList<>(); states = new HashSet<>(); @@ -156,7 +172,7 @@ public class NFA implements Serializable { * {@code false} otherwise. */ public boolean isEmpty() { - return sharedBuffer.isEmpty(); + return stringSharedBuffer.isEmpty(); } /** @@ -194,9 +210,14 @@ public class NFA implements Serializable { } } - // remove computation state which has exceeded the window length - sharedBuffer.release(computationState.getState().getName(), computationState.getEvent(), computationState.getTimestamp()); - sharedBuffer.remove(computationState.getState().getName(), computationState.getEvent(), computationState.getTimestamp()); + stringSharedBuffer.release( + computationState.getPreviousState().getName(), + computationState.getEvent(), + computationState.getTimestamp()); + stringSharedBuffer.remove( + computationState.getPreviousState().getName(), + computationState.getEvent(), + computationState.getTimestamp()); newComputationStates = Collections.emptyList(); } else if (event != null) { @@ -212,8 +233,8 @@ public class NFA implements Serializable { result.addAll(matches); // remove found patterns because they are no longer needed - sharedBuffer.release(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp()); - sharedBuffer.remove(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp()); + 
stringSharedBuffer.release(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp()); + stringSharedBuffer.remove(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp()); } else { // add new computation state; it will be processed once the next event arrives computationStates.add(newComputationState); @@ -230,7 +251,7 @@ public class NFA implements Serializable { // remove all elements which are expired // with respect to the window length - sharedBuffer.prune(pruningTimestamp); + stringSharedBuffer.prune(pruningTimestamp); } } @@ -244,7 +265,7 @@ public class NFA implements Serializable { NFA other = (NFA) obj; return nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) && - sharedBuffer.equals(other.sharedBuffer) && + stringSharedBuffer.equals(other.stringSharedBuffer) && states.equals(other.states) && windowTime == other.windowTime; } else { @@ -254,7 +275,7 @@ public class NFA implements Serializable { @Override public int hashCode() { - return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, states, windowTime); + return Objects.hash(nonDuplicatingTypeSerializer, stringSharedBuffer, states, windowTime); } private static boolean isEquivalentState(final State s1, final State s2) { @@ -376,7 +397,7 @@ public class NFA implements Serializable { computationState.getStartTimestamp() ) ); - sharedBuffer.lock( + stringSharedBuffer.lock( computationState.getPreviousState().getName(), computationState.getEvent(), computationState.getTimestamp()); @@ -397,14 +418,14 @@ public class NFA implements Serializable { final long startTimestamp; if (computationState.isStartState()) { startTimestamp = timestamp; - sharedBuffer.put( + stringSharedBuffer.put( consumingState.getName(), event, timestamp, currentVersion); } else { startTimestamp = computationState.getStartTimestamp(); - sharedBuffer.put( + stringSharedBuffer.put( 
consumingState.getName(), event, timestamp, @@ -415,7 +436,7 @@ public class NFA implements Serializable { } // a new computation state is referring to the shared entry - sharedBuffer.lock(consumingState.getName(), event, timestamp); + stringSharedBuffer.lock(consumingState.getName(), event, timestamp); resultingComputationStates.add(ComputationState.createState( newState, @@ -429,7 +450,7 @@ public class NFA implements Serializable { //check if newly created state is optional (have a PROCEED path to Final state) final State finalState = findFinalStateAfterProceed(newState, event); if (finalState != null) { - sharedBuffer.lock(consumingState.getName(), event, timestamp); + stringSharedBuffer.lock(consumingState.getName(), event, timestamp); resultingComputationStates.add(ComputationState.createState( finalState, consumingState, @@ -450,12 +471,12 @@ public class NFA implements Serializable { if (computationState.getEvent() != null) { // release the shared entry referenced by the current computation state. 
- sharedBuffer.release( + stringSharedBuffer.release( computationState.getPreviousState().getName(), computationState.getEvent(), computationState.getTimestamp()); // try to remove unnecessary shared buffer entries - sharedBuffer.remove( + stringSharedBuffer.remove( computationState.getPreviousState().getName(), computationState.getEvent(), computationState.getTimestamp()); @@ -546,7 +567,7 @@ public class NFA implements Serializable { * @return Collection of event sequences which end in the given computation state */ private Collection> extractPatternMatches(final ComputationState computationState) { - Collection> paths = sharedBuffer.extractPatterns( + Collection> paths = stringSharedBuffer.extractPatterns( computationState.getPreviousState().getName(), computationState.getEvent(), computationState.getTimestamp(), @@ -592,6 +613,8 @@ public class NFA implements Serializable { nonDuplicatingTypeSerializer.clearReferences(); } + private final static String BEGINNING_STATE_NAME = "$beginningState$"; + private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { ois.defaultReadObject(); @@ -599,15 +622,103 @@ public class NFA implements Serializable { computationStates = new LinkedList<>(); + final List> readComputationStates = new ArrayList<>(numberComputationStates); + + boolean afterMigration = false; for (int i = 0; i < numberComputationStates; i++) { ComputationState computationState = readComputationState(ois); + if (computationState.getState().getName().equals(BEGINNING_STATE_NAME)) { + afterMigration = true; + } - computationStates.offer(computationState); + readComputationStates.add(computationState); + } + + if (afterMigration && !readComputationStates.isEmpty()) { + try { + //Backwards compatibility + this.computationStates.addAll(migrateNFA(readComputationStates)); + final Field newSharedBufferField = NFA.class.getDeclaredField("stringSharedBuffer"); + final Field sharedBufferField = 
NFA.class.getDeclaredField("sharedBuffer"); + sharedBufferField.setAccessible(true); + newSharedBufferField.setAccessible(true); + newSharedBufferField.set(this, SharedBuffer.migrateSharedBuffer(this.sharedBuffer)); + sharedBufferField.set(this, null); + sharedBufferField.setAccessible(false); + newSharedBufferField.setAccessible(false); + } catch (Exception e) { + throw new IllegalStateException("Could not migrate from earlier version", e); + } + } else { + this.computationStates.addAll(readComputationStates); } nonDuplicatingTypeSerializer.clearReferences(); } + /** + * Needed for backward compatibility. First migrates the {@link State} graph, see {@link NFACompiler#migrateGraph(State)}. + * Then recreates the {@link ComputationState}s with the new {@link State} graph. + * @param readStates computation states read from snapshot + * @return collection of migrated computation states + */ + private Collection> migrateNFA(Collection> readStates) { + final ArrayList> computationStates = new ArrayList<>(); + + final State startState = Iterators.find( + readStates.iterator(), + new Predicate>() { + @Override + public boolean apply(@Nullable ComputationState input) { + return input != null && input.getState().getName().equals(BEGINNING_STATE_NAME); + } + }).getState(); + + final Map> convertedStates = NFACompiler.migrateGraph(startState); + + for (ComputationState readState : readStates) { + if (!readState.isStartState()) { + final String previousName = readState.getState().getName(); + final String currentName = Iterators.find( + readState.getState().getStateTransitions().iterator(), + new Predicate>() { + @Override + public boolean apply(@Nullable StateTransition input) { + return input != null && input.getAction() == StateTransitionAction.TAKE; + } + }).getTargetState().getName(); + + + final State previousState = convertedStates.get(previousName); + + computationStates.add(ComputationState.createState( + convertedStates.get(currentName), + previousState, 
readState.getEvent(), + readState.getTimestamp(), + readState.getVersion(), + readState.getStartTimestamp() + )); + } + } + + final String startName = Iterators.find(convertedStates.values().iterator(), new Predicate>() { + @Override + public boolean apply(@Nullable State input) { + return input != null && input.isStart(); + } + }).getName(); + + computationStates.add(ComputationState.createStartState( + convertedStates.get(startName), + new DeweyNumber(this.startEventCounter))); + + this.states.clear(); + this.states.addAll(convertedStates.values()); + + return computationStates; + } + private void writeComputationState(final ComputationState computationState, final ObjectOutputStream oos) throws IOException { oos.writeObject(computationState.getState()); oos.writeObject(computationState.getPreviousState()); @@ -629,7 +740,13 @@ public class NFA implements Serializable { @SuppressWarnings("unchecked") private ComputationState readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException { final State state = (State)ois.readObject(); - final State previousState = (State)ois.readObject(); + State previousState; + try { + previousState = (State)ois.readObject(); + } catch (OptionalDataException e) { + previousState = null; + } + final long timestamp = ois.readLong(); final DeweyNumber version = (DeweyNumber)ois.readObject(); final long startTimestamp = ois.readLong(); @@ -647,6 +764,7 @@ public class NFA implements Serializable { return ComputationState.createState(state, previousState, event, timestamp, version, startTimestamp); } + /** * Generates a state name from a given name template and an index. *

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java index e6a8c75..d5b7876 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java @@ -20,6 +20,7 @@ package org.apache.flink.cep.nfa; import com.google.common.collect.LinkedHashMultimap; import org.apache.commons.lang3.StringUtils; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; @@ -463,6 +464,54 @@ public class SharedBuffer implements Serializable { } } + private SharedBuffer( + TypeSerializer valueSerializer, + Map> pages) { + this.valueSerializer = valueSerializer; + this.pages = pages; + } + + /** + * For backward compatibility only. Previously the key in {@link SharedBuffer} was {@link State}. + * Now it is {@link String}. 
+ */ + @Internal + static SharedBuffer migrateSharedBuffer(SharedBuffer, T> buffer) { + + final Map> pageMap = new HashMap<>(); + final Map, T>, SharedBufferEntry> entries = new HashMap<>(); + + for (Map.Entry, SharedBufferPage, T>> page : buffer.pages.entrySet()) { + final SharedBufferPage newPage = new SharedBufferPage<>(page.getKey().getName()); + pageMap.put(newPage.getKey(), newPage); + + for (Map.Entry, SharedBufferEntry, T>> pageEntry : page.getValue().entries.entrySet()) { + final SharedBufferEntry newSharedBufferEntry = new SharedBufferEntry<>( + pageEntry.getKey(), + newPage); + newSharedBufferEntry.referenceCounter = pageEntry.getValue().referenceCounter; + entries.put(pageEntry.getValue(), newSharedBufferEntry); + newPage.entries.put(pageEntry.getKey(), newSharedBufferEntry); + } + } + + for (Map.Entry, SharedBufferPage, T>> page : buffer.pages.entrySet()) { + for (Map.Entry, SharedBufferEntry, T>> pageEntry : page.getValue().entries.entrySet()) { + final SharedBufferEntry newEntry = entries.get(pageEntry.getValue()); + for (SharedBufferEdge, T> edge : pageEntry.getValue().edges) { + final SharedBufferEntry targetNewEntry = entries.get(edge.getTarget()); + + final SharedBufferEdge newEdge = new SharedBufferEdge<>( + targetNewEntry, + edge.getVersion()); + newEntry.edges.add(newEdge); + } + } + } + + return new SharedBuffer<>(buffer.valueSerializer, pageMap); + } + private SharedBufferEntry get( final K key, final V value, http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java index 7bcb6ea..27e0dcd 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java +++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java @@ -20,9 +20,12 @@ package org.apache.flink.cep.nfa; import org.apache.flink.api.common.functions.FilterFunction; +import java.io.IOException; +import java.io.ObjectInputStream; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Objects; /** @@ -62,7 +65,6 @@ public class State implements Serializable { return stateTransitions; } - private void addStateTransition( final StateTransitionAction action, final State targetState, @@ -132,4 +134,19 @@ public class State implements Serializable { Final, // the state is a final state for the NFA Normal // the state is neither a start nor a final state } + + private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { + ois.defaultReadObject(); + + //Backward compatibility. Previous version of StateTransition did not have source state + if (!stateTransitions.isEmpty() && stateTransitions.iterator().next().getSourceState() == null) { + final List> tmp = new ArrayList<>(); + tmp.addAll(this.stateTransitions); + + this.stateTransitions.clear(); + for (StateTransition transition : tmp) { + addStateTransition(transition.getAction(), transition.getTargetState(), transition.getCondition()); + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java index b476c49..8bd8612 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java @@ -18,10 +18,15 
@@ package org.apache.flink.cep.nfa.compiler; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterators; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.State; +import org.apache.flink.cep.nfa.StateTransition; +import org.apache.flink.cep.nfa.StateTransitionAction; import org.apache.flink.cep.pattern.FilterFunctions; import org.apache.flink.cep.pattern.FollowedByPattern; import org.apache.flink.cep.pattern.MalformedPatternException; @@ -31,12 +36,15 @@ import org.apache.flink.cep.pattern.Quantifier; import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty; import org.apache.flink.streaming.api.windowing.time.Time; +import javax.annotation.Nullable; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -363,6 +371,114 @@ public class NFACompiler { } /** + * Used for migrating CEP graphs prior to 1.3. It removes the dummy start, adds the dummy end, and translates all + * states to consuming ones by moving all TAKEs and IGNOREs to the next state. This method assumes each state + * has at most one TAKE and one IGNORE and name of each state is unique. No PROCEED transition is allowed! 
+ * + * @param oldStartState dummy start state of old graph + * @param type of events + * @return map of new states, where key is the name of a state and value is the state itself + */ + @Internal + public static Map> migrateGraph(State oldStartState) { + State oldFirst = oldStartState; + State oldSecond = oldStartState.getStateTransitions().iterator().next().getTargetState(); + + StateTransition oldFirstToSecondTake = Iterators.find( + oldFirst.getStateTransitions().iterator(), + new Predicate>() { + @Override + public boolean apply(@Nullable StateTransition input) { + return input != null && input.getAction() == StateTransitionAction.TAKE; + } + + }); + + StateTransition oldFirstIgnore = Iterators.find( + oldFirst.getStateTransitions().iterator(), + new Predicate>() { + @Override + public boolean apply(@Nullable StateTransition input) { + return input != null && input.getAction() == StateTransitionAction.IGNORE; + } + + }, null); + + StateTransition oldSecondToThirdTake = Iterators.find( + oldSecond.getStateTransitions().iterator(), + new Predicate>() { + @Override + public boolean apply(@Nullable StateTransition input) { + return input != null && input.getAction() == StateTransitionAction.TAKE; + } + + }, null); + + final Map> convertedStates = new HashMap<>(); + State newSecond; + State newFirst = new State<>(oldSecond.getName(), State.StateType.Start); + convertedStates.put(newFirst.getName(), newFirst); + while (oldSecondToThirdTake != null) { + + newSecond = new State(oldSecondToThirdTake.getTargetState().getName(), State.StateType.Normal); + convertedStates.put(newSecond.getName(), newSecond); + newFirst.addTake(newSecond, oldFirstToSecondTake.getCondition()); + + if (oldFirstIgnore != null) { + newFirst.addIgnore(oldFirstIgnore.getCondition()); + } + + oldFirst = oldSecond; + + oldFirstToSecondTake = Iterators.find( + oldFirst.getStateTransitions().iterator(), + new Predicate>() { + @Override + public boolean apply(@Nullable StateTransition input) { + 
return input != null && input.getAction() == StateTransitionAction.TAKE; + } + + }); + + oldFirstIgnore = Iterators.find( + oldFirst.getStateTransitions().iterator(), + new Predicate>() { + @Override + public boolean apply(@Nullable StateTransition input) { + return input != null && input.getAction() == StateTransitionAction.IGNORE; + } + + }, null); + + oldSecond = oldSecondToThirdTake.getTargetState(); + + oldSecondToThirdTake = Iterators.find( + oldSecond.getStateTransitions().iterator(), + new Predicate>() { + @Override + public boolean apply(@Nullable StateTransition input) { + return input != null && input.getAction() == StateTransitionAction.TAKE; + } + + }, null); + + newFirst = newSecond; + } + + final State endingState = new State<>(ENDING_STATE_NAME, State.StateType.Final); + + newFirst.addTake(endingState, oldFirstToSecondTake.getCondition()); + + if (oldFirstIgnore != null) { + newFirst.addIgnore(oldFirstIgnore.getCondition()); + } + + convertedStates.put(endingState.getName(), endingState); + + return convertedStates; + } + + /** * Factory interface for {@link NFA}. 
* * @param Type of the input events which are processed by the NFA http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java index 825ba957..5b05f19 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java @@ -39,6 +39,7 @@ import java.util.Map; import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class NFAITCase extends TestLogger { @@ -421,7 +422,7 @@ public class NFAITCase extends TestLogger { } @Test - public void testComplexBranchingAfterKleeneStar() { + public void testComplexBranchingAfterZeroOrMore() { List> inputEvents = new ArrayList<>(); Event startEvent = new Event(40, "c", 1.0); @@ -519,7 +520,7 @@ public class NFAITCase extends TestLogger { } @Test - public void testKleeneStar() { + public void testZeroOrMore() { List> inputEvents = new ArrayList<>(); Event startEvent = new Event(40, "c", 1.0); @@ -581,7 +582,7 @@ public class NFAITCase extends TestLogger { } @Test - public void testEagerKleeneStar() { + public void testEagerZeroOrMore() { List> inputEvents = new ArrayList<>(); Event startEvent = new Event(40, "c", 1.0); @@ -646,7 +647,7 @@ public class NFAITCase extends TestLogger { @Test - public void testBeginWithKleeneStar() { + public void testBeginWithZeroOrMore() { List> inputEvents = new ArrayList<>(); Event middleEvent1 = new Event(40, "a", 2.0); @@ -704,7 +705,7 @@ public class NFAITCase extends TestLogger { } @Test - public void testKleeneStarAfterKleeneStar() { + public void testZeroOrMoreAfterZeroOrMore() { List> inputEvents = new 
ArrayList<>(); Event startEvent = new Event(40, "c", 1.0); @@ -779,7 +780,7 @@ public class NFAITCase extends TestLogger { } @Test - public void testKleeneStarAfterBranching() { + public void testZeroOrMoreAfterBranching() { List> inputEvents = new ArrayList<>(); Event startEvent = new Event(40, "c", 1.0); @@ -865,7 +866,7 @@ public class NFAITCase extends TestLogger { } @Test - public void testStrictContinuityNoResultsAfterKleeneStar() { + public void testStrictContinuityNoResultsAfterZeroOrMore() { List> inputEvents = new ArrayList<>(); Event start = new Event(40, "d", 2.0); @@ -923,7 +924,7 @@ public class NFAITCase extends TestLogger { } @Test - public void testStrictContinuityResultsAfterKleeneStar() { + public void testStrictContinuityResultsAfterZeroOrMore() { List> inputEvents = new ArrayList<>(); Event start = new Event(40, "d", 2.0); @@ -1663,4 +1664,181 @@ public class NFAITCase extends TestLogger { ), resultingPatterns); } + /** + * Clearing SharedBuffer + */ + + @Test + public void testTimesClearingBuffer() { + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 3.0); + Event middleEvent3 = new Event(43, "a", 4.0); + Event end1 = new Event(44, "b", 5.0); + + Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).next("middle").where(new FilterFunction() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2).followedBy("end1").where(new FilterFunction() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + 
}).within(Time.milliseconds(8)); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + nfa.process(startEvent, 1); + nfa.process(middleEvent1, 2); + nfa.process(middleEvent2, 3); + nfa.process(middleEvent3, 4); + nfa.process(end1, 6); + + //pruning element + nfa.process(null, 10); + + assertEquals(true, nfa.isEmpty()); + } + + @Test + public void testOptionalClearingBuffer() { + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent = new Event(43, "a", 4.0); + Event end1 = new Event(44, "b", 5.0); + + Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new FilterFunction() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).optional().followedBy("end1").where(new FilterFunction() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).within(Time.milliseconds(8)); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + nfa.process(startEvent, 1); + nfa.process(middleEvent, 5); + nfa.process(end1, 6); + + //pruning element + nfa.process(null, 10); + + assertEquals(true, nfa.isEmpty()); + } + + @Test + public void testAtLeastOneClearingBuffer() { + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 3.0); + Event end1 = new Event(44, "b", 5.0); + + Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event 
value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new FilterFunction() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore(false).followedBy("end1").where(new FilterFunction() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).within(Time.milliseconds(8)); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + nfa.process(startEvent, 1); + nfa.process(middleEvent1, 3); + nfa.process(middleEvent2, 4); + nfa.process(end1, 6); + + //pruning element + nfa.process(null, 10); + + assertEquals(true, nfa.isEmpty()); + } + + + @Test + public void testZeroOrMoreClearingBuffer() { + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 3.0); + Event end1 = new Event(44, "b", 5.0); + + Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new FilterFunction() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).zeroOrMore(false).followedBy("end1").where(new FilterFunction() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).within(Time.milliseconds(8)); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + 
nfa.process(startEvent, 1); + nfa.process(middleEvent1, 3); + nfa.process(middleEvent2, 4); + nfa.process(end1, 6); + + //pruning element + nfa.process(null, 10); + + assertEquals(true, nfa.isEmpty()); + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java new file mode 100644 index 0000000..65fa733 --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java @@ -0,0 +1,477 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.cep.operator; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.cep.Event; +import org.apache.flink.cep.SubEvent; +import org.apache.flink.cep.nfa.NFA; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.junit.Test; + +import java.io.FileInputStream; +import java.io.ObjectInputStream; +import java.net.URL; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class CEPMigration12to13Test { + + private static String getResourceFilename(String filename) { + ClassLoader cl = CEPMigration12to13Test.class.getClassLoader(); + URL resource = cl.getResource(filename); + if (resource == null) { + throw new NullPointerException("Missing snapshot resource."); + } + return resource.getFile(); + } + + @Test + public void testMigrationAfterBranchingPattern() throws Exception { + + KeySelector keySelector = new KeySelector() { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event 
value) throws Exception { + return value.getId(); + } + }; + + final Event startEvent = new Event(42, "start", 1.0); + final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); + final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0); + final Event endEvent = new Event(42, "end", 1.0); + + // uncomment these lines for regenerating the snapshot on Flink 1.2 +// OneInputStreamOperatorTestHarness> harness = +// new KeyedOneInputStreamOperatorTestHarness<>( +// new KeyedCEPPatternOperator<>( +// Event.createTypeSerializer(), +// false, +// keySelector, +// IntSerializer.INSTANCE, +// new NFAFactory(), +// true), +// keySelector, +// BasicTypeInfo.INT_TYPE_INFO); +// +// harness.setup(); +// harness.open(); +// harness.processElement(new StreamRecord(startEvent, 1)); +// harness.processElement(new StreamRecord(new Event(42, "foobar", 1.0), 2)); +// harness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); +// harness.processElement(new StreamRecord(middleEvent1, 2)); +// harness.processElement(new StreamRecord(middleEvent2, 3)); +// harness.processWatermark(new Watermark(5)); +// // simulate snapshot/restore with empty element queue but NFA state +// OperatorStateHandles snapshot = harness.snapshot(1, 1); +// FileOutputStream out = new FileOutputStream( +// "src/test/resources/cep-branching-snapshot-1.2"); +// ObjectOutputStream oos = new ObjectOutputStream(out); +// oos.writeObject(snapshot.getOperatorChainIndex()); +// oos.writeObject(snapshot.getLegacyOperatorState()); +// oos.writeObject(snapshot.getManagedKeyedState()); +// oos.writeObject(snapshot.getRawKeyedState()); +// oos.writeObject(snapshot.getManagedOperatorState()); +// oos.writeObject(snapshot.getRawOperatorState()); +// out.close(); +// harness.close(); + + OneInputStreamOperatorTestHarness> harness = + new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + keySelector, + 
IntSerializer.INSTANCE, + new NFAFactory(), + true), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + harness.setup(); + final ObjectInputStream ois = new ObjectInputStream(new FileInputStream(getResourceFilename( + "cep-branching-snapshot-1.2"))); + final OperatorStateHandles snapshot = new OperatorStateHandles( + (int) ois.readObject(), + (StreamStateHandle) ois.readObject(), + (Collection) ois.readObject(), + (Collection) ois.readObject(), + (Collection) ois.readObject(), + (Collection) ois.readObject() + ); + harness.initializeState(snapshot); + harness.open(); + + harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4)); + harness.processElement(new StreamRecord<>(endEvent, 5)); + + harness.processWatermark(new Watermark(20)); + + ConcurrentLinkedQueue result = harness.getOutput(); + + // watermark and 2 results + assertEquals(3, result.size()); + + Object resultObject1 = result.poll(); + assertTrue(resultObject1 instanceof StreamRecord); + StreamRecord resultRecord1 = (StreamRecord) resultObject1; + assertTrue(resultRecord1.getValue() instanceof Map); + + Object resultObject2 = result.poll(); + assertTrue(resultObject2 instanceof StreamRecord); + StreamRecord resultRecord2 = (StreamRecord) resultObject2; + assertTrue(resultRecord2.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map patternMap1 = (Map) resultRecord1.getValue(); + + assertEquals(startEvent, patternMap1.get("start")); + assertEquals(middleEvent1, patternMap1.get("middle")); + assertEquals(endEvent, patternMap1.get("end")); + + @SuppressWarnings("unchecked") + Map patternMap2 = (Map) resultRecord2.getValue(); + + assertEquals(startEvent, patternMap2.get("start")); + assertEquals(middleEvent2, patternMap2.get("middle")); + assertEquals(endEvent, patternMap2.get("end")); + + harness.close(); + } + + @Test + public void testStartingNewPatternAfterMigration() throws Exception { + + KeySelector keySelector = new KeySelector() { + private static final long 
serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; + + final Event startEvent1 = new Event(42, "start", 1.0); + final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); + final Event startEvent2 = new Event(42, "start", 5.0); + final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0); + final Event endEvent = new Event(42, "end", 1.0); + + // uncomment these lines for regenerating the snapshot on Flink 1.2 +// OneInputStreamOperatorTestHarness> harness = +// new KeyedOneInputStreamOperatorTestHarness<>( +// new KeyedCEPPatternOperator<>( +// Event.createTypeSerializer(), +// false, +// keySelector, +// IntSerializer.INSTANCE, +// new NFAFactory(), +// true), +// keySelector, +// BasicTypeInfo.INT_TYPE_INFO); +// +// harness.setup(); +// harness.open(); +// harness.processElement(new StreamRecord(startEvent1, 1)); +// harness.processElement(new StreamRecord(new Event(42, "foobar", 1.0), 2)); +// harness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); +// harness.processElement(new StreamRecord(middleEvent1, 2)); +// harness.processWatermark(new Watermark(5)); +// // simulate snapshot/restore with empty element queue but NFA state +// OperatorStateHandles snapshot = harness.snapshot(1, 1); +// FileOutputStream out = new FileOutputStream( +// "src/test/resources/cep-starting-snapshot-1.2"); +// ObjectOutputStream oos = new ObjectOutputStream(out); +// oos.writeObject(snapshot.getOperatorChainIndex()); +// oos.writeObject(snapshot.getLegacyOperatorState()); +// oos.writeObject(snapshot.getManagedKeyedState()); +// oos.writeObject(snapshot.getRawKeyedState()); +// oos.writeObject(snapshot.getManagedOperatorState()); +// oos.writeObject(snapshot.getRawOperatorState()); +// out.close(); +// harness.close(); + + OneInputStreamOperatorTestHarness> harness = + new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( 
+ Event.createTypeSerializer(), + false, + keySelector, + IntSerializer.INSTANCE, + new NFAFactory(), + true), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + harness.setup(); + final ObjectInputStream ois = new ObjectInputStream(new FileInputStream(getResourceFilename( + "cep-starting-snapshot-1.2"))); + final OperatorStateHandles snapshot = new OperatorStateHandles( + (int) ois.readObject(), + (StreamStateHandle) ois.readObject(), + (Collection) ois.readObject(), + (Collection) ois.readObject(), + (Collection) ois.readObject(), + (Collection) ois.readObject() + ); + harness.initializeState(snapshot); + harness.open(); + + harness.processElement(new StreamRecord<>(startEvent2, 5)); + harness.processElement(new StreamRecord(middleEvent2, 6)); + harness.processElement(new StreamRecord<>(endEvent, 7)); + + harness.processWatermark(new Watermark(20)); + + ConcurrentLinkedQueue result = harness.getOutput(); + + // watermark and 3 results + assertEquals(4, result.size()); + + Object resultObject1 = result.poll(); + assertTrue(resultObject1 instanceof StreamRecord); + StreamRecord resultRecord1 = (StreamRecord) resultObject1; + assertTrue(resultRecord1.getValue() instanceof Map); + + Object resultObject2 = result.poll(); + assertTrue(resultObject2 instanceof StreamRecord); + StreamRecord resultRecord2 = (StreamRecord) resultObject2; + assertTrue(resultRecord2.getValue() instanceof Map); + + Object resultObject3 = result.poll(); + assertTrue(resultObject3 instanceof StreamRecord); + StreamRecord resultRecord3 = (StreamRecord) resultObject3; + assertTrue(resultRecord3.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map patternMap1 = (Map) resultRecord1.getValue(); + + assertEquals(startEvent1, patternMap1.get("start")); + assertEquals(middleEvent1, patternMap1.get("middle")); + assertEquals(endEvent, patternMap1.get("end")); + + @SuppressWarnings("unchecked") + Map patternMap2 = (Map) resultRecord2.getValue(); + + assertEquals(startEvent1, 
patternMap2.get("start")); + assertEquals(middleEvent2, patternMap2.get("middle")); + assertEquals(endEvent, patternMap2.get("end")); + + @SuppressWarnings("unchecked") + Map patternMap3 = (Map) resultRecord3.getValue(); + + assertEquals(startEvent2, patternMap3.get("start")); + assertEquals(middleEvent2, patternMap3.get("middle")); + assertEquals(endEvent, patternMap3.get("end")); + + harness.close(); + } + + @Test + public void testSinglePatternAfterMigration() throws Exception { + + KeySelector keySelector = new KeySelector() { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; + + final Event startEvent1 = new Event(42, "start", 1.0); + + // uncomment these lines for regenerating the snapshot on Flink 1.2 +// OneInputStreamOperatorTestHarness> harness = +// new KeyedOneInputStreamOperatorTestHarness<>( +// new KeyedCEPPatternOperator<>( +// Event.createTypeSerializer(), +// false, +// keySelector, +// IntSerializer.INSTANCE, +// new SinglePatternNFAFactory(), +// true), +// keySelector, +// BasicTypeInfo.INT_TYPE_INFO); +// +// harness.setup(); +// harness.open(); +// harness.processWatermark(new Watermark(5)); +// // simulate snapshot/restore with empty element queue but NFA state +// OperatorStateHandles snapshot = harness.snapshot(1, 1); +// FileOutputStream out = new FileOutputStream( +// "src/test/resources/cep-single-pattern-snapshot-1.2"); +// ObjectOutputStream oos = new ObjectOutputStream(out); +// oos.writeObject(snapshot.getOperatorChainIndex()); +// oos.writeObject(snapshot.getLegacyOperatorState()); +// oos.writeObject(snapshot.getManagedKeyedState()); +// oos.writeObject(snapshot.getRawKeyedState()); +// oos.writeObject(snapshot.getManagedOperatorState()); +// oos.writeObject(snapshot.getRawOperatorState()); +// out.close(); +// harness.close(); + + OneInputStreamOperatorTestHarness> harness = + new 
KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + keySelector, + IntSerializer.INSTANCE, + new SinglePatternNFAFactory(), + true), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + harness.setup(); + final ObjectInputStream ois = new ObjectInputStream(new FileInputStream(getResourceFilename( + "cep-single-pattern-snapshot-1.2"))); + final OperatorStateHandles snapshot = new OperatorStateHandles( + (int) ois.readObject(), + (StreamStateHandle) ois.readObject(), + (Collection) ois.readObject(), + (Collection) ois.readObject(), + (Collection) ois.readObject(), + (Collection) ois.readObject() + ); + harness.initializeState(snapshot); + harness.open(); + + harness.processElement(new StreamRecord<>(startEvent1, 5)); + + harness.processWatermark(new Watermark(20)); + + ConcurrentLinkedQueue result = harness.getOutput(); + + // watermark and the result + assertEquals(2, result.size()); + + Object resultObject = result.poll(); + assertTrue(resultObject instanceof StreamRecord); + StreamRecord resultRecord = (StreamRecord) resultObject; + assertTrue(resultRecord.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map patternMap = (Map) resultRecord.getValue(); + + assertEquals(startEvent1, patternMap.get("start")); + + harness.close(); + } + + private static class SinglePatternNFAFactory implements NFACompiler.NFAFactory { + + private static final long serialVersionUID = 1173020762472766713L; + + private final boolean handleTimeout; + + private SinglePatternNFAFactory() { + this(false); + } + + private SinglePatternNFAFactory(boolean handleTimeout) { + this.handleTimeout = handleTimeout; + } + + @Override + public NFA createNFA() { + + Pattern pattern = Pattern.begin("start").where(new StartFilter()) + .within(Time.milliseconds(10L)); + + return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout); + } + } + + private static class NFAFactory implements 
NFACompiler.NFAFactory { + + private static final long serialVersionUID = 1173020762472766713L; + + private final boolean handleTimeout; + + private NFAFactory() { + this(false); + } + + private NFAFactory(boolean handleTimeout) { + this.handleTimeout = handleTimeout; + } + + @Override + public NFA createNFA() { + + Pattern pattern = Pattern.begin("start").where(new StartFilter()) + .followedBy("middle") + .subtype(SubEvent.class) + .where(new MiddleFilter()) + .followedBy("end") + .where(new EndFilter()) + // add a window timeout to test whether timestamps of elements in the + // priority queue in CEP operator are correctly checkpointed/restored + .within(Time.milliseconds(10L)); + + return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout); + } + } + + private static class StartFilter implements FilterFunction { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + } + + private static class MiddleFilter implements FilterFunction { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getVolume() > 5.0; + } + } + + private static class EndFilter implements FilterFunction { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2 ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2 new file mode 100644 index 0000000..47f710e Binary files /dev/null and 
b/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2 differ http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2 ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2 new file mode 100644 index 0000000..255f46a Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2 differ http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2 ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2 new file mode 100644 index 0000000..c41f6c2 Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2 differ