flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Chan, Regina" <Regina.C...@gs.com>
Subject Flink Parquet Read/Write Performance
Date Thu, 17 Aug 2017 21:52:10 GMT

I'm trying to figure out why reading and writing ~5GB worth of parquet files seems to take
3-4 minutes with 10 TaskManagers, 2 slots, 20GB memory, 20 Parallelism. I've copied in the
execution plan the taskmanager times below. Other details include that we're reading 20 snappy
compresed parquet files each ~240MB each. (see below)

I'm trying to use this for a milestoning logic where we take new avro files from staging and
join with the existing milestoned parquet data. I have a small staging file with only about
1500 records inside so I reduce the number of records sent to the cogroup in order to make
this faster. To do this, I'm basically reading in GenericRecords from parquet files twice,
once to filter out for "live" records where we then further filter the records for ones with
keys matching what we found in a separate avro file. This is so reduction of records makes
that part of the plan total to 1 minute 58 secs.

The concern is the other records with non-live/not-matching-keys. In theory, I expect this
to be fast since it's just chaining the operations across all the way through to the sink.
However, this part takes about 4 minutes. We're not doing anything different from the other
Datasource aside from mapping a DataSet<GenericRecord> to a Tuple2<Short,GenericRecord>
where the short is a bitmap value mapping to where the record needs to be written.

Other notes:
I checked the backpressure on the datasource->filter->map->map and it was OK. I'm
not sure what else could be holding it up.
I also profiled it when I ran it on a single task manager single slot and it seems to spend
most of the time waiting.

Any ideas? Instead of truly chaining is it writing to disk and serializing multiple times
inside each operation?

Data Source :
hdfs dfs -du -h <folder_name>
240.2 M  <folder_name>/0_partMapper-m-00013.snappy.parquet
237.2 M  <folder_name>/10_partMapper-m-00019.snappy.parquet
241.9 M  <folder_name>/11_partMapper-m-00002.snappy.parquet
243.3 M  <folder_name>/12_partMapper-m-00000.snappy.parquet
238.2 M  <folder_name>/13_partMapper-m-00016.snappy.parquet
241.7 M  <folder_name>/14_partMapper-m-00003.snappy.parquet
241.0 M  <folder_name>/15_partMapper-m-00006.snappy.parquet
240.3 M  <folder_name>/16_partMapper-m-00012.snappy.parquet
240.3 M  <folder_name>/17_partMapper-m-00011.snappy.parquet
239.5 M  <folder_name>/18_partMapper-m-00014.snappy.parquet
237.6 M  <folder_name>/19_partMapper-m-00018.snappy.parquet
240.7 M  <folder_name>/1_partMapper-m-00009.snappy.parquet
240.7 M  <folder_name>/20_partMapper-m-00008.snappy.parquet
236.5 M  <folder_name>/2_partMapper-m-00020.snappy.parquet
242.1 M  <folder_name>/3_partMapper-m-00001.snappy.parquet
241.7 M  <folder_name>/4_partMapper-m-00004.snappy.parquet
240.5 M  <folder_name>/5_partMapper-m-00010.snappy.parquet
241.7 M  <folder_name>/6_partMapper-m-00005.snappy.parquet
239.1 M  <folder_name>/7_partMapper-m-00015.snappy.parquet
237.9 M  <folder_name>/8_partMapper-m-00017.snappy.parquet
240.8 M  <folder_name>/9_partMapper-m-00007.snappy.parquet

yarn-session.sh -nm "delp_uat-IMD_Trading_v1_PROD_PerfTest-REFINER_INGEST"  -jm 4096 -tm 20480
-s 2 -n 10  -d]





View raw message