flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Kostas Kloudas (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-6578) SharedBuffer creates self-loops when having elements with same value/timestamp.
Date Sun, 14 May 2017 15:35:04 GMT
Kostas Kloudas created FLINK-6578:
-------------------------------------

             Summary: SharedBuffer creates self-loops when having elements with same value/timestamp.
                 Key: FLINK-6578
                 URL: https://issues.apache.org/jira/browse/FLINK-6578
             Project: Flink
          Issue Type: Bug
          Components: CEP
    Affects Versions: 1.3.0
            Reporter: Kostas Kloudas
            Assignee: Kostas Kloudas
             Fix For: 1.3.0


This is a test that fails with the current implementation due to the fact that the looping
state accepts the two {{middleEvent1}} elements but the shared buffer cannot distinguish between
them and gets trapped in an infinite loop leading to running out of memory.

{code}
@Test
	public void testEagerZeroOrMoreSameElement() {
		List<StreamRecord<Event>> inputEvents = new ArrayList<>();

		Event startEvent = new Event(40, "c", 1.0);
		Event middleEvent1 = new Event(41, "a", 2.0);
		Event middleEvent2 = new Event(42, "a", 3.0);
		Event middleEvent3 = new Event(43, "a", 4.0);
		Event end1 = new Event(44, "b", 5.0);

		inputEvents.add(new StreamRecord<>(startEvent, 1));
		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
		inputEvents.add(new StreamRecord<>(middleEvent3, 6));
		inputEvents.add(new StreamRecord<>(middleEvent3, 6));
		inputEvents.add(new StreamRecord<>(end1, 7));

		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>()
{
			private static final long serialVersionUID = 5726188262756267490L;

			@Override
			public boolean filter(Event value) throws Exception {
				return value.getName().equals("c");
			}
		}).followedBy("middle").where(new SimpleCondition<Event>() {
			private static final long serialVersionUID = 5726188262756267490L;

			@Override
			public boolean filter(Event value) throws Exception {
				return value.getName().equals("a");
			}
		}).oneOrMore().optional().followedBy("end1").where(new SimpleCondition<Event>() {
			private static final long serialVersionUID = 5726188262756267490L;

			@Override
			public boolean filter(Event value) throws Exception {
				return value.getName().equals("b");
			}
		});

		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);

		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);

		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2,
middleEvent3, middleEvent3, end1),
				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2,
middleEvent3, end1),
				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2,
end1),
				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, end1),
				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1),
				Lists.newArrayList(startEvent, middleEvent1, end1),
				Lists.newArrayList(startEvent, end1)
		));
	}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message