flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From smarthi <...@git.apache.org>
Subject [GitHub] flink pull request #2367: [FLINK-3703][cep] Add sequence matching semantics ...
Date Mon, 15 Aug 2016 00:05:59 GMT
Github user smarthi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2367#discussion_r74711517
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
---
    @@ -256,16 +267,75 @@ public void prune(long pruningTimestamp) {
     										edge.getTarget(),
     										edge.getVersion(),
     										copy));
    +								if (matchingBehaviour == MatchingBehaviour.AFTER_LAST) {
    +									cleanUp.add(edge.getTarget());
    +								}
     							}
     						}
     					}
     				}
     			}
     		}
     
    +		// Remove shared buffer entries to maintain correct matching behaviour
    +		doCleanUp(new Predicate<K, V>() {
    +
    +			@Override
    +			public boolean toRemove(SharedBufferEntry<K, V> entry) {
    +				return cleanUp.contains(entry);
    +			}
    +		});
    +		// Remove all entries that are dependent on the current event
    +		if (matchingBehaviour == MatchingBehaviour.AFTER_LAST) {
    +			doCleanUp(new Predicate<K, V>() {
    +
    +				@Override
    +				public boolean toRemove(SharedBufferEntry<K, V> entry) {
    +					if (entry == null) {
    +						return false;
    +					}
    +					return entry.getValueTime().value == value
    +						&& entry.getValueTime().timestamp == timestamp;
    +				}
    +			});
    +		}
    +
     		return result;
     	}
     
    +	private void doCleanUp(Predicate<K, V> predicate) {
    +		ArrayList<SharedBufferEntry<K, V>> toRemove = new ArrayList<>();
    +		for (SharedBufferPage<K, V> page : this.pages.values()) {
    +			for (SharedBufferEntry<K, V> entry : page.getEntries()) {
    +				if (entry.getReferenceCounter() <= 1) {
    +					doRecursiveCleanup(entry, predicate, toRemove);
    +				}
    +			}
    +		}
    +
    +		for (SharedBufferEntry<K, V> startNode: toRemove) {
    +			release(startNode.page.getKey(), startNode.getValueTime().value, startNode.getValueTime().getTimestamp());
    +			remove(startNode.page.getKey(), startNode.getValueTime().value, startNode.getValueTime().getTimestamp());
    +		}
    +	}
    +
    +	private boolean doRecursiveCleanup(SharedBufferEntry<K, V> startNode, Predicate<K,
V> cleanUp, ArrayList<SharedBufferEntry<K, V>> toRemove) {
    --- End diff --
    
    Replace ArrayList by List in the arguments, unless we need it to be ArrayList explicitly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message