spark-issues mailing list archives

From "Jungtaek Lim (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-24763) Remove redundant key data from value in streaming aggregation
Date Thu, 12 Jul 2018 13:54:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-24763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541367#comment-16541367 ]

Jungtaek Lim edited comment on SPARK-24763 at 7/12/18 1:53 PM:
---------------------------------------------------------------

I had a chance to craft various key/value cases (bigger key, bigger value, many key columns,
many value columns) and ran similar tests on these cases. To handle the 4 tests concurrently,
I ran only 2 trials (instead of 3) per case for each mode (disable/enable).

I used the same Spark version, the same dedicated AWS instance, and the same command to run
the JVM (except that driver memory was increased to 6g).

I'll describe test results per case.
{quote}App 1 - bigger key
{quote}
> Code

[https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/blob/master/src/main/scala/com/hortonworks/spark/benchmark/BenchmarkMovingAggregationsListenerToFsKeyMuchBigger.scala]
 * key fields : window (start/end), mod (int), word (1000 chars string)
 * value fields : max_value (int), min_value (int), avg_value (double)

> Test Result - using the same queries as in the previous test

>> query result (source)
||mode||trial||avg(numInputRows)||avg(inputRowsPerSecond)||avg(processedRowsPerSecond)||
|disable|1|50000.0|10000.019877228518|21173.52173524455|
|disable|2|50000.0|9999.980273266538|21241.022682965504|
|enable|1|50000.0|10000.00006336634|21745.986204268098|
|enable|2|50000.0|10000.000007920793|21382.004127689153|

>> query result (duration)
||mode||trial||avg(addBatch)||avg(getBatch)||avg(getOffset)||avg(queryPlanning)||avg(triggerExecution)||avg(walCommit)||
|disable|1|2283.7425742574255|0.06930693069306931|null|62.475247524752476|2365.168316831683|10.267326732673267|
|disable|2|2274.3564356435645|0.09900990099009901|null|64.42574257425743|2358.059405940594|11.049504950495049|
|enable|1|2216.8514851485147|0.0891089108910891|null|65.23762376237623|2301.920792079208|10.752475247524753|
|enable|2|2260.366336633663|0.0297029702970297|null|63.742574257425744|2344.6633663366338|11.673267326732674|

>> query result (state information)
||mode||trial||batchId||numRowsTotal||numRowsUpdated||memoryUsedBytes||
|disable|1|150|34153|29274|68989767|
|disable|2|150|34153|29274|68981335|
|enable|1|150|38952|31352|44137447|
|enable|2|150|38969|31138|43849183|
 * average state row size

||mode||avg state row size (bytes)||ratio (enable / disable)||
|disable|2019.897256464| |
|enable|1129.178232806|55.90 %|
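The average state row size figures can be reproduced from the state information table, assuming (this is inferred from the numbers, not stated in the comment) that each is memoryUsedBytes / numRowsTotal computed per trial and then averaged over the two trials, and that the last column is the enable value divided by the disable value. A minimal Scala sketch for App 1; StateMetrics and avgStateRowSize are hypothetical names used only for this illustration:

```scala
// Per-trial state metrics from a "query result (state information)" table.
final case class StateMetrics(numRowsTotal: Long, memoryUsedBytes: Long)

// Assumed formula: mean over trials of (memoryUsedBytes / numRowsTotal), in bytes per row.
def avgStateRowSize(trials: Seq[StateMetrics]): Double =
  trials.map(t => t.memoryUsedBytes.toDouble / t.numRowsTotal).sum / trials.size

// App 1 (bigger key), values copied from the state information table.
val disabled = Seq(StateMetrics(34153L, 68989767L), StateMetrics(34153L, 68981335L))
val enabled = Seq(StateMetrics(38952L, 44137447L), StateMetrics(38969L, 43849183L))

val disabledSize = avgStateRowSize(disabled)
val enabledSize = avgStateRowSize(enabled)
// Enabled size as a percentage of the disabled size.
val ratioPercent = enabledSize / disabledSize * 100.0

println(f"disable: $disabledSize%.3f, enable: $enabledSize%.3f, ratio: $ratioPercent%.2f %%")
```

Feeding App 4's trials through the same function gives about 859.60 bytes for disable, while dividing pooled totals (summed memoryUsedBytes over summed numRowsTotal) would give about 859.37, so the per-trial average is the variant that matches the tables.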
{quote}App 2 - bigger value
{quote}
> Code

[https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/blob/master/src/main/scala/com/hortonworks/spark/benchmark/BenchmarkMovingAggregationsListenerToFsValueMuchBigger.scala]
 * key fields : window (start/end), mod (int)
 * value fields : max_value (int), min_value (int), avg_value (double), word_last (1000 chars string)

>> query result (source)
||mode||trial||avg(numInputRows)||avg(inputRowsPerSecond)||avg(processedRowsPerSecond)||
|disable|1|50000.0|10000.000039603963|20021.91393461469|
|disable|2|50000.0|10000.000071287132|19811.730674701703|
|enable|1|50000.0|10000.000435644328|20518.188917614298|
|enable|2|50000.0|10000.000071287132|20312.040082394597|

>> query result (duration)
||mode||trial||avg(addBatch)||avg(getBatch)||avg(getOffset)||avg(queryPlanning)||avg(triggerExecution)||avg(walCommit)||
|disable|1|2417.990099009901|0.0297029702970297|null|64.9009900990099|2500.910891089109|10.158415841584159|
|disable|2|2447.7524752475247|0.039603960396039604|null|62.386138613861384|2529.3564356435645|10.712871287128714|
|enable|1|2357.891089108911|0.04950495049504951|null|62.81188118811881|2439.891089108911|10.881188118811881|
|enable|2|2381.5346534653463|0.04950495049504951|null|61.742574257425744|2463.108910891089|11.435643564356436|

>> query result (state information)
||mode||trial||batchId||numRowsTotal||numRowsUpdated||memoryUsedBytes||
|disable|1|150|700|600|934447|
|disable|2|150|700|600|934223|
|enable|1|150|800|700|1029799|
|enable|2|150|800|700|1029183|
 * average state row size

||mode||avg state row size (bytes)||ratio (enable / disable)||
|disable|1334.764285714| |
|enable|1286.86375|96.41 %|
{quote}App 3 - many keys
{quote}
> Code

[https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/blob/master/src/main/scala/com/hortonworks/spark/benchmark/BenchmarkMovingAggregationsListenerToFsManyKeys.scala]
 * key fields : window (start/end), mod (int), word (10 chars string), and 20 more columns (int)
 * value fields : max_value (int), min_value (int), avg_value (double)

> Test Result - using the same queries as in the previous test

>> query result (source)
||mode||trial||avg(numInputRows)||avg(inputRowsPerSecond)||avg(processedRowsPerSecond)||
|disable|1|50000.0|10000.000055445547|17645.896698295066|
|disable|2|50000.0|10000.00008712872|17627.17064059849|
|enable|1|50000.0|10000.000039603963|18408.23205926807|
|enable|2|50000.0|10000.000071287132|18568.40828962387|

>> query result (duration)
||mode||trial||avg(addBatch)||avg(getBatch)||avg(getOffset)||avg(queryPlanning)||avg(triggerExecution)||avg(walCommit)||
|disable|1|2739.990099009901|0.0594059405940594|null|77.48514851485149|2837.4851485148515|11.584158415841584|
|disable|2|2742.960396039604|0.04950495049504951|null|76.03960396039604|2839.019801980198|11.009900990099009|
|enable|1|2625.108910891089|0.019801980198019802|null|78.06930693069307|2722.3366336633662|10.623762376237623|
|enable|2|2599.5544554455446|0.039603960396039604|null|76.24752475247524|2695.029702970297|10.643564356435643|

>> query result (state information)
||mode||trial||batchId||numRowsTotal||numRowsUpdated||memoryUsedBytes||
|disable|1|150|38946|31487|23036727|
|disable|2|150|38992|30800|23084343|
|enable|1|150|38915|31883|15208727|
|enable|2|150|39007|30401|14829975|
 * average state row size

||mode||avg state row size (bytes)||ratio (enable / disable)||
|disable|591.765993004| |
|enable|385.503337366|65.14 %|
{quote}App 4 - many values
{quote}
> Code

[https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/blob/master/src/main/scala/com/hortonworks/spark/benchmark/BenchmarkMovingAggregationsListenerToFsManyKeys.scala]
 * key fields : window (start/end), mod (int), word (10 chars string)
 * value fields : max (int), min (int), and avg (double) for each of 20 columns

> Test Result - using the same queries as in the previous test

>> query result (source)
||mode||trial||avg(numInputRows)||avg(inputRowsPerSecond)||avg(processedRowsPerSecond)||
|disable|1|63465.346534653465|10005.318225922994|9999.487035328104|
|disable|2|65247.52475247525|10011.503266213555|10001.570001460635|
|enable|1|62673.267326732675|9998.149926805827|9997.805403434037|
|enable|2|63564.35643564357|10000.005956852667|9996.36862736686|

>> query result (duration)
||mode||trial||avg(addBatch)||avg(getBatch)||avg(getOffset)||avg(queryPlanning)||avg(triggerExecution)||avg(walCommit)||
|disable|1|6235.069306930693|0.04950495049504951|null|93.63366336633663|6348.584158415842|9.910891089108912|
|disable|2|6410.3564356435645|0.039603960396039604|null|93.73267326732673|6523.821782178218|9.910891089108912|
|enable|1|6162.6732673267325|0.04950495049504951|null|88.62376237623762|6271.178217821782|10.435643564356436|
|enable|2|6244.079207920792|0.0297029702970297|null|94.26732673267327|6357.970297029703|9.930693069306932|

>> query result (state information)
||mode||trial||batchId||numRowsTotal||numRowsUpdated||memoryUsedBytes||
|disable|1|150|34776|29808|30019279|
|disable|2|150|39627|33928|33920375|
|enable|1|150|39842|30502|31837671|
|enable|2|150|34776|29616|27082887|
 * average state row size

||mode||avg state row size (bytes)||ratio (enable / disable)||
|disable|859.604889211| |
|enable|788.939591288|91.77 %|

 
{quote}Summary
{quote}
 * Enabling the option showed on-par or slightly better throughput in three cases. It showed
slightly lower throughput in one case (App 4), but the difference was under 0.1%, which I think
we can treat as noise.
 * Overall, it didn't show a performance regression in any case.
 * As expected, enabling the option reduced state memory usage in line with the ratio of key
size to value size. For the bigger-key case, it reduced the average state row size by about 44%.
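The throughput comparison in the summary can be checked from the avg(processedRowsPerSecond) column of each app's source table. A small Scala sketch that compares the per-mode mean over the two trials; Throughput and changePercent are hypothetical helpers introduced only for this illustration:

```scala
// Per-trial avg(processedRowsPerSecond) values for each mode.
final case class Throughput(disabled: Seq[Double], enabled: Seq[Double]) {
  private def mean(xs: Seq[Double]): Double = xs.sum / xs.size
  // Relative change of the enabled mean versus the disabled mean, in percent.
  def changePercent: Double = (mean(enabled) - mean(disabled)) / mean(disabled) * 100.0
}

// Values copied from the "query result (source)" tables.
val apps: Seq[(String, Throughput)] = Seq(
  "App 1 - bigger key" -> Throughput(
    Seq(21173.52173524455, 21241.022682965504), Seq(21745.986204268098, 21382.004127689153)),
  "App 2 - bigger value" -> Throughput(
    Seq(20021.91393461469, 19811.730674701703), Seq(20518.188917614298, 20312.040082394597)),
  "App 3 - many keys" -> Throughput(
    Seq(17645.896698295066, 17627.17064059849), Seq(18408.23205926807, 18568.40828962387)),
  "App 4 - many values" -> Throughput(
    Seq(9999.487035328104, 10001.570001460635), Seq(9997.805403434037, 9996.36862736686))
)

apps.foreach { case (name, t) =>
  println(f"$name%-22s ${t.changePercent}%+.3f %%")
}
```

With these values the relative change comes out to about +1.68%, +2.50%, +4.83%, and -0.03% for Apps 1 through 4.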


> Remove redundant key data from value in streaming aggregation
> -------------------------------------------------------------
>
>                 Key: SPARK-24763
>                 URL: https://issues.apache.org/jira/browse/SPARK-24763
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Jungtaek Lim
>            Priority: Major
>
> Key/Value of state in streaming aggregation is formatted as below:
>  * key: UnsafeRow containing group-by fields
>  * value: UnsafeRow containing key fields and other fields for aggregation results
> so the key data is stored in both the key and the value.
> This avoids projecting the row to the value while storing, and joining the key and value
> to restore the original row, in order to boost performance. However, while doing a simple
> benchmark test, I found that it doesn't help much compared to "project and join". (I will
> paste the test results in a comment.)
> So I propose a new option that removes the redundant key data from the value in stateful
> aggregation. I'm avoiding modifying the default behavior of stateful aggregation, because
> the state value format will not be compatible between the current behavior and the option
> enabled.
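To make the two state formats concrete, here is a plain-Scala sketch under stated assumptions: rows are modeled as Vector[Any] instead of Spark's UnsafeRow, and Layout, project, encodeRedundant, encodeCompact, and decodeCompact are hypothetical helpers, not Spark APIs. It only illustrates the data movement of "project and join"; it says nothing about Spark's actual performance.

```scala
// Hypothetical stand-in for UnsafeRow: one value per column.
type Row = Vector[Any]

// Positions of the group-by (key) columns and the aggregation (value) columns in the full row.
final case class Layout(keyIndices: Seq[Int], valueIndices: Seq[Int])

// Project the given columns out of a row.
def project(row: Row, indices: Seq[Int]): Row = indices.map(row(_)).toVector

// Current format: the value is the whole row, so key columns are stored twice.
def encodeRedundant(row: Row, layout: Layout): (Row, Row) =
  (project(row, layout.keyIndices), row)

// Proposed option: the value keeps only the non-key columns ("project").
def encodeCompact(row: Row, layout: Layout): (Row, Row) =
  (project(row, layout.keyIndices), project(row, layout.valueIndices))

// Restore the original row by putting key and value columns back in place ("join").
def decodeCompact(key: Row, value: Row, layout: Layout): Row = {
  val out = new Array[Any](layout.keyIndices.size + layout.valueIndices.size)
  layout.keyIndices.zip(key).foreach { case (i, v) => out(i) = v }
  layout.valueIndices.zip(value).foreach { case (i, v) => out(i) = v }
  out.toVector
}

// Example shaped like App 1: window, mod, word are keys; max, min, avg are values.
val layout = Layout(keyIndices = Seq(0, 1, 2), valueIndices = Seq(3, 4, 5))
val row: Row = Vector("[12:00, 12:10)", 3, "w" * 1000, 42, 1, 17.5)

val (redundantKey, redundantValue) = encodeRedundant(row, layout)
val (compactKey, compactValue) = encodeCompact(row, layout)
println(s"redundant value columns: ${redundantValue.size}, compact value columns: ${compactValue.size}")
```

The trade-off the issue describes is visible here: the compact format stores the 1000-character key column once instead of twice, at the cost of one extra projection on write and one join on read.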



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
