systemml-dev mailing list archives

From Matthias Boehm <mboe...@googlemail.com>
Subject Re: [HELP] Undesired Benchmark Results
Date Fri, 24 Mar 2017 18:33:01 GMT
1) It's hard to debug these cases remotely, but errors like "Size 
exceeds Integer.MAX_VALUE" usually come from overly large partitions 
that exceed Spark's 2GB limit on individual blocks. The logs would be 
helpful here, especially the number of partitions of the second (i.e., 
post-shuffle) stage of csvrblk.
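
As an aside, here is a minimal Spark-side sketch of the underlying fix (this 
is not SystemML code; the input path and partition count are made-up 
assumptions): increasing the number of partitions keeps every individual 
block well below the 2GB limit.

import org.apache.spark.sql.SparkSession

// Minimal sketch, not SystemML code: force more (and therefore smaller)
// partitions so that no single cached or shuffled block approaches Spark's
// 2GB block-size limit. The path and partition count are made-up examples.
object RepartitionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("repartition-sketch").getOrCreate()
    val sc = spark.sparkContext

    val lines = sc.textFile("hdfs:///tmp/RS_join.csv")   // hypothetical input
    println(s"input partitions: ${lines.getNumPartitions}")

    // With too few partitions, a 10-20GB dataset can yield single blocks
    // larger than 2GB; repartitioning spreads it over many smaller blocks.
    val reparted = lines.repartition(400)
    println(s"after repartition: ${reparted.getNumPartitions}")

    spark.stop()
  }
}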

2) If you're interested in the caching logic, you can look at the 
following code for Spark and CP, respectively. First, for Spark, we 
inject caching directives (CheckpointSPInstruction) for out-of-core 
matrices after any persistent read or reblock, and for variables used 
read-only in loops, unless they are used in simple update chains. You'll 
see the status of cached RDDs in the Web UI. Second, for CP, the buffer 
pool (see CacheableData and LazyWriteBuffer) handles caching: during 
operations, matrices are pinned via strong references, while in the 
unpinned state they are only softly reachable, which means the garbage 
collector is free to drop them under memory pressure, in which case we 
would have to restore them into memory. You can verify that no eviction 
happened by looking at the cache statistics, specifically cache hits and 
writes to FS, i.e., our cache directory on the local file system.
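
To make the pinned/unpinned distinction concrete, here is a minimal 
self-contained sketch in Scala; it is not SystemML's actual CacheableData or 
LazyWriteBuffer code, and the restore callback merely stands in for reading 
an evicted matrix back from the local cache directory.

import java.lang.ref.SoftReference
import scala.collection.mutable

// Minimal sketch (NOT SystemML's LazyWriteBuffer): unpinned entries are held
// only through SoftReferences, so the GC may drop them under memory pressure;
// a dropped entry is recovered via `restore`, which stands in for a read from
// the local cache directory.
class SoftCacheSketch[K, V](restore: K => V) {
  private val entries = mutable.Map.empty[K, SoftReference[V]]

  def put(key: K, value: V): Unit =
    entries(key) = new SoftReference(value)

  def get(key: K): V =
    entries.get(key).flatMap(ref => Option(ref.get)) match {
      case Some(v) => v              // still softly reachable: cache hit
      case None =>                   // collected or never cached: restore it
        val v = restore(key)
        put(key, v)
        v
    }
}

// Example usage with a dummy restore function:
object SoftCacheDemo extends App {
  val cache = new SoftCacheSketch[String, Array[Double]](_ => Array.fill(1000)(0.0))
  cache.put("m1", Array.fill(1000)(1.0))
  println(cache.get("m1").length)    // 1000, served from cache or restored
}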


Regards,
Matthias

On 3/24/2017 10:10 AM, Mingyang Wang wrote:
> Hi Matthias,
>
> Thanks for your thorough explanations and suggestions! However, I observed
> some strange behaviors when reducing the driver size.
>
> For example, when I simply reduced the driver size to 10GB with everything
> else unchanged, it worked fine for the 20GB case, but errors occurred for the
> 10GB case. Similar behavior was observed after I further reduced the driver
> to 5GB. Some error traces for driver size 10GB are attached below:
>
> 17/03/24 09:39:35 INFO ShuffleBlockFetcherIterator: Getting 313 non-empty
> blocks out of 313 blocks
> 17/03/24 09:39:35 INFO ShuffleBlockFetcherIterator: Started 0 remote
> fetches in 4 ms
> 17/03/24 09:39:38 INFO BlockManagerInfo: Removed broadcast_4_piece0 on
> 198.202.119.114:37680 in memory (size: 3.2 KB, free: 5.2 GB)
> 17/03/24 09:39:41 INFO UnifiedMemoryManager: Will not store rdd_8_32 as the
> required space (1048576 bytes) exceeds our memory limit (241164 bytes)
> 17/03/24 09:39:41 WARN MemoryStore: Failed to reserve initial memory
> threshold of 1024.0 KB for computing block rdd_8_32 in memory.
> 17/03/24 09:39:41 WARN MemoryStore: Not enough space to cache rdd_8_32 in
> memory! (computed 384.0 B so far)
> 17/03/24 09:39:41 INFO MemoryStore: Memory use = 235.5 KB (blocks) + 0.0 B
> (scratch space shared across 0 tasks(s)) = 235.5 KB. Storage limit = 235.5 KB.
> 17/03/24 09:39:41 WARN BlockManager: Persisting block rdd_8_32 to disk
> instead.
> 17/03/24 09:39:47 INFO BlockManagerInfo: Added rdd_8_32 on disk on
> 198.202.119.114:37680 (size: 3.9 GB)
> 17/03/24 09:39:47 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID
> 939)
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>         at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:868)
>         at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
>         at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
>         at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
>         at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:462)
>         at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:698)
>         at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>         at org.apache.spark.scheduler.Task.run(Task.scala:99)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 17/03/24 09:39:47 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 939,
> localhost, executor driver): java.lang.IllegalArgumentException: Size
> exceeds Integer.MAX_VALUE
>         at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:868)
>         at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
>         at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
>         at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
>         at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:462)
>         at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:698)
>         at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>         at org.apache.spark.scheduler.Task.run(Task.scala:99)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)...
>
> If needed, I can attach the whole log.
>
> Also, I want to verify that after the instruction sp_csvrblk completes, the
> data is loaded into memory, either in the control program or in the Spark
> executors, right? I have tried to verify this in the source code, but it is
> not that straightforward.
>
>
> Regards,
> Mingyang
>
> On Thu, Mar 23, 2017 at 11:36 PM Matthias Boehm <mboehm7@googlemail.com>
> wrote:
>
> well, after thinking some more about this issue, I have to correct myself,
> but the workarounds still apply. The problem is not the "in-memory reblock"
> but the collect of the reblocked RDD, which is similarly handed over to the
> buffer pool and eventually evicted accordingly.
>
> For a subset of operations such as aggregates and matrix multiplication, we
> actually select the execution type transitively, i.e., if the input comes
> from Spark and the operation is cheap and reduces the data size, we
> transitively select a distributed operation even though the operation would
> fit in memory. I would classify the missing transitive operator selection
> for right indexing operations as a bug - we'll fix this in our upcoming
> 0.14 release. Thanks for catching it.
>
> Regards,
> Matthias
>
> On Thu, Mar 23, 2017 at 9:55 PM, Matthias Boehm <mboehm7@googlemail.com>
> wrote:
>
>> thanks for the feedback Mingyang. Let me quickly explain what happens here
>> and subsequently give you a couple of workarounds.
>>
>> 1. Understanding the Bottleneck: For any text inputs, we will initially
>> compile distributed reblock operations that convert the text representation
>> into RDDs of matrix indexes and matrix blocks. At runtime, however,
>> SystemML decides to perform these conversions via a simple multi-threaded
>> read into memory if the data comfortably fits in the memory budget of the
>> driver (70% of max heap, i.e., 14GB in your case) and certain other
>> constraints are met. After the read, the in-memory matrix is handed over to
>> the buffer pool (15% of max heap), and if it doesn't fit, it's written to a
>> local cache directory, which is likely the reason for the performance issue
>> you encountered. You can verify this by looking at the "-stats" output,
>> which includes the cache statistics,
>>
>> Cache hits (Mem, WB, FS, HDFS):    1717/0/0/2.
>> Cache writes (WB, FS, HDFS):    573/0/1.
>> Cache times (ACQr/m, RLS, EXP):    9.799/0.005/0.106/0.250 sec
>>
>> where ACQ stands for 'acquire read', which includes RDD collects and local
>> reads, and RLS stands for 'release', which includes evictions to local disk.
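
(A quick sanity check of these budgets, assuming they are fixed fractions of
the max heap as described above; the Scala snippet is only an illustration,
not SystemML code.)

// Back-of-the-envelope check of the budgets mentioned above, assuming they
// are fixed fractions of the max heap as described in this thread.
object BudgetSketch extends App {
  val maxHeapGB      = 20.0               // --driver-memory 20G
  val operatorBudget = 0.70 * maxHeapGB   // ~14 GB for local operations
  val bufferPool     = 0.15 * maxHeapGB   // ~3 GB buffer pool
  // A ~10 GB in-memory matrix fits the 14 GB operator budget (so the local
  // read is chosen) but not the ~3 GB buffer pool, hence the FS writes.
  println(f"operator budget: $operatorBudget%.1f GB, buffer pool: $bufferPool%.1f GB")
}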
>>
>> 2. Workarounds:
>> a) Binary data: In case of binary input data, you would likely not
>> encounter this issue, because we would not evict read binary matrices since
>> they can be recovered from HDFS at any time. You could explicitly convert
>> the input data to its binary representation via a simple read - write
>> script (with format="binary"). From a benchmark perspective, this would be
>> the preferable option anyway, unless you want to benchmark the data
>> ingestion part from external formats.
>>
>> b) Memory budget: Since all decisions on local vs. distributed
>> operations respect available memory constraints, you can influence these
>> decisions with (1) a smaller driver memory, to force more operations into
>> distributed execution, or (2) a larger driver memory, to make the buffer
>> pool large enough to avoid evictions.
>>
>> c) Forced execution type: You can also force all operations - independent
>> of the driver memory budget - to distributed operations by adding "-exec
>> spark" to your command line invocation. From a benchmark perspective,
>> however, I would not recommend this.
>>
>> Thanks again for the feedback. While writing this comment, I actually came
>> to the conclusion that we could handle even the CSV-input case better in
>> order to avoid evictions in these scenarios.
>>
>>
>> Regards,
>> Matthias
>>
>>
>>
>> On Thu, Mar 23, 2017 at 8:41 PM, Mingyang Wang <miw092@eng.ucsd.edu>
>> wrote:
>>
>>> Hi Everyone,
>>>
>>> I got some undesired results when running benchmarks with SystemML, and I
>>> really hope to get some help from you.
>>>
>>> Say I want to read a large matrix in CSV format and retrieve the first
>>> cell with the script
>>>
>>> # simple_read.dml
>>> data = read($RS)
>>> print("One Cell data[1,1]: " + toString(data[1,1]))
>>>
>>> and run it with a configuration such as
>>>
>>> spark-submit \
>>>     --driver-memory 20G \
>>>     --executor-memory 100G \
>>>     --executor-cores 20 \
>>>     --num-executors 1 \
>>>     --conf spark.driver.maxResultSize=0 \
>>>     --conf spark.rpc.message.maxSize=128 \
>>>     $SYSTEMML_HOME/target/SystemML.jar \
>>>     -f simple_read.dml \
>>>     -stats \
>>>     -explain \
>>>     -nvargs
>>> RS=/oasis/scratch/comet/hyoung/temp_project/scatter_data/TR20_FR4.00/RS_join.csv
>>>
>>> When the input matrix is 2x10^7 by 103 (around 20GB), things worked fine,
>>> as sp_rangeReIndex took 19.725s and sp_csvrblk took 11.330s. But when the
>>> input matrix is reduced to 10^7 by 103 (around 10GB), interesting things
>>> happened, as rangeReIndex took 92.024s and sp_csvrblk took 7.774s.
>>>
>>> These results were obtained with SystemML v0.13.0 on Spark 2.1.0 in
>>> standalone mode with 128GB memory and 24 cores.
>>>
>>> From the log, it seems that in the latter case the control program in the
>>> driver took over the main work and caused lots of disk I/O, which slowed
>>> down the whole program.
>>>
>>> I understand that assigning some tasks to the control program is a key
>>> component of SystemML. But in my case, this feature introduces quite some
>>> variability into the benchmarks. Any suggestions on how to choose a better
>>> configuration, or on possible workarounds, so that I can obtain fair
>>> benchmarks across a wide range of data dimensions?
>>>
>>> If needed, I can attach the logs.
>>>
>>> I really appreciate your help!
>>>
>>>
>>> Regards,
>>> Mingyang Wang
>>> Graduate Student in UCSD CSE Dept.
>>>
>>
>>
>
