Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 51775200C87 for ; Wed, 17 May 2017 14:42:43 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 4FF6A160BB5; Wed, 17 May 2017 12:42:43 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6AA45160BD3 for ; Wed, 17 May 2017 14:42:42 +0200 (CEST) Received: (qmail 42784 invoked by uid 500); 17 May 2017 12:42:41 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 42570 invoked by uid 99); 17 May 2017 12:42:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 17 May 2017 12:42:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 369FCDFF26; Wed, 17 May 2017 12:42:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kkloudas@apache.org To: commits@flink.apache.org Date: Wed, 17 May 2017 12:42:47 -0000 Message-Id: <06ec994df3f9426eaacae17673dae637@git.apache.org> In-Reply-To: <4cb98902bd4a480d900b5352f722946c@git.apache.org> References: <4cb98902bd4a480d900b5352f722946c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [7/9] flink git commit: [FLINK-6604] [cep] Remove java serialization from the library. archived-at: Wed, 17 May 2017 12:42:43 -0000 http://git-wip-us.apache.org/repos/asf/flink/blob/d80af819/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java index 88a5703..824df2d 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java @@ -32,9 +32,9 @@ import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.junit.Ignore; import org.junit.Test; import java.net.URL; @@ -57,7 +57,6 @@ public class CEPMigration11to13Test { } @Test - @Ignore public void testKeyedCEPOperatorMigratation() throws Exception { KeySelector keySelector = new KeySelector() { @@ -136,11 +135,58 @@ public class CEPMigration11to13Test { assertEquals(middleEvent, patternMap.get("middle").get(0)); assertEquals(endEvent, patternMap.get("end").get(0)); + // and now go for a checkpoint with the new serializers + + final Event startEvent1 = new Event(42, "start", 2.0); + final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0); + final Event endEvent1 = new Event(42, "end", 2.0); + + harness.processElement(new StreamRecord(startEvent1, 21)); + harness.processElement(new StreamRecord(middleEvent1, 23)); + + // simulate snapshot/restore with some elements in internal sorting queue + OperatorStateHandles snapshot = harness.snapshot(1L, 1L); + harness.close(); + + harness = new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + IntSerializer.INSTANCE, + new NFAFactory(), + true), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + harness.setup(); + harness.initializeState(snapshot); + harness.open(); + + harness.processElement(new StreamRecord<>(endEvent1, 25)); + + harness.processWatermark(new Watermark(50)); + + result = harness.getOutput(); + + // watermark and the result + assertEquals(2, result.size()); + + Object resultObject1 = result.poll(); + assertTrue(resultObject1 instanceof StreamRecord); + StreamRecord resultRecord1 = (StreamRecord) resultObject1; + assertTrue(resultRecord1.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map> patternMap1 = (Map>) resultRecord1.getValue(); + + assertEquals(startEvent1, patternMap1.get("start").get(0)); + assertEquals(middleEvent1, patternMap1.get("middle").get(0)); + assertEquals(endEvent1, patternMap1.get("end").get(0)); + harness.close(); } @Test - @Ignore public void testNonKeyedCEPFunctionMigration() throws Exception { final Event startEvent = new Event(42, "start", 1.0); @@ -191,7 +237,7 @@ public class CEPMigration11to13Test { harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4)); harness.processElement(new StreamRecord<>(endEvent, 5)); - harness.processWatermark(new Watermark(Long.MAX_VALUE)); + harness.processWatermark(new Watermark(20)); ConcurrentLinkedQueue result = harness.getOutput(); @@ -210,6 +256,54 @@ public class CEPMigration11to13Test { assertEquals(middleEvent, patternMap.get("middle").get(0)); assertEquals(endEvent, patternMap.get("end").get(0)); + // and now go for a checkpoint with the new serializers + + final Event startEvent1 = new Event(42, "start", 2.0); + final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0); + final Event endEvent1 = new Event(42, "end", 2.0); + + harness.processElement(new StreamRecord(startEvent1, 21)); + harness.processElement(new StreamRecord(middleEvent1, 23)); + + // simulate snapshot/restore with some elements in internal sorting queue + OperatorStateHandles snapshot = harness.snapshot(1L, 1L); + harness.close(); + + harness = new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + ByteSerializer.INSTANCE, + new NFAFactory(), + false), + keySelector, + BasicTypeInfo.BYTE_TYPE_INFO); + + harness.setup(); + harness.initializeState(snapshot); + harness.open(); + + harness.processElement(new StreamRecord<>(endEvent1, 25)); + + harness.processWatermark(new Watermark(50)); + + result = harness.getOutput(); + + // watermark and the result + assertEquals(2, result.size()); + + Object resultObject1 = result.poll(); + assertTrue(resultObject1 instanceof StreamRecord); + StreamRecord resultRecord1 = (StreamRecord) resultObject1; + assertTrue(resultRecord1.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map> patternMap1 = (Map>) resultRecord1.getValue(); + + assertEquals(startEvent1, patternMap1.get("start").get(0)); + assertEquals(middleEvent1, patternMap1.get("middle").get(0)); + assertEquals(endEvent1, patternMap1.get("end").get(0)); + harness.close(); } http://git-wip-us.apache.org/repos/asf/flink/blob/d80af819/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index eb50dfd..41593b0 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -30,6 +30,7 @@ import org.apache.flink.cep.SubEvent; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; @@ -379,7 +380,6 @@ public class CEPOperatorTest extends TestLogger { Event middle1Event3 = new Event(41, "a", 4.0); Event middle2Event1 = new Event(41, "b", 5.0); - TestKeySelector keySelector = new TestKeySelector(); KeyedCEPPatternOperator operator = new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), false, @@ -530,7 +530,113 @@ public class CEPOperatorTest extends TestLogger { harness.close(); } - + + @Test + public void testCEPOperatorSerializationWRocksDB() throws Exception { + String rocksDbPath = tempFolder.newFolder().getAbsolutePath(); + RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); + rocksDBStateBackend.setDbStoragePath(rocksDbPath); + + final Event startEvent1 = new Event(40, "start", 1.0); + final Event startEvent2 = new Event(40, "start", 2.0); + final SubEvent middleEvent1 = new SubEvent(40, "foo1", 1.0, 10); + final SubEvent middleEvent2 = new SubEvent(40, "foo2", 2.0, 10); + final SubEvent middleEvent3 = new SubEvent(40, "foo3", 3.0, 10); + final SubEvent middleEvent4 = new SubEvent(40, "foo4", 1.0, 10); + final Event nextOne = new Event(40, "next-one", 1.0); + final Event endEvent = new Event(40, "end", 1.0); + + final Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }).followedBy("middle").subtype(SubEvent.class).where(new IterativeCondition() { + + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter (SubEvent value, Context < SubEvent > ctx) throws Exception { + if (!value.getName().startsWith("foo")) { + return false; + } + + double sum = 0.0; + for (Event event : ctx.getEventsForPattern("middle")) { + sum += event.getPrice(); + } + sum += value.getPrice(); + return Double.compare(sum, 5.0) < 0; + } + }).oneOrMore().allowCombinations().followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); + + KeyedCEPPatternOperator operator = new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + IntSerializer.INSTANCE, + new NFACompiler.NFAFactory() { + + private static final long serialVersionUID = 477082663248051994L; + + @Override + public NFA createNFA() { + return NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + } + }, + true); + + OneInputStreamOperatorTestHarness>> harness = getCepTestHarness(operator); + harness.setStateBackend(rocksDBStateBackend); + harness.open(); + + harness.processWatermark(0L); + harness.processElement(new StreamRecord<>(startEvent1, 1)); + harness.processElement(new StreamRecord(middleEvent1, 2)); + harness.processWatermark(2L); + harness.processElement(new StreamRecord(middleEvent2, 3)); + harness.processElement(new StreamRecord<>(startEvent2, 4)); + harness.processElement(new StreamRecord(middleEvent3, 5)); + harness.processWatermark(5L); + harness.processElement(new StreamRecord(middleEvent4, 5)); + harness.processElement(new StreamRecord<>(nextOne, 6)); + harness.processElement(new StreamRecord<>(endEvent, 8)); + harness.processWatermark(100L); + + List> resultingPatterns = new ArrayList<>(); + while (!harness.getOutput().isEmpty()) { + Object o = harness.getOutput().poll(); + if (!(o instanceof Watermark)) { + StreamRecord>> el = (StreamRecord>>) o; + List res = new ArrayList<>(); + for (List le: el.getValue().values()) { + res.addAll(le); + } + resultingPatterns.add(res); + } + } + + compareMaps(resultingPatterns, + Lists.>newArrayList( + Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4), + Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1), + Lists.newArrayList(startEvent1, endEvent, middleEvent3, middleEvent1), + Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4), + Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent1), + Lists.newArrayList(startEvent1, endEvent, middleEvent1), + Lists.newArrayList(startEvent2, endEvent, middleEvent3) + ) + ); + } + private void verifyWatermark(Object outputObject, long timestamp) { assertTrue(outputObject instanceof Watermark); assertEquals(timestamp, ((Watermark) outputObject).getTimestamp());