flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl0u <...@git.apache.org>
Subject [GitHub] flink pull request #3477: [Flink-3318][cep] Add support for quantifiers to C...
Date Mon, 13 Mar 2017 16:27:22 GMT
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3477#discussion_r105700356
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -247,27 +275,148 @@ public int compare(final StateTransition<T> o1, final StateTransition<T>
o2) {
     	 * @return Collection of computation states which result from the current one
     	 */
     	private Collection<ComputationState<T>> computeNextStates(
    -			final ComputationState<T> computationState,
    -			final T event,
    -			final long timestamp) {
    -		Stack<State<T>> states = new Stack<>();
    -		ArrayList<ComputationState<T>> resultingComputationStates = new ArrayList<>();
    -		State<T> state = computationState.getState();
    +		final ComputationState<T> computationState,
    +		final T event,
    +		final long timestamp) {
    +		final ArrayList<ComputationState<T>> resultingComputationStates = new ArrayList<>();
    +
    +		final OutgoingEdges<T> outgoingEdges = createDecisionGraph(computationState,
event);
    +
    +		// Create the computing version based on the previously computed edges
    +		// We need to defer the creation of computation states until we know how many edges
start
    +		// at this computation state so that we can assign proper version
    +		final List<StateTransition<T>> edges = outgoingEdges.getEdges();
    +		Integer takeBranchesToVisit = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1);
    +		Integer ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches();
    +		for (StateTransition<T> edge : edges) {
    +			switch (edge.getAction()) {
    +				case IGNORE: {
    +					if (!computationState.isStartState()) {
    +						final DeweyNumber version;
    +						if (!isEquivalentState(edge.getTargetState(), computationState.getState())) {
    +							version = computationState.getVersion().increase(ignoreBranchesToVisit).addStage();
    +							ignoreBranchesToVisit--;
    +						} else {
    +							final int toIncrease = calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(),
    +								outgoingEdges.getTotalTakeBranches());
    +							version = computationState.getVersion().increase(toIncrease);
    +						}
    +
    +						resultingComputationStates.add(
    +							ComputationState.createState(
    +								edge.getTargetState(),
    +								computationState.getPreviousState(),
    +								computationState.getEvent(),
    +								computationState.getTimestamp(),
    +								version,
    +								computationState.getStartTimestamp()
    +							)
    +						);
    +						sharedBuffer.lock(
    +							edge.getTargetState().getName(),
    +							computationState.getEvent(),
    +							computationState.getTimestamp());
    +					}
    +				}
    +				break;
    +				case TAKE:
    +					final State<T> newState = edge.getTargetState();
    +					final State<T> consumingState = edge.getSourceState();
    +					final State<T> previousEventState = computationState.getPreviousState();
    +
    +					final T previousEvent = computationState.getEvent();
    +					final DeweyNumber currentVersion = computationState.getVersion();
    +
    +					final DeweyNumber newComputationStateVersion = new DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit);
    +					takeBranchesToVisit--;
    +
    +					final long startTimestamp;
    +					if (computationState.isStartState()) {
    +						startTimestamp = timestamp;
    +						sharedBuffer.put(
    +							consumingState.getName(),
    +							event,
    +							timestamp,
    +							currentVersion);
    +					} else {
    +						startTimestamp = computationState.getStartTimestamp();
    +						sharedBuffer.put(
    +							consumingState.getName(),
    +							event,
    +							timestamp,
    +							previousEventState.getName(),
    +							previousEvent,
    +							computationState.getTimestamp(),
    +							currentVersion);
    +					}
    +
    +					// a new computation state is referring to the shared entry
    +					sharedBuffer.lock(consumingState.getName(), event, timestamp);
    +
    +					resultingComputationStates.add(ComputationState.createState(
    +						newState,
    +						consumingState,
    +						event,
    +						timestamp,
    +						newComputationStateVersion,
    +						startTimestamp
    +					));
    +					break;
    +			}
    +		}
     
    -		states.push(state);
    +		if (computationState.isStartState()) {
    +			final int totalBranches = calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(),
outgoingEdges.getTotalTakeBranches());
    +			final ComputationState<T> startState = createStartState(computationState, totalBranches);
    +			resultingComputationStates.add(startState);
    +		}
    +
    +		if (computationState.getEvent() != null) {
    +			// release the shared entry referenced by the current computation state.
    +			sharedBuffer.release(
    +				computationState.getState().getName(),
    +				computationState.getEvent(),
    +				computationState.getTimestamp());
    +			// try to remove unnecessary shared buffer entries
    +			sharedBuffer.remove(
    +				computationState.getState().getName(),
    +				computationState.getEvent(),
    +				computationState.getTimestamp());
    +		}
    +
    +		return resultingComputationStates;
    +	}
    +
    +	private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
    +		if (takeBranches == 0 && ignoreBranches == 0) {
    --- End diff --
    
    This can become: `return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches
+ 1;`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message