From: Fabian Hueske
Date: Thu, 7 Dec 2017 18:39:43 +0100
Subject: Re: Flink Batch Performance degradation at scale
To: Garrett Barton
Cc: user@flink.apache.org

Ah, no direct memory buffer...
Can you try to disable off-heap memory?

2017-12-07 18:35 GMT+01:00 Garrett Barton:

> Stacktrace generates every time with the following settings (tried
> different memory fractions):
> yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120
> akka.ask.timeout: 60s
> containerized.heap-cutoff-ratio: 0.15
> taskmanager.memory.fraction: 0.7/0.3/0.1
> taskmanager.memory.off-heap: true
> taskmanager.memory.preallocate: true
> env.getConfig().setExecutionMode(ExecutionMode.BATCH)
>
> Hand-jammed top of the stack:
> java.lang.RuntimeException: Error obtaining the sorted input: Thread
> 'SortMerger Reading Thread' terminated due to an exception:
> java.lang.OutOfMemoryError: Direct buffer memory
>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>   at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
>   at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:82)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception:
> java.lang.OutOfMemoryError: Direct buffer memory
>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
> java.lang.OutOfMemoryError: Direct buffer memory
>   at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:149)
>   ... lots of netty stuff
>
> While I observe the taskmanagers, I never see their JVM heaps get high at all.
> Mind you, I can't tell which task will blow up and then see its TM in time to
> see what it looks like. But for each one I do look at, the heap usage is
> ~150MB/6.16GB (with fraction: 0.1).
>
> On Thu, Dec 7, 2017 at 11:59 AM, Fabian Hueske wrote:
>
>> Hmm, the OOM sounds like a bug to me. Can you provide the stacktrace?
>> The managed memory should be divided among all possible consumers. In the
>> case of your simple job, this should just be the Sorter.
>> In fact, I'd try to reduce the fraction to give more memory to the JVM
>> heap (an OOM means there was not enough (heap) memory).
>>
>> Enabling BATCH mode means that the records are not shipped to the sorter
>> in a pipelined fashion but buffered at (and written to the disk of) the
>> sender task. Once the input has been consumed, the data is shipped to the
>> receiver tasks (the sorter). This mode decouples tasks and also reduces
>> the number of network buffers, because fewer connections must be active
>> at the same time.
>> Here's a link to an internal design document (not sure how up to date it
>> is, though...) [1].
>>
>> Did you try to check whether the problem is caused by data skew?
>> You could add a MapPartition task instead of the PartitionSorter to
>> count the number of records per partition.
>>
>> Best, Fabian
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks
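To make that last suggestion concrete, a minimal sketch of such a skew check follows. It is not code from this thread: the tuple type and the flatMapped data set are stand-ins for whatever the real job produces ahead of the sort, and only the field indices [0,2] are taken from the posted plan.

    import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.util.Collector;

    // Hash-partition on the same keys the job uses ([0,2]) and, instead of sorting,
    // count what every parallel subtask receives so skewed partitions stand out.
    DataSet<Tuple2<Integer, Long>> countsPerPartition = flatMapped
        .partitionByHash(0, 2)
        .mapPartition(new RichMapPartitionFunction<Tuple3<String, String, String>, Tuple2<Integer, Long>>() {
            @Override
            public void mapPartition(Iterable<Tuple3<String, String, String>> records,
                                     Collector<Tuple2<Integer, Long>> out) {
                long count = 0L;
                for (Tuple3<String, String, String> ignored : records) {
                    count++;
                }
                // emit (subtask index, number of records seen by that subtask)
                out.collect(new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), count));
            }
        });

    countsPerPartition.print();

A heavily skewed key distribution shows up as one or two subtasks reporting counts orders of magnitude larger than the rest, which matches the 6.7GB/7.3GB partitions reported further down the thread.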
>> 2017-12-07 16:30 GMT+01:00 Garrett Barton:
>>
>>> Thanks for the reply again,
>>>
>>> I'm currently doing runs with:
>>> yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120
>>> akka.ask.timeout: 60s
>>> containerized.heap-cutoff-ratio: 0.15
>>> taskmanager.memory.fraction: 0.7
>>> taskmanager.memory.off-heap: true
>>> taskmanager.memory.preallocate: true
>>>
>>> When I change the config setExecutionMode() to BATCH, no matter what
>>> memory fraction I choose, the sort instantly fails with SortMerger OOM
>>> exceptions, even when I set the fraction to 0.95. The data source part is
>>> ridiculously fast though, ~30 seconds! Disabling batch mode and keeping
>>> the other changes seems to give the same behavior as before; the jobs have
>>> been running for ~20 minutes now. Does BATCH mode disable spilling to
>>> disk, or does BATCH combined with off-heap disable spilling to disk? Is
>>> there more documentation on what BATCH mode does under the covers?
>>>
>>> As for the flow itself, yes, it used to be a lot smaller. I broke it out
>>> manually by adding the sort/partition to see which steps were causing the
>>> slowdown; thinking it was my code, I wanted to separate the operations.
>>>
>>> Thank you again for your help.
>>>
>>> On Thu, Dec 7, 2017 at 4:49 AM, Fabian Hueske wrote:
>>>
>>>> That doesn't look like a bad configuration.
>>>>
>>>> I have to correct myself regarding the size of the managed memory. The
>>>> fraction (70%) is applied to the free memory after the TM initialization.
>>>> This means that memory for network buffers (and other data structures) is
>>>> subtracted before the managed memory is allocated.
>>>> The actual size of the managed memory is logged in the TM log file
>>>> during start-up.
>>>>
>>>> You could also try to decrease the number of slots per TM to 1 but add
>>>> more vCores (yarn.containers.vcores [1]) because the sorter runs in
>>>> multiple threads.
>>>>
>>>> Adding a GroupCombineFunction for pre-aggregation (if possible...)
>>>> would help to mitigate the effects of the data skew.
>>>> Another thing I'd like to ask: are you adding the partitioner and
>>>> sorter explicitly to the plan, and if so, why? Usually, the partitioning
>>>> and sorting is done as part of the GroupReduce.
>>>>
>>>> Best, Fabian
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#yarn
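For the pre-aggregation idea, a minimal sketch is shown below. It is only one way to use a GroupCombineFunction, under the assumption that the records can be partially aggregated before the shuffle (Fabian's "if possible"); the tuple type, the key field and the records data set are hypothetical and not taken from the job in this thread. A sketch of letting the GroupReduce drive the partitioning and sorting appears at the end of the thread.

    import org.apache.flink.api.common.functions.GroupCombineFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // Partially aggregate (key, count) pairs before they are shuffled to the
    // final GroupReduce, so hot keys ship far fewer records across the network.
    DataSet<Tuple2<String, Long>> preAggregated = records
        .groupBy(0)
        .combineGroup(new GroupCombineFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
            @Override
            public void combine(Iterable<Tuple2<String, Long>> values,
                                Collector<Tuple2<String, Long>> out) {
                String key = null;
                long sum = 0L;
                for (Tuple2<String, Long> value : values) {
                    key = value.f0;
                    sum += value.f1;
                }
                if (key != null) {
                    // one partially aggregated record per key and local partition
                    out.collect(new Tuple2<>(key, sum));
                }
            }
        });

    // A final groupBy(0) + reduceGroup(...) is still needed afterwards, because
    // the combine step only sees the records co-located on each subtask.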
>>>> 2017-12-06 23:32 GMT+01:00 Garrett Barton:
>>>>
>>>>> Wow, thank you for the reply, you gave me a lot to look into and mess
>>>>> with. I'll start testing with the various memory options and env
>>>>> settings tomorrow.
>>>>>
>>>>> BTW the current Flink cluster is launched like:
>>>>> yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120
>>>>>
>>>>> with flink-conf.yaml property overrides of:
>>>>> # so bigger clusters don't fail to init
>>>>> akka.ask.timeout: 60s
>>>>> # so more memory is given to the JVM from the yarn container
>>>>> containerized.heap-cutoff-ratio: 0.15
>>>>>
>>>>> So each Flink slot doesn't necessarily get a lot of RAM. You said 70% of
>>>>> the RAM goes to the job by default, so that's (9200*0.85)*0.70 = 5474MB,
>>>>> meaning each slot is sitting with ~2737MB of usable space. Would you
>>>>> have a different config for taking overall the same amount of RAM?
>>>>>
>>>>> On Wed, Dec 6, 2017 at 11:49 AM, Fabian Hueske wrote:
>>>>>
>>>>>> Hi Garrett,
>>>>>>
>>>>>> data skew might be a reason for the performance degradation.
>>>>>>
>>>>>> The plan you shared is pretty simple. The following happens when you
>>>>>> run the program:
>>>>>> - The data source starts to read data and pushes the records to the
>>>>>> FlatMapFunction. From there the records are shuffled (using
>>>>>> hash-partitioning) to the sorter.
>>>>>> - The sorter tasks consume the records and write them into a memory
>>>>>> buffer. When the buffer is full, it is sorted and spilled to disk. Once
>>>>>> the buffer has been spilled, it is filled again with records, sorted,
>>>>>> and spilled.
>>>>>> - The initially fast processing happens because at the beginning the
>>>>>> sorter is not waiting for buffers to be sorted or spilled, because they
>>>>>> are still empty.
>>>>>>
>>>>>> The performance of the plan depends (among other things) on the size of
>>>>>> the sort buffers. The sort buffers are taken from Flink's managed
>>>>>> memory. Unless you configured something else, 70% of the TaskManager
>>>>>> heap memory is reserved as managed memory.
>>>>>> If you use Flink only for batch jobs, I would enable preallocation and
>>>>>> off-heap memory (see configuration options [1]). You can also configure
>>>>>> a fixed size for the managed memory. The more memory you configure, the
>>>>>> more is available for sorting.
>>>>>>
>>>>>> The managed memory of a TM is evenly distributed to all its processing
>>>>>> slots. Hence, having more slots per TM means that each slot has less
>>>>>> managed memory (for sorting or joins or ...).
>>>>>> So many slots are not necessarily good for performance (unless you
>>>>>> increase the number of TMs / memory as well), especially in case of
>>>>>> data skew, when most slots receive only little data and cannot leverage
>>>>>> their memory.
>>>>>> If your data is heavily skewed, it might make sense to have fewer slots
>>>>>> such that each slot has more memory for sorting.
>>>>>>
>>>>>> Skew also has an effect on downstream operations. In case of skew, some
>>>>>> of the sorter tasks are overloaded and cannot accept more data.
>>>>>> Due to the pipelined shuffles, this leads to a back-pressure behavior
>>>>>> that propagates down to the sources.
>>>>>> You can disable pipelining by setting the execution mode on the
>>>>>> execution configuration to BATCH [2]. This will break the pipeline but
>>>>>> write the result of the FlatMap to disk.
>>>>>> This might help if the FlatMap is compute-intensive or filters many
>>>>>> records.
>>>>>>
>>>>>> The data sizes don't sound particularly large, so this should be
>>>>>> something that Flink should be able to handle.
>>>>>>
>>>>>> Btw. you don't need to convert the JSON plan output. You can paste it
>>>>>> into the plan visualizer [3].
>>>>>> I would not worry about the missing statistics. The optimizer does not
>>>>>> leverage them at the current state.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#managed-memory
>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/execution_configuration.html
>>>>>> [3] http://flink.apache.org/visualizer/
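The execution-mode switch referenced in [2] is set on the ExecutionConfig from the job code; for reference, a minimal sketch with no job-specific assumptions:

    import org.apache.flink.api.common.ExecutionMode;
    import org.apache.flink.api.java.ExecutionEnvironment;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // PIPELINED is the default; BATCH materializes the sender side of a shuffle
    // first, which is what decouples the FlatMap from the sorter described above.
    env.getConfig().setExecutionMode(ExecutionMode.BATCH);

This is the same line that appears in the settings posted at the top of the thread.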
>>>>>> 2017-12-06 16:45 GMT+01:00 Garrett Barton:
>>>>>>
>>>>>>> Fabian,
>>>>>>>
>>>>>>> Thank you for the reply. Yes, I do watch via the UI; is there another
>>>>>>> way to see progress through the steps?
>>>>>>>
>>>>>>> I think I just figured it out: the hang-up is in the sort phase (ID 4),
>>>>>>> where 2 slots take all the time. Looking in the UI, most slots get less
>>>>>>> than 500MB of data to sort; these two have 6.7GB and 7.3GB each.
>>>>>>> Together that's about 272M records, and these will run for hours at
>>>>>>> this point. Looks like I need to figure out a different
>>>>>>> partitioning/sort strategy. I never noticed before because when I run
>>>>>>> the system at ~1400 slots I don't use the UI anymore, as it gets
>>>>>>> unresponsive. 400 slots is painfully slow, but still works.
>>>>>>>
>>>>>>> The getEnv output is very cool! Also very big; I've tried to summarize
>>>>>>> it here in more of a yaml format, as it's on a different network. Note
>>>>>>> the parallelism was just set to 10, as I didn't know if that affected
>>>>>>> the output. Hopefully I didn't flub a copy/paste step; it looks good to
>>>>>>> me.
>>>>>>>
>>>>>>> This flow used to be far fewer steps, but as it wasn't scaling I broke
>>>>>>> it out into all the distinct pieces so I could see where it failed.
>>>>>>> Source and sink are both Hive tables. I wonder if the inputformat is
>>>>>>> expected to give more info to seed some of these stat values?
>>>>>>>
>>>>>>> nodes
>>>>>>>     id: 6
>>>>>>>     type: source
>>>>>>>     pact: Data Source
>>>>>>>     contents: at CreateInput(ExecutionEnvironment.java:533)
>>>>>>>     parallelism: 10
>>>>>>>     global_properties:
>>>>>>>         name: partitioning v: RANDOM_PARTITIONED
>>>>>>>         name: Partitioning Order value: none
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     local_properties:
>>>>>>>         name: Order value: none
>>>>>>>         name: Grouping value: not grouped
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     estimates:
>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>     costs:
>>>>>>>         name: Network value: 0
>>>>>>>         name: Disk I/O value: 0
>>>>>>>         name: CPU value: 0
>>>>>>>         name: Cumulative Network value: 0
>>>>>>>         name: Cumulative Disk I/O value: 0
>>>>>>>         name: Cumulative CPU value: 0
>>>>>>>     compiler_hints:
>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>         name: Output Cardinality value: none
>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>         name: Filter Factor value: none
>>>>>>>
>>>>>>>     id: 5
>>>>>>>     type: pact
>>>>>>>     pact: FlatMap
>>>>>>>     contents: FlatMap at main()
>>>>>>>     parallelism: 10
>>>>>>>     predecessors:
>>>>>>>         id: 6, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>>>>     driver_strategy: FlatMap
>>>>>>>     global_properties:
>>>>>>>         name: partitioning v: RANDOM_PARTITIONED
>>>>>>>         name: Partitioning Order value: none
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     local_properties:
>>>>>>>         name: Order value: none
>>>>>>>         name: Grouping value: not grouped
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     estimates:
>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>     costs:
>>>>>>>         name: Network value: 0
>>>>>>>         name: Disk I/O value: 0
>>>>>>>         name: CPU value: 0
>>>>>>>         name: Cumulative Network value: 0
>>>>>>>         name: Cumulative Disk I/O value: 0
>>>>>>>         name: Cumulative CPU value: 0
>>>>>>>     compiler_hints:
>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>         name: Output Cardinality value: none
>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>         name: Filter Factor value: none
>>>>>>>
>>>>>>>     id: 4
>>>>>>>     type: pact
>>>>>>>     pact: Sort-Partition
>>>>>>>     contents: Sort at main()
>>>>>>>     parallelism: 10
>>>>>>>     predecessors:
>>>>>>>         id: 5, ship_strategy: Hash Partition on [0,2],
>>>>>>>         local_strategy: Sort on [0:ASC,2:ASC,1:ASC], exchange_mode: PIPELINED
>>>>>>>     driver_strategy: No-Op
>>>>>>>     global_properties:
>>>>>>>         name: partitioning v: HASH_PARTITIONED
>>>>>>>         name: Partitioned on value: [0,2]
>>>>>>>         name: Partitioning Order value: none
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     local_properties:
>>>>>>>         name: Order value: [0:ASC,2:ASC,1:ASC]
>>>>>>>         name: Grouping value: [0,2,1]
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     estimates:
>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>     costs:
>>>>>>>         name: Network value: 0
>>>>>>>         name: Disk I/O value: 0
>>>>>>>         name: CPU value: 0
>>>>>>>         name: Cumulative Network value: unknown
>>>>>>>         name: Cumulative Disk I/O value: unknown
>>>>>>>         name: Cumulative CPU value: unknown
>>>>>>>     compiler_hints:
>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>         name: Output Cardinality value: none
>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>         name: Filter Factor value: none
>>>>>>>
>>>>>>>     id: 3
>>>>>>>     type: pact
>>>>>>>     pact: GroupReduce
>>>>>>>     contents: GroupReduce at first(SortedGrouping.java:210)
>>>>>>>     parallelism: 10
>>>>>>>     predecessors:
>>>>>>>         id: 4, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>>>>     driver_strategy: Sorted Group Reduce
>>>>>>>     global_properties:
>>>>>>>         name: partitioning v: RANDOM_PARTITIONED
>>>>>>>         name: Partitioning Order value: none
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     local_properties:
>>>>>>>         name: Order value: none
>>>>>>>         name: Grouping value: not grouped
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     estimates:
>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>     costs:
>>>>>>>         name: Network value: 0
>>>>>>>         name: Disk I/O value: 0
>>>>>>>         name: CPU value: 0
>>>>>>>         name: Cumulative Network value: unknown
>>>>>>>         name: Cumulative Disk I/O value: unknown
>>>>>>>         name: Cumulative CPU value: unknown
>>>>>>>     compiler_hints:
>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>         name: Output Cardinality value: none
>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>         name: Filter Factor value: none
>>>>>>>
>>>>>>>     id: 2
>>>>>>>     type: pact
>>>>>>>     pact: Map
>>>>>>>     contents: Map at ()
>>>>>>>     parallelism: 10
>>>>>>>     predecessors:
>>>>>>>         id: 3, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>>>>     driver_strategy: Map
>>>>>>>     global_properties:
>>>>>>>         name: partitioning v: RANDOM_PARTITIONED
>>>>>>>         name: Partitioning Order value: none
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     local_properties:
>>>>>>>         name: Order value: none
>>>>>>>         name: Grouping value: not grouped
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     estimates:
>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>     costs:
>>>>>>>         name: Network value: 0
>>>>>>>         name: Disk I/O value: 0
>>>>>>>         name: CPU value: 0
>>>>>>>         name: Cumulative Network value: unknown
>>>>>>>         name: Cumulative Disk I/O value: unknown
>>>>>>>         name: Cumulative CPU value: unknown
>>>>>>>     compiler_hints:
>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>         name: Output Cardinality value: none
>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>         name: Filter Factor value: none
>>>>>>>
>>>>>>>     id: 1
>>>>>>>     type: pact
>>>>>>>     pact: Map
>>>>>>>     contents: map at main()
>>>>>>>     parallelism: 10
>>>>>>>     predecessors:
>>>>>>>         id: 2, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>>>>     driver_strategy: Map
>>>>>>>     global_properties:
>>>>>>>         name: partitioning v: RANDOM_PARTITIONED
>>>>>>>         name: Partitioning Order value: none
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     local_properties:
>>>>>>>         name: Order value: none
>>>>>>>         name: Grouping value: not grouped
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     estimates:
>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>     costs:
>>>>>>>         name: Network value: 0
>>>>>>>         name: Disk I/O value: 0
>>>>>>>         name: CPU value: 0
>>>>>>>         name: Cumulative Network value: unknown
>>>>>>>         name: Cumulative Disk I/O value: unknown
>>>>>>>         name: Cumulative CPU value: unknown
>>>>>>>     compiler_hints:
>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>         name: Output Cardinality value: none
>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>         name: Filter Factor value: none
>>>>>>>
>>>>>>>     id: 0
>>>>>>>     type: sink
>>>>>>>     pact: Data Sink
>>>>>>>     contents: org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat
>>>>>>>     parallelism: 10
>>>>>>>     predecessors:
>>>>>>>         id: 1, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>>>>     driver_strategy: Map
>>>>>>>     global_properties:
>>>>>>>         name: partitioning v: RANDOM_PARTITIONED
>>>>>>>         name: Partitioning Order value: none
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     local_properties:
>>>>>>>         name: Order value: none
>>>>>>>         name: Grouping value: not grouped
>>>>>>>         name: Uniqueness value: not unique
>>>>>>>     estimates:
>>>>>>>         name: Est. Output Size value: unknown
>>>>>>>         name: Est Cardinality value: unknown
>>>>>>>     costs:
>>>>>>>         name: Network value: 0
>>>>>>>         name: Disk I/O value: 0
>>>>>>>         name: CPU value: 0
>>>>>>>         name: Cumulative Network value: unknown
>>>>>>>         name: Cumulative Disk I/O value: unknown
>>>>>>>         name: Cumulative CPU value: unknown
>>>>>>>     compiler_hints:
>>>>>>>         name: Output Size (bytes) value: none
>>>>>>>         name: Output Cardinality value: none
>>>>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>>>>         name: Filter Factor value: none
>>>>>>>
>>>>>>> On Tue, Dec 5, 2017 at 5:36 PM, Fabian Hueske wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Flink's operators are designed to work in memory as long as possible
>>>>>>>> and spill to disk once the memory budget is exceeded.
>>>>>>>> Moreover, Flink aims to run programs in a pipelined fashion, such that
>>>>>>>> multiple operators can process data at the same time.
>>>>>>>> This behavior can make it a bit tricky to analyze the runtime behavior
>>>>>>>> and progress of operators.
>>>>>>>>
>>>>>>>> It would be interesting to have a look at the execution plan for the
>>>>>>>> program that you are running.
>>>>>>>> The plan can be obtained from the ExecutionEnvironment by calling
>>>>>>>> env.getExecutionPlan() instead of env.execute().
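A minimal sketch of that plan dump, for reference; nothing here is specific to the job in this thread, and the plan is only available once at least one sink has been defined:

    import org.apache.flink.api.java.ExecutionEnvironment;

    // inside a main(String[] args) throws Exception
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // ... assemble the full job here: sources, transformations, sinks ...

    // Print the JSON plan instead of running the job; getExecutionPlan()
    // takes the place of the env.execute() call for this run.
    System.out.println(env.getExecutionPlan());

The JSON that comes out is what was summarized as yaml above, and it can be pasted into the plan visualizer mentioned earlier in the thread.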
>>>>>>>>
>>>>>>>> I would also like to know how you track the progress of the program.
>>>>>>>> Are you looking at the record counts displayed in the WebUI?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Fabian
>>>>>>>>
>>>>>>>> 2017-12-05 22:03 GMT+01:00 Garrett Barton:
>>>>>>>>
>>>>>>>>> I have been moving some old MR and Hive workflows into Flink because
>>>>>>>>> I'm enjoying the APIs, and the ease of development is wonderful.
>>>>>>>>> Things have largely worked great until I tried to really scale some
>>>>>>>>> of the jobs recently.
>>>>>>>>>
>>>>>>>>> I have, for example, one ETL job that reads in about 12B records at a
>>>>>>>>> time and does a sort, some simple transformations, validation, a
>>>>>>>>> re-partition and then output to a Hive table.
>>>>>>>>> When I built it with the sample set, ~200M, it worked great: it took
>>>>>>>>> maybe a minute and blew through it.
>>>>>>>>>
>>>>>>>>> What I have observed is that there is some kind of saturation
>>>>>>>>> reached, depending on the number of slots, the number of nodes and
>>>>>>>>> the overall size of data to move. When I run the 12B set, the first
>>>>>>>>> 1B go through in under 1 minute, really really fast. But it's an
>>>>>>>>> extremely sharp drop-off after that: the next 1B might take 15
>>>>>>>>> minutes, and then if I wait for the next 1B, it's well over an hour.
>>>>>>>>>
>>>>>>>>> What I can't find is any obvious indicator or thing to look at;
>>>>>>>>> everything just grinds to a halt. I don't think the job would ever
>>>>>>>>> actually complete.
>>>>>>>>>
>>>>>>>>> Is there something in the design of Flink in batch mode that is
>>>>>>>>> perhaps memory bound? Adding more nodes/tasks does not fix it, it
>>>>>>>>> just gets me a little further along. I'm already running around
>>>>>>>>> ~1,400 slots at this point; I'd postulate needing 10,000+ to
>>>>>>>>> potentially make the job run, but that's too much of my cluster gone,
>>>>>>>>> and I have yet to get Flink to be stable past 1,500.
>>>>>>>>>
>>>>>>>>> Any ideas on where to look, or what to debug? The GUI is also very
>>>>>>>>> cumbersome to use at this slot count, so other measurement ideas are
>>>>>>>>> welcome too!
>>>>>>>>>
>>>>>>>>> Thank you all.
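Tying the posted plan back to code: below is a rough, purely illustrative skeleton of a pipeline with the same shape (source, FlatMap, grouped sort, first(n), sink). Every type, field index and the in-memory source/print sink are placeholders; the real job reads from and writes to Hive-backed Hadoop formats, the trailing Map steps of the posted plan are omitted, and the actual transformations never appear in this thread.

    import org.apache.flink.api.common.ExecutionMode;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.operators.Order;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.util.Collector;

    public class EtlSkeleton {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setExecutionMode(ExecutionMode.BATCH); // the switch discussed above

            // Placeholder source; the real job uses a Hive-backed input format.
            DataSet<Tuple3<String, String, String>> input = env.fromElements(
                    new Tuple3<>("k1", "a", "x"),
                    new Tuple3<>("k1", "b", "x"),
                    new Tuple3<>("k2", "c", "y"));

            DataSet<Tuple3<String, String, String>> result = input
                    .flatMap(new FlatMapFunction<Tuple3<String, String, String>, Tuple3<String, String, String>>() {
                        @Override
                        public void flatMap(Tuple3<String, String, String> value,
                                            Collector<Tuple3<String, String, String>> out) {
                            out.collect(value); // parsing / validation / transformation would go here
                        }
                    })
                    .groupBy(0, 2)                 // the keys the posted plan hash-partitions on
                    .sortGroup(1, Order.ASCENDING) // ordering within each group
                    .first(1);                     // the "GroupReduce at first(...)" node in the plan

            // Placeholder sink; the real job writes through a HadoopOutputFormat into Hive.
            result.print();
        }
    }

Letting groupBy/sortGroup drive the partitioning and sorting, as above, is the pattern Fabian contrasts with adding the partitioner and sorter to the plan explicitly.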