From: "kieran ."
To: "user@flink.apache.org"
Subject: CEP issue
Date: Tue, 29 Nov 2016 14:18:40 +0000
Reply-To: user@flink.apache.org
Mailing-List: contact user-help@flink.apache.org; run by ezmlm
MIME-Version: 1.0
Content-Type: multipart/alternative; boundary="_000_DB5PR06MB1829C6FFACFA3AF040AE048AD98D0DB5PR06MB1829eurp_"
archived-at: Tue, 29 Nov 2016 14:19:36 -0000

--_000_DB5PR06MB1829C6FFACFA3AF040AE048AD98D0DB5PR06MB1829eurp_
Content-Type: text/html; charset="iso-8859-1"
Content-Transfer-Encoding: quoted-printable

Hello,

I am currently building a multi-tenant monitoring application and exploring the effectiveness of different Complex Event Processors (CEP) and whether or not one would be a potential solution for what I want to achieve. I have created a small test application which utilises Flink and its CEP library, but I have come across some issues when monitoring a large number of metrics using patterns/pattern streams. Flink seems to operate as expected with one or several patterns, each consuming its own PatternStream, but as soon as more are introduced, Flink's memory usage rises rather quickly and it eventually throws an OutOfMemoryError. My initial idea was to create one pattern/pattern stream for each metric that I need to monitor, but there could be many thousands of these.

I create the PatternStream per Pattern like this to monitor a metric:

        // Match MetricData events for this account whose "max" value exceeds 50
        Pattern<MetricData, ?> pattern = Pattern.<MetricData>begin( patternName ).subtype( MetricData.class )
                .where( evt -> evt.getValues().get( "max" ).longValue() > 50.0
                        && evt.account_id.equals( accountName ) );


        check.withPattern( pattern )
                .withTimePeriod( Integer.valueOf( 1 ) )
                .withCooldown( Integer.valueOf( 1 ) )
                .withName( checkName )
                .withAlertStatus( AlertStatus.OK )
                .setPatternStream( CEP.pattern( messageStream.keyBy( "account_id" ), pattern ) );


To trigger these patterns, I use

        // Turn each match into a MetricWarning for the event stored under patternKey
        PatternSelectFunction<MetricData, MetricWarning> psf = new PatternSelectFunction<MetricData, MetricWarning>()
        {
            @Override
            public MetricWarning select( Map<String, MetricData> map ) throws Exception
            {
                return new MetricWarning( map.get( patternKey ), name, accountId );
            }
        };

        try
        {
            check.getPatternStream().select( psf );
        }
        catch( Exception exception )
        {
            exception.printStackTrace();
        }
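
The snippets above depend on MetricData, which is not shown. As a minimal sketch, assuming MetricData exposes a public account_id field and a getValues() map of Number values (both inferred from the calls above; the class body and the ThresholdCheck helper are hypothetical), the where condition can be exercised on its own, outside Flink:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for the MetricData type used above; only the
// members that the pattern condition touches are modelled.
class MetricData {
    public final String account_id;
    private final Map<String, Number> values = new HashMap<>();

    MetricData(String accountId, long max) {
        this.account_id = accountId;
        this.values.put("max", max);
    }

    Map<String, Number> getValues() {
        return values;
    }
}

// Hypothetical helper that builds the same condition as the where(...) clause.
class ThresholdCheck {
    // True when the event's "max" value exceeds 50 and it belongs to accountName.
    static Predicate<MetricData> forAccount(String accountName) {
        return evt -> evt.getValues().get("max").longValue() > 50.0
                && evt.account_id.equals(accountName);
    }
}
```

Calling ThresholdCheck.forAccount( accountName ).test( event ) then gives the same yes/no answer the pattern would use for a single event.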



The pattern in the above example is tied to a specific stream, which results in one stream per pattern, and this seems to be the issue with this approach. If it were possible to run one pattern stream and switch out the patterns when needed, then perhaps this would be a viable solution. Am I approaching this in the right way by creating a stream for each pattern?

Would it be possible to create a set of Pattern processors that could be run against a single PatternStream or is there anything you could suggest which would allow me to do this with Flink?
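
To make the single-stream idea concrete, here is a minimal sketch in plain Java, not Flink API: one registry holds the rules for every account, and each incoming event is checked against its account's rules in a single pass, so rules can be added or removed without creating another stream. ThresholdRule and RuleRegistry are hypothetical names, and the sketch covers only single-event threshold checks, not multi-event CEP sequences. Whether this logic can sit inside one operator on the keyed stream is an assumption here, not something confirmed above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical rule: fires when the named metric value exceeds a threshold.
class ThresholdRule {
    final String name;
    final String metric;
    final double threshold;

    ThresholdRule(String name, String metric, double threshold) {
        this.name = name;
        this.metric = metric;
        this.threshold = threshold;
    }

    boolean matches(Map<String, Double> values) {
        Double value = values.get(metric);
        return value != null && value > threshold;
    }
}

// Hypothetical registry: rules grouped per account and replaceable at runtime.
class RuleRegistry {
    private final Map<String, List<ThresholdRule>> rulesByAccount = new HashMap<>();

    void addRule(String accountId, ThresholdRule rule) {
        rulesByAccount.computeIfAbsent(accountId, k -> new ArrayList<>()).add(rule);
    }

    void removeRule(String accountId, String ruleName) {
        List<ThresholdRule> rules = rulesByAccount.get(accountId);
        if (rules != null) {
            rules.removeIf(r -> r.name.equals(ruleName));
        }
    }

    // Checks one event against every rule of its account and returns
    // the names of the rules that fired, in registration order.
    List<String> evaluate(String accountId, Map<String, Double> values) {
        List<String> fired = new ArrayList<>();
        List<ThresholdRule> rules =
                rulesByAccount.getOrDefault(accountId, Collections.emptyList());
        for (ThresholdRule rule : rules) {
            if (rule.matches(values)) {
                fired.add(rule.name);
            }
        }
        return fired;
    }
}
```

With this shape the number of streams stays constant while the number of rules grows; the cost moves to the size of the registry and the per-event loop over one account's rules.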

Thanks,
- Kieran

--_000_DB5PR06MB1829C6FFACFA3AF040AE048AD98D0DB5PR06MB1829eurp_--