flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9593) Unify AfterMatch semantics with SQL MATCH_RECOGNIZE
Date Wed, 27 Jun 2018 12:31:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524983#comment-16524983 ]

ASF GitHub Bot commented on FLINK-9593:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6171#discussion_r198473858
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -330,77 +328,85 @@ private boolean isStateTimedOut(final ComputationState state, final long timestamp
     			}
     		}
     
    -		discardComputationStatesAccordingToStrategy(
    -			sharedBuffer, computationStates, result, afterMatchSkipStrategy);
    +		if (!potentialMatches.isEmpty()) {
    +			nfaState.setStateChanged();
    +		}
    +
    +		List<Map<String, List<T>>> result = new ArrayList<>();
    +		if (afterMatchSkipStrategy.isSkipStrategy()) {
    +			processMatchesAccordingToSkipStrategy(sharedBuffer,
    +				nfaState,
    +				afterMatchSkipStrategy,
    +				potentialMatches,
    +				result);
    +		} else {
    +			for (ComputationState match : potentialMatches) {
    +				result.add(sharedBuffer.materializeMatch(sharedBuffer.extractPatterns(match.getPreviousBufferEntry(),
    +					match.getVersion()).get(0)));
    +				sharedBuffer.releaseNode(match.getPreviousBufferEntry());
    +			}
    +		}
     
     		return result;
     	}
     
    -	private void discardComputationStatesAccordingToStrategy(
    -			final SharedBuffer<T> sharedBuffer,
    -			final Queue<ComputationState> computationStates,
    -			final Collection<Map<String, List<T>>> matchedResult,
    -			final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
    +	private void processMatchesAccordingToSkipStrategy(
    +			SharedBuffer<T> sharedBuffer,
    +			NFAState nfaState,
    +			AfterMatchSkipStrategy afterMatchSkipStrategy,
    +			PriorityQueue<ComputationState> potentialMatches,
    +			List<Map<String, List<T>>> result) throws Exception {
     
    -		Set<T> discardEvents = new HashSet<>();
    -		switch(afterMatchSkipStrategy.getStrategy()) {
    -			case SKIP_TO_LAST:
    -				for (Map<String, List<T>> resultMap: matchedResult) {
    -					for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
    -						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
    -							discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1));
    -							break;
    -						} else {
    -							discardEvents.addAll(keyMatches.getValue());
    -						}
    -					}
    -				}
    -				break;
    -			case SKIP_TO_FIRST:
    -				for (Map<String, List<T>> resultMap: matchedResult) {
    -					for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
    -						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
    -							break;
    -						} else {
    -							discardEvents.addAll(keyMatches.getValue());
    -						}
    -					}
    -				}
    -				break;
    -			case SKIP_PAST_LAST_EVENT:
    -				for (Map<String, List<T>> resultMap: matchedResult) {
    -					for (List<T> eventList: resultMap.values()) {
    -						discardEvents.addAll(eventList);
    -					}
    -				}
    -				break;
    -		}
    -		if (!discardEvents.isEmpty()) {
    -			List<ComputationState> discardStates = new ArrayList<>();
    -			for (ComputationState computationState : computationStates) {
    -				boolean discard = false;
    -				Map<String, List<T>> partialMatch = extractCurrentMatches(sharedBuffer, computationState);
    -				for (List<T> list: partialMatch.values()) {
    -					for (T e: list) {
    -						if (discardEvents.contains(e)) {
    -							// discard the computation state.
    -							discard = true;
    -							break;
    -						}
    -					}
    -					if (discard) {
    -						break;
    -					}
    -				}
    -				if (discard) {
    -					sharedBuffer.releaseNode(computationState.getPreviousBufferEntry());
    -					discardStates.add(computationState);
    -				}
    +		nfaState.getCompletedMatches().addAll(potentialMatches);
    +
    +		ComputationState earliestMatch = nfaState.getCompletedMatches().peek();
    +
    +		if (earliestMatch != null) {
    +			Queue<ComputationState> sortedPartialMatches = sortByStartTime(nfaState.getPartialMatches());
    --- End diff --
    
    Instead of sorting every time, why not keep the partial matches in a priority queue?
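
    A minimal sketch of what the reviewer is suggesting, kept deliberately separate from Flink's actual ComputationState/NFAState API (the PartialMatch class and its startTimestamp field below are assumptions for illustration only): if partial matches live in a java.util.PriorityQueue ordered by start timestamp, they can be consumed in start-time order via peek()/poll() without a sortByStartTime(...) pass on every invocation.

    import java.util.Comparator;
    import java.util.PriorityQueue;
    import java.util.Queue;

    public class PartialMatchQueueSketch {

        /** Simplified, hypothetical stand-in for a partial match; not Flink's ComputationState. */
        static final class PartialMatch {
            final String name;
            final long startTimestamp;

            PartialMatch(String name, long startTimestamp) {
                this.name = name;
                this.startTimestamp = startTimestamp;
            }
        }

        public static void main(String[] args) {
            // Order is maintained on insertion, so no explicit sort is needed later.
            Queue<PartialMatch> partialMatches =
                new PriorityQueue<>(Comparator.comparingLong((PartialMatch m) -> m.startTimestamp));

            partialMatches.add(new PartialMatch("b", 42L));
            partialMatches.add(new PartialMatch("a", 17L));
            partialMatches.add(new PartialMatch("c", 99L));

            // Polling yields the earliest-started partial match first, which is the
            // order the after-match skip handling wants to see them in.
            while (!partialMatches.isEmpty()) {
                PartialMatch m = partialMatches.poll();
                System.out.println(m.name + " started at " + m.startTimestamp);
            }
        }
    }

    The trade-off is the usual one for a binary heap: O(log n) per insertion and removal instead of an O(n log n) sort on every call, at the cost of keeping the comparator consistent wherever partial matches are added.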


> Unify AfterMatch semantics with SQL MATCH_RECOGNIZE
> ---------------------------------------------------
>
>                 Key: FLINK-9593
>                 URL: https://issues.apache.org/jira/browse/FLINK-9593
>             Project: Flink
>          Issue Type: Improvement
>          Components: CEP
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
