From user-return-18307-archive-asf-public=cust-asf.ponee.io@flink.apache.org Tue Feb 20 16:39:00 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 72377180654 for ; Tue, 20 Feb 2018 16:38:59 +0100 (CET) Received: (qmail 7654 invoked by uid 500); 20 Feb 2018 15:38:53 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list user@flink.apache.org Received: (qmail 7644 invoked by uid 99); 20 Feb 2018 15:38:53 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Feb 2018 15:38:53 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id E22B8C0040 for ; Tue, 20 Feb 2018 15:38:52 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 3.292 X-Spam-Level: *** X-Spam-Status: No, score=3.292 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, HTML_MESSAGE=2, RCVD_IN_DNSWL_NONE=-0.0001, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, SPF_PASS=-0.001, URI_HEX=1.313] autolearn=disabled Authentication-Results: spamd4-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=stellapps-com.20150623.gappssmtp.com Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id Ux5fhnym4hQS for ; Tue, 20 Feb 2018 15:38:49 +0000 (UTC) Received: from mail-ot0-f169.google.com (mail-ot0-f169.google.com [74.125.82.169]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTPS id C5B8C5F5DB for ; Tue, 20 Feb 2018 15:38:48 +0000 (UTC) 
Received: by mail-ot0-f169.google.com with SMTP id 79so11748712oth.11 for ; Tue, 20 Feb 2018 07:38:48 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=stellapps-com.20150623.gappssmtp.com; s=20150623; h=mime-version:from:date:message-id:subject:to:cc; bh=XeaM8ajOTBRJYmzhGw4Ddr1Pp57ErEbpjkUrrlLVrPE=; b=HGRkBPyCKeZB410zuD5HkX+dz2XilVN2gKa/Lk0hDMak/q6xOAN2D4CJnscFSByd0C 42JOBrJeG562XFnYXWhATLSgkmZYEID/v8WndnCnhaaboM4zRm25qtKCmeLVrnhr6Kkm l4YT3omNlXxdWsaGgJWxH7bWfrplubGtJf0aCBztVcoAwp08OcKXPOemPhIg4dIP4A6n 1j3MB/HWwCE7iTqeRz7qP2b/+ktZ7Bai8TCWnyZfA9+rHUSD0kgrIKYMe97DGkDB8G1c MjbSzwpxHmXDkAQzpt8CcyRI1ZSfYs37utKOsBZ0ss9l9vr5RtBKrnBGarO+9IJ/eJPX ED5Q== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:from:date:message-id:subject:to:cc; bh=XeaM8ajOTBRJYmzhGw4Ddr1Pp57ErEbpjkUrrlLVrPE=; b=h3vvwivm0ML2NAv0lIxdAxBRmhGTzEQGqHc2YHhiV8XzxAf+AAqEFsCwrNy3rh79s8 jFJjGq/JYVVHl9XjYXP9rerFGv899mneFn2wUCv2oqdzYV+/HIioDyneDabtdDfhppfp cCxhI0VOgGgZlAwBnXAldnrsM9Qq37LjGZYKId4saExJ/eywrKqxhhLyoMutE1vebegC h5svI3ATe0CbwVuMn2AlNpt0qLM1iPBeVgsOos1rLtFv/SSLGAKeh9UMqu6BWxyIqyMa cMfQbinRj93nV7IAi2YymmtKvXYO1J3C3Vngl5o0DlJC6PLmtzQ/Euzz3EKy7gGYykCB k3Rw== X-Gm-Message-State: APf1xPA0kJWR0Tu3MoG5z1KBPyU6BDdQGfyVcupoL4/D9WbvH2/4uiX7 pn9ma6egrPjxOzQH4WlmWCfVnicwTctZuf+G+bC8xA== X-Google-Smtp-Source: AG47ELvUgLtdDrl24wwRsGpB9RK9Y7voCew7TA0JIj3PmavnYJyMQd4r4iA0bsmbYi+paU7Bk1sYwS8WcDy6oZciexA= X-Received: by 10.157.34.194 with SMTP id y60mr37283ota.346.1519141128055; Tue, 20 Feb 2018 07:38:48 -0800 (PST) MIME-Version: 1.0 Received: by 10.74.196.134 with HTTP; Tue, 20 Feb 2018 07:38:47 -0800 (PST) From: Shailesh Jain Date: Tue, 20 Feb 2018 21:08:47 +0530 Message-ID: Subject: Re: Correlation between number of operators and Job manager memory requirements To: Till Rohrmann Cc: Pawel Bartoszek , User Content-Type: multipart/alternative; boundary="94eb2c0505fa4d5cbe0565a69b64" --94eb2c0505fa4d5cbe0565a69b64 
Content-Type: text/plain; charset="UTF-8"
Content-Transfer-Encoding: quoted-printable

Hi Till,

Thanks for your reply.

>> My suggestion would be to split the different patterns up and run them in different jobs.

I'm not able to understand how splitting up the jobs based on patterns would be more efficient than based on the key. The total number of operators would still be the same, right?

>> But splitting the input stream will generate many concurrent operators which all run the same CEP operator.

Are you suggesting using the split transformation here? I also see a similar thread [1] where you had suggested using split. When I generate a watermark on SplitStream, will it be assigned only on that 'partition' of the split stream? If so, will applying the CEP operator on the SplitStream behave in the same way (i.e. like a KeyedCEPOperator) and NOT create separate NFA instances for each partition (selection)?

>> CEP operators should be chainable if I'm not mistaken

I am not able to find any documentation on how I can explicitly chain two CEP operators which are applied to the same data stream (not one after another). It would be really helpful if you could point me to it.

Thanks,
Shailesh

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Different-watermarks-on-keyed-stream-td14751.html

On Tue, Feb 20, 2018 at 4:46 PM, Till Rohrmann wrote:

> Hi Shailesh,
>
> I fear that given your job topology, it is not that surprising that things break. The problem is that you might have M x N CEP operators concurrently active. This means that they have to keep their state in memory. Given that, 3.5 GB isn't that much if you have more than 300 CEP NFAs running. This is roughly 10 MB per NFA. Depending on your time window, the size of the records and the stream throughput, this should be easily reachable.
>
> My suggestion would be to split the different patterns up and run them in different jobs. Then you should also give more resources to the TM. And ideally you don't do the filter operation on the stream, because this increases the number of CEP operators quite a bit and thus also the memory footprint.
>
> Concerning your questions:
> 1. CEP operators should be chainable, if I'm not mistaken.
> 2. Per-key watermarks are indeed not supported in Flink. But splitting the input stream will generate many concurrent operators which all run the same CEP operator. Best would be to generate watermarks which work for all keys.
> 3. I think your assumption should be correct. I think monitoring the JM process via VisualVM should be quite good to see the memory requirements.
>
> Cheers,
> Till
>
> On Tue, Feb 20, 2018 at 11:23 AM, Shailesh Jain <shailesh.jain@stellapps.com> wrote:
>
>> Hi Till,
>>
>> When I'm submitting one big job, both JM and TM (sometimes just JM) are crashing at the time of initialization itself (i.e. not all operators switch to RUNNING) with OOM. The number of threads on the TM goes to almost 1000.
>>
>> But when I'm submitting multiple jobs, job submission is completed. But when data starts coming in (it's a live stream), the task manager's memory usage grows and eventually it crashes.
>>
>> The patterns I'm trying to match are simple (A followed by B, A followed by B within X mins etc.), but the number of patterns is large (due to the reason mentioned in my question 2 below).
>>
>> Configuration: 1 JM and 1 TM
>>
>> jobmanager.heap.mb: 512
>> taskmanager.heap.mb: 3596
>> taskmanager.numberOfTaskSlots: 5
>> parallelism.default: 1
>> jobmanager.rpc.port: 6123
>> state.backend: filesystem
>> taskmanager.debug.memory.startLogThread: true
>> taskmanager.debug.memory.logIntervalMs: 120000
>> akka.ask.timeout: 2 min
>> akka.client.timeout: 5 min
>> akka.framesize: 404857600b
>> restart-strategy: fixed-delay
>> restart-strategy.fixed-delay.attempts: 3
>> restart-strategy.fixed-delay.delay: 10 s
>>
>> I'm submitting 5 jobs, and each job has ~80 operators.
>>
>> With the above configuration, the job submission is successful, but the TMs eventually max out their heap usage.
>>
>> But, as mentioned earlier, when I change the number of slots to 1 and submit 1 job with 300+ operators, the job submission fails with OOM.
>>
>> 3 questions here:
>>
>> 1. Is it possible to chain multiple CEP operators into a single task, so that the number of threads is reduced? The reason here is that when I'm submitting one big job, the OOM always occurs when the JVM is trying to create a new thread.
>>
>> 2. Instead of using a KeyedStream, I'm creating multiple streams per key (using a filter operator) and then applying all M patterns to that stream. So essentially it is resulting in M (number of patterns) x N (number of keys) CEP operators/tasks. The reason behind creating this is that I need to have different watermarks per key (a key represents a physical source, and the source time could be different, resulting in events getting dropped), and I believe generating watermarks per key is not supported yet. Is this understanding correct? Do you have any ideas/recommendations to address this use case?
>>
>> 3. How can we benchmark the resources required by the JM? Is it OK to assume that the amount of memory required by the JM grows linearly with the total number of operators deployed?
>>
>> Thanks,
>> Shailesh
>>
>> On Mon, Feb 19, 2018 at 10:18 PM, Till Rohrmann wrote:
>>
>>> Hi Shailesh,
>>>
>>> my question would be where do you see the OOM happening? Does it happen on the JM or the TM?
>>>
>>> The memory requirements for each operator strongly depend on the operator and it is hard to give a general formula for that. It mostly depends on the user function. Flink itself should not need too much extra memory for the framework-specific code.
>>>
>>> CEP, however, can easily add a couple of hundred megabytes to your memory requirements. This depends strongly on the pattern you're matching and which state backend you're using.
>>>
>>> Concerning your question about one big job vs. multiple jobs, I could see that this helps if not all jobs are executed at the same time. Especially if you only have a single TM with a limited number of slots, I think that you effectively queue up jobs. That should reduce the required amount of resources for each individual job.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Feb 19, 2018 at 11:35 AM, Shailesh Jain <shailesh.jain@stellapps.com> wrote:
>>>
>>>> Actually, there are too many hyperparameters to experiment with; that is why I'm trying to understand if there is any particular way in which a cluster could be benchmarked.
>>>>
>>>> Another strange behaviour I am observing is: Delaying the operator creation (by distributing the operators across jobs, and submitting multiple jobs to the same cluster instead of one) is helping in creating more operators. Any ideas on why that is happening?
>>>>
>>>> Shailesh
>>>>
>>>> On Sun, Feb 18, 2018 at 11:16 PM, Pawel Bartoszek <pawelbartoszek89@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> You could definitely try to find a formula for the heap size, but isn't it easier just to try out different memory settings and see which works best for you?
>>>>>
>>>>> Thanks,
>>>>> Pawel
>>>>>
>>>>> On 17 Feb 2018 at 12:26, "Shailesh Jain" wrote:
>>>>>
>>>>> Oops, hit send by mistake.
>>>>>
>>>>> In the configuration section, it is mentioned that for "many operators" heap size should be increased.
>>>>>
>>>>> "JVM heap size (in megabytes) for the JobManager. You may have to increase the heap size for the JobManager if you are running very large applications (with many operators), or if you are keeping a long history of them."
>>>>>
>>>>> Is there any recommendation on the heap space required when there are around 200 CEP operators, and close to 80 Filter operators?
>>>>>
>>>>> Any other leads on calculating the expected heap space allocation to start the job would be really helpful.
>>>>>
>>>>> Thanks,
>>>>> Shailesh
>>>>>
>>>>> On Sat, Feb 17, 2018 at 5:53 PM, Shailesh Jain <shailesh.jain@stellapps.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have a Flink job with almost 300 operators, and every time I try to submit the job, the cluster crashes with an OutOfMemory exception.
>>>>>>
>>>>>> I have 1 job manager and 1 task manager with 2 GB heap space allocated to both.
>>>>>>
>>>>>> In the configuration section of the documentation

--94eb2c0505fa4d5cbe0565a69b64
Content-Type: text/html; charset="UTF-8"
Content-Transfer-Encoding: quoted-printable
Hi Till,

Thanks for your reply.

>> My suggestion would be to split the different patterns up and run them in different jobs.

I'm not able to understand how splitting up the jobs based on patterns would be more efficient than based on the key. The total number of operators would still be the same, right?

>> But splitting the input stream will generate many concurrent operators which all run the same CEP operator.

Are you suggesting using the split transformation here? I also see a similar thread [1] where you had suggested using split. When I generate a watermark on SplitStream, will it be assigned only on that 'partition' of the split stream? If so, will applying the CEP operator on the SplitStream behave in the same way (i.e. like a KeyedCEPOperator) and NOT create separate NFA instances for each partition (selection)?

>> CEP operators should be chainable if I'm not mistaken

I am not able to find any documentation on how I can explicitly chain two CEP operators which are applied to the same data stream (not one after another). It would be really helpful if you could point me to it.

Thanks,
Shailesh

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Different-watermarks-on-keyed-stream-td14751.html

On Tue, Feb 20, 2018 at 4:46 PM, Till Rohrmann <trohrmann@apache.org> wrote:
Hi Shailesh,

I fear that given your job topology, it is not that surprising that things break. The problem is that you might have M x N CEP operators concurrently active. This means that they have to keep their state in memory. Given that, 3.5 GB isn't that much if you have more than 300 CEP NFAs running. This is roughly 10 MB per NFA. Depending on your time window, the size of the records and the stream throughput, this should be easily reachable.
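
The per-NFA figure above comes from dividing the TaskManager heap by the number of concurrently active CEP operators. A minimal sketch of that arithmetic, assuming the 3596 MB heap from the configuration posted in this thread and 300 NFAs; the class and method names are illustrative and are not part of Flink:

```java
// Back-of-the-envelope heap budget per CEP NFA. Illustrative only, not a Flink API.
final class HeapBudget {

    // Whole megabytes available to each NFA if the heap were shared evenly.
    static long perNfaMb(long heapMb, long nfaCount) {
        if (nfaCount <= 0) {
            throw new IllegalArgumentException("nfaCount must be positive");
        }
        return heapMb / nfaCount;
    }

    public static void main(String[] args) {
        long heapMb = 3596; // taskmanager.heap.mb from the posted configuration
        long nfas = 300;    // "more than 300 CEP NFAs" in the thread
        System.out.println(perNfaMb(heapMb, nfas) + " MB per NFA");
    }
}
```

This treats the whole heap as available to the NFAs, which is optimistic, since the framework and the other operators also need memory, so the real budget per NFA would be lower.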

My suggestion would be to split the different patterns up and run them in different jobs. Then you should also give more resources to the TM. And ideally you don't do the filter operation on the stream, because this increases the number of CEP operators quite a bit and thus also the memory footprint.

Concerning your questions:
1. CEP operators should be chainable, if I'm not mistaken.
2. Per-key watermarks are indeed not supported in Flink. But splitting the input stream will generate many concurrent operators which all run the same CEP operator. Best would be to generate watermarks which work for all keys.
3. I think your assumption should be correct. I think monitoring the JM process via VisualVM should be quite good to see the memory requirements.
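
One way to read "watermarks which work for all keys" in point 2 is a single watermark that never runs ahead of the slowest source. The sketch below is plain Java, not Flink API; the class name, the source ids and the out-of-orderness bound are all assumptions made for illustration. It tracks the latest event time seen per source and takes the minimum:

```java
import java.util.HashMap;
import java.util.Map;

// Derives one watermark that is safe for every source (key). Illustrative only, not a Flink API.
final class GlobalWatermark {
    private final Map<String, Long> latestBySource = new HashMap<>();
    private final long maxOutOfOrdernessMs; // assumed bound on lateness within a source

    GlobalWatermark(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Record an event; only the largest timestamp per source is kept.
    void observe(String sourceId, long eventTimeMs) {
        latestBySource.merge(sourceId, eventTimeMs, Long::max);
    }

    // Minimum of the per-source maxima, minus the lateness bound.
    long current() {
        if (latestBySource.isEmpty()) {
            return Long.MIN_VALUE; // nothing seen yet
        }
        long min = Long.MAX_VALUE;
        for (long t : latestBySource.values()) {
            min = Math.min(min, t);
        }
        return min - maxOutOfOrdernessMs;
    }
}
```

With such a scheme a source that stops sending events holds the watermark back for everyone, so some idle-source handling would be needed in practice.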

Cheers,
Till

On Tue, Feb 20, 2018 at 11:23 AM, Shailesh Jain <shailesh.jain@stellapps.com> wrote:
Hi Till,

When I'm submitting one big job, both JM and TM (sometimes just JM) are crashing at the time of initialization itself (i.e. not all operators switch to RUNNING) with OOM. The number of threads on the TM goes to almost 1000.

But when I'm submitting multiple jobs, job submission is completed. But when data starts coming in (it's a live stream), the task manager's memory usage grows and eventually it crashes.

The patterns I'm trying to match are simple (A followed by B, A followed by B within X mins etc.), but the number of patterns is large (due to the reason mentioned in my question 2 below).

Configuration: 1 JM and 1 TM

jobmanager.heap.mb: 512
taskmanager.heap.mb: 3596
taskmanager.numberOfTaskSlots: 5
parallelism.default: 1
jobmanager.rpc.port: 6123
state.backend: filesystem
taskmanager.debug.memory.startLogThread: true
taskmanager.debug.memory.logIntervalMs: 120000
akka.ask.timeout: 2 min
akka.client.timeout: 5 min
akka.framesize: 404857600b
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

I'm submitting 5 jobs, and each job has ~80 operators.

With the above configuration, the job submission is successful, but the TMs eventually max out their heap usage.

But, as mentioned earlier, when I change the number of slots to 1 and submit 1 job with 300+ operators, the job submission fails with OOM.

3 questions here:

1. Is it possible to chain multiple CEP operators into a single task, so that the number of threads is reduced? The reason here is that when I'm submitting one big job, the OOM always occurs when the JVM is trying to create a new thread.

2. Instead of using a KeyedStream, I'm creating multiple streams per key (using a filter operator) and then applying all M patterns to that stream. So essentially it is resulting in M (number of patterns) x N (number of keys) CEP operators/tasks. The reason behind creating this is that I need to have different watermarks per key (a key represents a physical source, and the source time could be different, resulting in events getting dropped), and I believe generating watermarks per key is not supported yet. Is this understanding correct? Do you have any ideas/recommendations to address this use case?

3. How can we benchmark the resources required by the JM? Is it OK to assume that the amount of memory required by the JM grows linearly with the total number of operators deployed?
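
To make the M x N growth in question 2 concrete, here is a small counting sketch. The numbers (4 patterns, 75 keys) are hypothetical and chosen only so that the total lands near the "300+ operators" mentioned above; the class and method names are illustrative, and the keyed count assumes one CEP operator per pattern applied to a single keyed stream:

```java
// Counts operators for the two topologies discussed in the thread. Illustrative only.
final class OperatorCount {

    // One filter per key, then every pattern applied to every filtered stream.
    static long filterPerKey(long patterns, long keys) {
        return keys + patterns * keys;
    }

    // One CEP operator per pattern on a single keyed stream (assumption).
    static long keyed(long patterns) {
        return patterns;
    }

    public static void main(String[] args) {
        long patterns = 4; // hypothetical M
        long keys = 75;    // hypothetical N
        System.out.println("filter per key: " + filterPerKey(patterns, keys));
        System.out.println("keyed stream:   " + keyed(patterns));
    }
}
```

The filter-per-key count grows with every new source, while the keyed count depends only on the number of patterns; the keyed layout, however, gives up the separate watermark per source that motivated the filters.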

Thanks,
Shailesh


On Mon, Feb 19, 2018 at 10:18 PM, Till Rohrmann <trohrmann@apache.org> wrote:
Hi Shailesh,

my question would be where do you see the OOM happening? Does it happen on the JM or the TM?

The memory requirements for each operator strongly depend on the operator and it is hard to give a general formula for that. It mostly depends on the user function. Flink itself should not need too much extra memory for the framework-specific code.

CEP, however, can easily add a couple of hundred megabytes to your memory requirements. This depends strongly on the pattern you're matching and which state backend you're using.

Concerning your question about one big job vs. multiple jobs, I could see that this helps if not all jobs are executed at the same time. Especially if you only have a single TM with a limited number of slots, I think that you effectively queue up jobs. That should reduce the required amount of resources for each individual job.

Cheers,
Till

On Mon, Feb 19, 2018 at 11:35 AM, Shailesh Jain <shailesh.jain@stellapps.com> wrote:
Actually, there are too many hyperparameters to experiment with; that is why I'm trying to understand if there is any particular way in which a cluster could be benchmarked.

Another strange behaviour I am observing is: Delaying the operator creation (by distributing the operators across jobs, and submitting multiple jobs to the same cluster instead of one) is helping in creating more operators. Any ideas on why that is happening?

Shailesh


On Sun, Feb 18, 2018 at 11:16 PM, Pawel Bartoszek <pawelbartoszek89@gmail.com> wrote:
Hi,

You could definitely try to find a formula for the heap size, but isn't it easier just to try out different memory settings and see which works best for you?

Thanks,
Pawel

On 17 Feb 2018 at 12:26, "Shailesh Jain" <shailesh.jain@stellapps.com> wrote:
Oops, hit send by mistake.

In the configuration section, it is mentioned that for "many operators" heap size should be increased.

"JVM heap size (in megabytes) for the JobManager. You may have to increase the heap size for the JobManager if you are running very large applications (with many operators), or if you are keeping a long history of them."

Is there any recommendation on the heap space required when there are around 200 CEP operators, and close to 80 Filter operators?

Any other leads on calculating the expected heap space allocation to start the job would be really helpful.

Thanks,
Shailesh


On Sat, Feb 17, 2018 at 5:53 PM, Shailesh Jain <shailesh.jain@stellapps.com> wrote:
Hi,

I have a Flink job with almost 300 operators, and every time I try to submit the job, the cluster crashes with an OutOfMemory exception.

I have 1 job manager and 1 task manager with 2 GB heap space allocated to both.

In the configuration section of the documentation










--94eb2c0505fa4d5cbe0565a69b64--