systemml-dev mailing list archives

From: Matthias Boehm <mboe...@gmail.com>
Subject: Re: Passing a CoordinateMatrix to SystemML
Date: Sun, 24 Dec 2017 11:14:01 GMT
Thanks again for catching this issue, Anthony - the IJV reblock issue
with large, ultra-sparse matrices is now fixed in master. It likely did
not show up on the 1% sample because that data was small enough to be
read directly into the driver.

However, the dataFrameToBinaryBlock problem might be a separate issue
that I have not been able to reproduce yet, so it would be very helpful
if you could give it another try. Thanks.

Regards,
Matthias

On 12/24/2017 9:57 AM, Matthias Boehm wrote:
> Hi Anthony,
>
> thanks for helping to debug this issue. There are no limits other than
> the dimensions and number of non-zeros being of type long. It sounds
> more like an issue with converting special cases of ultra-sparse
> matrices. I'll try to reproduce this issue and give an update as soon as
> I know more. In the meantime, could you please (a) also provide the
> stacktrace of calling dataFrameToBinaryBlock with SystemML 1.0, and (b)
> try calling your IJV conversion script via spark-submit to rule out that
> this issue is API-related? Thanks.
>
> Regards,
> Matthias
>
> On 12/24/2017 1:40 AM, Anthony Thomas wrote:
>> Okay thanks for the suggestions - I upgraded to 1.0 and tried providing
>> dimensions and block sizes to dataFrameToBinaryBlock, both without
>> success. I additionally wrote out the matrix to HDFS in IJV format and am
>> still getting the same error when calling "read()" directly in the DML.
>> However, I created a 1% sample of the original data in IJV format, and
>> SystemML was able to read the smaller file without any issue. This would
>> seem to suggest that either there is some corruption in the full file or
>> I'm running into some limit. The matrix is on the larger side: 1.9e8 rows
>> by 7e4 columns with 2.4e9 nonzero values, but this seems like it should
>> be well within the limits of what SystemML/Spark can handle. I also
>> checked for obvious data errors (e.g., the file not being 1-indexed or
>> containing blank lines). In case it's helpful, the stacktrace from
>> reading the data from HDFS in IJV format is attached. Thanks again for
>> your help - I really appreciate it.
>>
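>> To illustrate, a minimal sketch of that kind of IJV read through the
>> MLContext API would be the following (hypothetical HDFS path; it assumes
>> a matching X.ijv.mtd metadata file with the rows/cols/nnz listed above):
>>
>>     // The IJV ("text") reader picks up dimensions from the JSON metadata
>>     // file stored next to the data, e.g.:
>>     //   {"data_type": "matrix", "value_type": "double", "rows": 190000000,
>>     //    "cols": 70000, "nnz": 2400000000, "format": "text"}
>>     val script_ijv = dml("""X = read("hdfs:///path/to/X.ijv"); print(sum(X));""")
>>     ml.execute(script_ijv)
>>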
>>  00:24:18 WARN TaskSetManager: Lost task 30.0 in stage 0.0 (TID 126,
>> 10.11.10.13, executor 0): java.lang.ArrayIndexOutOfBoundsException
>>         at java.lang.System.arraycopy(Native Method)
>>         at org.apache.sysml.runtime.matrix.data.SparseBlockCOO.shiftRightByN(SparseBlockCOO.java:594)
>>         at org.apache.sysml.runtime.matrix.data.SparseBlockCOO.set(SparseBlockCOO.java:323)
>>         at org.apache.sysml.runtime.matrix.data.MatrixBlock.mergeIntoSparse(MatrixBlock.java:1790)
>>         at org.apache.sysml.runtime.matrix.data.MatrixBlock.merge(MatrixBlock.java:1736)
>>         at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:627)
>>         at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:596)
>>         at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:1037)
>>         at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:189)
>>         at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:188)
>>         at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
>>         at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
>>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>         at java.lang.Thread.run(Thread.java:748)
>>
>> Anthony
>>
>>
>> On Sat, Dec 23, 2017 at 4:27 AM, Matthias Boehm <mboehm7@gmail.com>
>> wrote:
>>
>>> Given the line numbers from the stacktrace, it seems that you are using
>>> a rather old version of SystemML. Hence, I would recommend upgrading to
>>> SystemML 1.0, or at least 0.15, first.
>>>
>>> If the error persists or you're not able to upgrade, please try calling
>>> dataFrameToBinaryBlock with explicitly provided matrix characteristics,
>>> i.e., dimensions and block sizes. The issue you've shown usually
>>> originates from incorrect metadata (e.g., a negative number of columns
>>> or block sizes), which prevents the sparse rows from growing to the
>>> necessary sizes.
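>>>
>>> As a rough sketch (placeholder names; the MatrixMetadata overload taking
>>> a format plus dimensions is an assumption to check against the installed
>>> version), passing these characteristics through the MLContext API could
>>> look like:
>>>
>>>     // substitute the real dimensions of X; block sizes default to 1000 x 1000
>>>     val nRows = 0L
>>>     val nCols = 0L
>>>     val meta_x = new MatrixMetadata(MatrixFormat.DF_VECTOR, nRows, nCols)
>>>     val script = dml("print(sum(X))").in("X", x, meta_x)
>>>     ml.execute(script)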
>>>
>>> Regards,
>>> Matthias
>>>
>>> On 12/22/2017 10:42 PM, Anthony Thomas wrote:
>>>
>>>> Hi Matthias,
>>>>
>>>> Thanks for the help! In response to your questions:
>>>>
>>>>    1. Sorry - this was a typo: the correct schema is [y: int,
>>>>    features: vector] - the column "features" was created using Spark's
>>>>    VectorAssembler and the underlying type is an
>>>>    org.apache.spark.ml.linalg.SparseVector. Calling x.schema results in:
>>>>    org.apache.spark.sql.types.StructType =
>>>>    StructType(StructField(features,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true))
>>>>    2. "y" converts fine - it appears the only issue is with X. The
>>>>    script still crashes when running "print(sum(X))". The full stack
>>>>    trace is attached at the end of the message.
>>>>    3. Unfortunately, the error persists when calling
>>>>    RDDConverterUtils.dataFrameToBinaryBlock directly.
>>>>    4. Also, just in case this matters: I'm packaging the script into a
>>>>    jar using SBT assembly and submitting via spark-submit.
>>>>
>>>> Here's an updated script:
>>>>
>>>>         val input_df = spark.read.parquet(inputPath)
>>>>         val x = input_df.select(featureNames)
>>>>         val y = input_df.select("y")
>>>>         val meta_x = new MatrixMetadata(DF_VECTOR)
>>>>         val meta_y = new MatrixMetadata(DF_DOUBLES)
>>>>
>>>>         val script_x = dml("print(sum(X))").in("X", x, meta_x)
>>>>         println("Reading X")
>>>>         val res_x = ml.execute(script_x)
>>>>
>>>> Here is the output of the runtime plan generated by SystemML:
>>>>
>>>> # EXPLAIN (RUNTIME):
>>>> # Memory Budget local/remote = 76459MB/?MB/?MB/?MB
>>>> # Degree of Parallelism (vcores) local/remote = 24/?
>>>> PROGRAM ( size CP/SP = 3/0 )
>>>> --MAIN PROGRAM
>>>> ----GENERIC (lines 1-2) [recompile=false]
>>>> ------CP uak+ X.MATRIX.DOUBLE _Var0.SCALAR.STRING 24
>>>> ------CP print _Var0.SCALAR.STRING.false _Var1.SCALAR.STRING
>>>> ------CP rmvar _Var0 _Var1
>>>>
>>>> And the resulting stack trace:
>>>>
>>>> 17/12/22 21:27:20 WARN TaskSetManager: Lost task 3.0 in stage 7.0 (TID 205,
>>>> 10.11.10.12, executor 3): java.lang.ArrayIndexOutOfBoundsException: 0
>>>>     at org.apache.sysml.runtime.matrix.data.SparseRow.append(SparseRow.java:215)
>>>>     at org.apache.sysml.runtime.matrix.data.SparseBlockMCSR.append(SparseBlockMCSR.java:253)
>>>>     at org.apache.sysml.runtime.matrix.data.MatrixBlock.appendValue(MatrixBlock.java:663)
>>>>     at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1076)
>>>>     at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1008)
>>>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>>>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>>>>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
>>>>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> 17/12/22 21:27:21 ERROR TaskSetManager: Task 19 in stage 7.0 failed 4
>>>> times; aborting job
>>>> Exception in thread "main" org.apache.sysml.api.mlcontext.MLContextException:
>>>> Exception when executing script
>>>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:311)
>>>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:280)
>>>>     at SystemMLMLAlgorithms$$anonfun$main$1.apply$mcVI$sp(systemml_ml_algorithms.scala:63)
>>>>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>>>>     at SystemMLMLAlgorithms$.main(systemml_ml_algorithms.scala:60)
>>>>     at SystemMLMLAlgorithms.main(systemml_ml_algorithms.scala)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
>>>>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>>>>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>>>>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>>>>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>> Caused by: org.apache.sysml.api.mlcontext.MLContextException: Exception
>>>> occurred while executing runtime program
>>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:390)
>>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.execute(ScriptExecutor.java:298)
>>>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:303)
>>>>     ... 14 more
>>>> Caused by: org.apache.sysml.runtime.DMLRuntimeException:
>>>> org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in
>>>> program block generated from statement block between lines 1 and 2 --
>>>> Error evaluating instruction: CP°uak+°X·MATRIX·DOUBLE°_Var0·SCALAR·STRING°24
>>>>     at org.apache.sysml.runtime.controlprogram.Program.execute(Program.java:130)
>>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:388)
>>>>     ... 16 more
>>>> ...
>>>>
>>>>
>>>>
>>>> On Fri, Dec 22, 2017 at 5:48 AM, Matthias Boehm <mboehm7@gmail.com>
>>>> wrote:
>>>>
>>>>> well, let's do the following to figure this out:
>>>>>
>>>>> 1) If the schema is indeed [label: Integer, features: SparseVector],
>>>>> please change the third line to val y = input_data.select("label").
>>>>>
>>>>> 2) For debugging, I would recommend using a simple script like
>>>>> "print(sum(X));" and trying to convert X and y separately to isolate
>>>>> the problem.
>>>>>
>>>>> 3) If it's still failing, it would be helpful to know (a) whether it's
>>>>> an issue of converting X, y, or both, as well as (b) the full
>>>>> stacktrace.
>>>>>
>>>>> 4) As a workaround you might also call our internal converter directly
>>>>> via RDDConverterUtils.dataFrameToBinaryBlock(jsc, df, mc, containsID,
>>>>> isVector), where jsc is the java spark context, df is the dataset, mc
>>>>> are the matrix characteristics (if unknown, simply use new
>>>>> MatrixCharacteristics()), containsID indicates whether the dataset
>>>>> contains a column "__INDEX" with the row indexes, and isVector
>>>>> indicates whether the passed dataset contains vectors or basic types
>>>>> such as double (a rough sketch follows below).
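>>>>>
>>>>> For illustration (assuming the DataFrame x and the SparkSession spark
>>>>> from your snippet below; the empty MatrixCharacteristics covers the
>>>>> unknown-dimensions case):
>>>>>
>>>>>     import org.apache.spark.api.java.JavaSparkContext
>>>>>     import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils
>>>>>     import org.apache.sysml.runtime.matrix.MatrixCharacteristics
>>>>>
>>>>>     val jsc = new JavaSparkContext(spark.sparkContext)  // java spark context
>>>>>     val mc  = new MatrixCharacteristics()               // dimensions unknown
>>>>>     // containsID = false: no "__INDEX" column; isVector = true: vector column
>>>>>     val blocks = RDDConverterUtils.dataFrameToBinaryBlock(jsc, x, mc, false, true)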
>>>>>
>>>>>
>>>>> Regards,
>>>>> Matthias
>>>>>
>>>>>
>>>>> On 12/22/2017 12:00 AM, Anthony Thomas wrote:
>>>>>
>>>>>> Hi SystemML folks,
>>>>>>
>>>>>> I'm trying to pass some data from Spark to a DML script via the
>>>>>> MLContext API. The data is derived from a parquet file containing a
>>>>>> dataframe with the schema: [label: Integer, features: SparseVector].
>>>>>> I am doing the following:
>>>>>>
>>>>>>         val input_data = spark.read.parquet(inputPath)
>>>>>>         val x = input_data.select("features")
>>>>>>         val y = input_data.select("y")
>>>>>>         val x_meta = new MatrixMetadata(DF_VECTOR)
>>>>>>         val y_meta = new MatrixMetadata(DF_DOUBLES)
>>>>>>         val script = dmlFromFile(s"${script_path}/script.dml").
>>>>>>                 in("X", x, x_meta).
>>>>>>                 in("Y", y, y_meta)
>>>>>>         ...
>>>>>>
>>>>>> However, this results in an error from SystemML:
>>>>>> java.lang.ArrayIndexOutOfBoundsException: 0
>>>>>> I'm guessing this has something to do with Spark ML being zero-indexed
>>>>>> and SystemML being 1-indexed. Is there something I should be doing
>>>>>> differently here? Note that I also tried converting the dataframe to a
>>>>>> CoordinateMatrix and then creating an RDD[String] in IJV format. That
>>>>>> too resulted in "ArrayIndexOutOfBoundsExceptions." I'm guessing there's
>>>>>> something simple I'm doing wrong here, but I haven't been able to
>>>>>> figure out exactly what. Please let me know if you need more
>>>>>> information (I can send along the full error stacktrace if that would
>>>>>> be helpful)!
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Anthony
>>>>>>
>>>>>>
>>>>>>
>>>>
>>
