flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dawidwys <...@git.apache.org>
Subject [GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...
Date Wed, 15 Mar 2017 13:41:26 GMT
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3477#discussion_r106163471
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
---
    @@ -74,88 +76,233 @@
     	 */
     	@SuppressWarnings("unchecked")
     	public static <T> NFAFactory<T> compileFactory(
    -		Pattern<T, ?> pattern,
    -		TypeSerializer<T> inputTypeSerializer,
    +		final Pattern<T, ?> pattern,
    +		final TypeSerializer<T> inputTypeSerializer,
     		boolean timeoutHandling) {
     		if (pattern == null) {
     			// return a factory for empty NFAs
    -			return new NFAFactoryImpl<T>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(),
timeoutHandling);
    +			return new NFAFactoryImpl<>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(),
timeoutHandling);
     		} else {
    -			// set of all generated states
    -			Map<String, State<T>> states = new HashMap<>();
    -			long windowTime;
    +			final NFAFactoryCompiler<T> nfaFactoryCompiler = new NFAFactoryCompiler<>(pattern);
    +			nfaFactoryCompiler.compileFactory();
    +			return new NFAFactoryImpl<>(inputTypeSerializer, nfaFactoryCompiler.getWindowTime(),
nfaFactoryCompiler.getStates(), timeoutHandling);
    +		}
    +	}
     
    -			// this is used to enforse pattern name uniqueness.
    -			Set<String> patternNames = new HashSet<>();
    +	private static class NFAFactoryCompiler<T> {
     
    -			Pattern<T, ?> succeedingPattern;
    -			State<T> succeedingState;
    -			Pattern<T, ?> currentPattern = pattern;
    +		private final Set<String> usedNames = new HashSet<>();
    +		private final List<State<T>> states = new ArrayList<>();
     
    +		private long windowTime = 0;
    +		private Pattern<T, ?> currentPattern;
    +
    +		NFAFactoryCompiler(final Pattern<T, ?> pattern) {
    +			this.currentPattern = pattern;
    +		}
    +
    +		/**
    +		 * Compiles the given pattern into a {@link NFAFactory}. The NFA factory can be used
to create
    +		 * multiple NFAs.
    +		 */
    +		void compileFactory() {
     			// we're traversing the pattern from the end to the beginning --> the first state
is the final state
    -			State<T> currentState = new State<>(currentPattern.getName(), State.StateType.Final);
    -			patternNames.add(currentPattern.getName());
    +			State<T> sinkState = createEndingState();
    +			// add all the normal states
    +			sinkState = createMiddleStates(sinkState);
    +			// add the beginning state
    +			createStartState(sinkState);
    +		}
    +
    +		List<State<T>> getStates() {
    +			return states;
    +		}
     
    -			states.put(currentPattern.getName(), currentState);
    +		long getWindowTime() {
    +			return windowTime;
    +		}
    +
    +		private State<T> createEndingState() {
    +			State<T> sinkState = new State<>(ENDING_STATE_NAME, State.StateType.Final);
    +			states.add(sinkState);
    +			usedNames.add(ENDING_STATE_NAME);
     
     			windowTime = currentPattern.getWindowTime() != null ? currentPattern.getWindowTime().toMilliseconds()
: 0L;
    +			return sinkState;
    +		}
     
    -			while (currentPattern.getPrevious() != null) {
    -				succeedingPattern = currentPattern;
    -				succeedingState = currentState;
    -				currentPattern = currentPattern.getPrevious();
    +		private State<T> createMiddleStates(final State<T> sinkState) {
     
    -				if (!patternNames.add(currentPattern.getName())) {
    -					throw new MalformedPatternException("Duplicate pattern name: " + currentPattern.getName()
+ ". " +
    -						"Pattern names must be unique.");
    +			State<T> lastSink = sinkState;
    +			while (currentPattern.getPrevious() != null) {
    +				State<T> sourceState;
    +
    +				checkPatternNameUniqueness();
    +
    +				sourceState = new State<>(currentPattern.getName(), State.StateType.Normal);
    +				states.add(sourceState);
    +				usedNames.add(sourceState.getName());
    +
    +				if (currentPattern.getQuantifier().isLooping()) {
    +					convertToLooping(lastSink, sourceState);
    +
    +					if (currentPattern.getQuantifier().isAtLeastOne()) {
    +						sourceState = createFirstMandatoryStateOfLoop(
    +							sourceState,
    +							State.StateType.Normal
    +						);
    +						states.add(sourceState);
    +						usedNames.add(sourceState.getName());
    +					}
    +				} else if (currentPattern.getQuantifier() == Quantifier.TIMES) {
    +					sourceState = convertToTimesState(lastSink, sourceState, currentPattern.getTimes());
    +				} else {
    +					convertToSingletonState(
    +						lastSink,
    +						sourceState);
     				}
     
    -				Time currentWindowTime = currentPattern.getWindowTime();
    +				currentPattern = currentPattern.getPrevious();
    +				lastSink = sourceState;
     
    +				final Time currentWindowTime = currentPattern.getWindowTime();
     				if (currentWindowTime != null && currentWindowTime.toMilliseconds() <
windowTime) {
     					// the window time is the global minimum of all window times of each state
     					windowTime = currentWindowTime.toMilliseconds();
     				}
    +			}
    +
    +			return lastSink;
    +		}
     
    -				if (states.containsKey(currentPattern.getName())) {
    -					currentState = states.get(currentPattern.getName());
    +		private void checkPatternNameUniqueness() {
    +			if (usedNames.contains(currentPattern.getName())) {
    +				throw new MalformedPatternException(
    +					"Duplicate pattern name: " + currentPattern.getName() + ". " +
    +					"Pattern names must be unique.");
    +			}
    +		}
    +
    +		@SuppressWarnings("unchecked")
    +		private State<T> createStartState(State<T> sinkState) {
    +			final State<T> beginningState;
    +
    +			checkPatternNameUniqueness();
    +
    +			if (currentPattern.getQuantifier().isLooping()) {
    +				final State<T> loopingState;
    +				if (currentPattern.getQuantifier().isAtLeastOne()) {
    +					loopingState = new State<>(currentPattern.getName(), State.StateType.Normal);
    +					beginningState = createFirstMandatoryStateOfLoop(loopingState, State.StateType.Start);
    +					states.add(loopingState);
     				} else {
    -					currentState = new State<>(currentPattern.getName(), State.StateType.Normal);
    -					states.put(currentState.getName(), currentState);
    +					loopingState = new State<>(currentPattern.getName(), State.StateType.Start);
    +					beginningState = loopingState;
     				}
    -
    -				currentState.addStateTransition(new StateTransition<T>(
    -					StateTransitionAction.TAKE,
    -					succeedingState,
    -					(FilterFunction<T>) succeedingPattern.getFilterFunction()));
    -
    -				if (succeedingPattern instanceof FollowedByPattern) {
    -					// the followed by pattern entails a reflexive ignore transition
    -					currentState.addStateTransition(new StateTransition<T>(
    -						StateTransitionAction.IGNORE,
    -						currentState,
    -						null
    -					));
    +				convertToLooping(sinkState, loopingState);
    +			} else if (currentPattern.getQuantifier() == Quantifier.TIMES) {
    +				if (currentPattern.getTimes() > 1) {
    +					final State<T> timesState = new State<>(currentPattern.getName(), State.StateType.Normal);
    +					states.add(timesState);
    +					sinkState = convertToTimesState(sinkState, timesState, currentPattern.getTimes()
- 1);
     				}
    +				beginningState = new State<>(currentPattern.getName(), State.StateType.Start);
    +				beginningState.addTake(sinkState, (FilterFunction<T>) currentPattern.getFilterFunction());
    +			} else {
    +				beginningState = new State<>(currentPattern.getName(), State.StateType.Start);
    +				beginningState.addTake(sinkState, (FilterFunction<T>) currentPattern.getFilterFunction());
     			}
     
    -			// add the beginning state
    -			final State<T> beginningState;
    +			states.add(beginningState);
    +			usedNames.add(beginningState.getName());
    +
    +			return beginningState;
    +		}
    +
    +		private State<T> convertToTimesState(State<T> sinkState, State<T>
sourceState, int times) {
    +			convertToSingletonState(sinkState, sourceState);
    +			for (int i = 0; i < times - 1; i++) {
    +				sinkState = sourceState;
    +				sourceState = new State<>(currentPattern.getName(), State.StateType.Normal);
    +				states.add(sourceState);
    +				convertToSingletonState(sinkState, sourceState);
    +			}
    +			return sourceState;
    +		}
    +
    +		@SuppressWarnings("unchecked")
    +		private void convertToSingletonState(
    +			final State<T> sinkState,
    --- End diff --
    
    I've done it the other way round so as to handle the optional case in one place. But I think your
version is easier to analyze.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message