flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7147) Support greedy quantifier in CEP
Date Mon, 07 Aug 2017 11:18:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16116444#comment-16116444 ]

ASF GitHub Bot commented on FLINK-7147:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r131629581
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java ---
    @@ -0,0 +1,1068 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link Pattern#greedy()}.
    + */
    +public class GreedyITCase extends TestLogger {
    +
    +	@Test
    +	public void testGreedyZeroOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d?
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().consecutive().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional();
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c),
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreInBetween() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 2));
    +		inputEvents.add(new StreamRecord<>(a1, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 4));
    +		inputEvents.add(new StreamRecord<>(a2, 5));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 6));
    +		inputEvents.add(new StreamRecord<>(a3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreWithDummyEventsBeforeQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 2));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreBeforeOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event e = new Event(44, "e", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(e, 5));
    +
    +		// c a* d? e
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreBeforeOptional2() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(43, "d", 3.0);
    +		Event e = new Event(44, "e", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(d, 4));
    +		inputEvents.add(new StreamRecord<>(e, 5));
    +
    +		// c a* d? e
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, e),
    +			Lists.newArrayList(c, a1, a2, d, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtilZeroOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 3.0);
    +		Event a3 = new Event(43, "a", 3.0);
    +		Event d = new Event(45, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().until(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getPrice() > 3.0;
    +			}
    +		}).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtilWithDummyEventsBeforeQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 3.0);
    +		Event a3 = new Event(43, "a", 3.0);
    +		Event d = new Event(45, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 2));
    +		inputEvents.add(new StreamRecord<>(a1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(a3, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().until(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getPrice() > 3.0;
    +			}
    +		}).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyOneOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyOneOrMoreInBetween() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 2));
    +		inputEvents.add(new StreamRecord<>(a1, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 4));
    +		inputEvents.add(new StreamRecord<>(a2, 5));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 6));
    +		inputEvents.add(new StreamRecord<>(a3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyOneOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyOneOrMoreWithDummyEventsBeforeQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 2));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList());
    +	}
    +
    +	@Test
    +	public void testGreedyUtilOneOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 3.0);
    +		Event a3 = new Event(43, "a", 3.0);
    +		Event d = new Event(45, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().until(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getPrice() > 3.0;
    +			}
    +		}).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtilOneOrMoreWithDummyEventsBeforeQuantifier() {
    --- End diff --
    
    misspell: `testGreedyUtilOneOrMoreWithDummyEventsBeforeQuantifier` -> `testGreedyUntilOneOrMoreWithDummyEventsBeforeQuantifier`


> Support greedy quantifier in CEP
> --------------------------------
>
>                 Key: FLINK-7147
>                 URL: https://issues.apache.org/jira/browse/FLINK-7147
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP, Table API & SQL
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>
> A greedy quantifier tries to match the token as many times as possible. For example,
> for pattern {{a b* c}} (skip till next is used) and inputs {{a b1 b2 c}}, if the quantifier
> for {{b}} is greedy, it will only output {{a b1 b2 c}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message