systemml-dev mailing list archives

From Matthias Boehm <mboe...@gmail.com>
Subject Re: Passing a CoordinateMatrix to SystemML
Date Sat, 23 Dec 2017 12:27:10 GMT
Given the line numbers in the stacktrace, it seems you are using a 
rather old version of SystemML. Hence, I would recommend upgrading to 
SystemML 1.0, or at least 0.15, first.

If the error persists or you're not able to upgrade, please try calling 
dataFrameToBinaryBlock with explicitly provided matrix characteristics, 
i.e., the matrix dimensions and block sizes. The issue you've shown 
usually originates from incorrect metadata (e.g., a negative number of 
columns or block sizes), which prevents the sparse rows from growing to 
the necessary sizes.
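
For illustration, a rough sketch of that call (the dimensions below are 
placeholders; use the actual number of rows and columns of your data, 
and note that SystemML's default block size is 1000 x 1000):

    import org.apache.spark.api.java.JavaSparkContext
    import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils
    import org.apache.sysml.runtime.matrix.MatrixCharacteristics

    // placeholder dimensions -- replace with the true size of X
    val numRows = 100000L
    val numCols = 1000L
    val mc = new MatrixCharacteristics(numRows, numCols, 1000, 1000)

    // x is the dataframe holding the single vector-typed column
    val jsc = new JavaSparkContext(spark.sparkContext)
    // containsID = false (no "__INDEX" column), isVector = true
    val xBlocks = RDDConverterUtils.dataFrameToBinaryBlock(jsc, x, mc, false, true)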

Regards,
Matthias

On 12/22/2017 10:42 PM, Anthony Thomas wrote:
> Hi Matthias,
>
> Thanks for the help! In response to your questions:
>
>    1. Sorry - this was a typo: the correct schema is [y: int, features:
>    vector] - the column "features" was created using Spark's VectorAssembler
>    (a rough sketch of that assembly appears after this list) and the
>    underlying type is an org.apache.spark.ml.linalg.SparseVector. Calling
>    x.schema results in: org.apache.spark.sql.types.StructType =
>    StructType(StructField(features,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true))
>    2. "y" converts fine - it appears the only issue is with X. The script
>    still crashes when running "print(sum(X))". The full stack trace is
>    attached at the end of the message.
>    3. Unfortunately, the error persists when calling
>    RDDConverterUtils.dataFrameToBinaryBlock directly.
>    4. Also just in case this matters: I'm packaging the script into a jar
>    using SBT assembly and submitting via spark-submit.
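>
> Regarding item 1, the features column is assembled roughly like this
> (the raw column names "f1".."f3" and "raw_df" are illustrative):
>
>         import org.apache.spark.ml.feature.VectorAssembler
>
>         // combine raw numeric columns into a single ML vector column "features"
>         val assembler = new VectorAssembler()
>           .setInputCols(Array("f1", "f2", "f3"))
>           .setOutputCol("features")
>         val assembled_df = assembler.transform(raw_df).select("y", "features")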
>
> Here's an updated script:
>
>         val input_df = spark.read.parquet(inputPath)
>         val x = input_df.select(featureNames)
>         val y = input_df.select("y")
>         val meta_x = new MatrixMetadata(DF_VECTOR)
>         val meta_y = new MatrixMetadata(DF_DOUBLES)
>
>         val script_x = dml("print(sum(X))").in("X", x, meta_x)
>         println("Reading X")
>         val res_x = ml.execute(script_x)
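>
> (For completeness, the snippet assumes imports and an MLContext roughly
> along these lines:)
>
>         import org.apache.sysml.api.mlcontext.{MLContext, MatrixMetadata}
>         import org.apache.sysml.api.mlcontext.MatrixFormat.{DF_VECTOR, DF_DOUBLES}
>         import org.apache.sysml.api.mlcontext.ScriptFactory.dml
>
>         // MLContext wrapping the active SparkSession
>         val ml = new MLContext(spark)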
>
> Here is the output of the runtime plan generated by SystemML:
>
> # EXPLAIN (RUNTIME):
> # Memory Budget local/remote = 76459MB/?MB/?MB/?MB
> # Degree of Parallelism (vcores) local/remote = 24/?
> PROGRAM ( size CP/SP = 3/0 )
> --MAIN PROGRAM
> ----GENERIC (lines 1-2) [recompile=false]
> ------CP uak+ X.MATRIX.DOUBLE _Var0.SCALAR.STRING 24
> ------CP print _Var0.SCALAR.STRING.false _Var1.SCALAR.STRING
> ------CP rmvar _Var0 _Var1
>
> And the resulting stack trace:
>
> 17/12/22 21:27:20 WARN TaskSetManager: Lost task 3.0 in stage 7.0 (TID 205, 10.11.10.12, executor 3): java.lang.ArrayIndexOutOfBoundsException: 0
>     at org.apache.sysml.runtime.matrix.data.SparseRow.append(SparseRow.java:215)
>     at org.apache.sysml.runtime.matrix.data.SparseBlockMCSR.append(SparseBlockMCSR.java:253)
>     at org.apache.sysml.runtime.matrix.data.MatrixBlock.appendValue(MatrixBlock.java:663)
>     at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1076)
>     at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1008)
>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>     at org.apache.spark.scheduler.Task.run(Task.scala:108)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> 17/12/22 21:27:21 ERROR TaskSetManager: Task 19 in stage 7.0 failed 4 times; aborting job
> Exception in thread "main" org.apache.sysml.api.mlcontext.MLContextException: Exception when executing script
>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:311)
>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:280)
>     at SystemMLMLAlgorithms$$anonfun$main$1.apply$mcVI$sp(systemml_ml_algorithms.scala:63)
>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>     at SystemMLMLAlgorithms$.main(systemml_ml_algorithms.scala:60)
>     at SystemMLMLAlgorithms.main(systemml_ml_algorithms.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: org.apache.sysml.api.mlcontext.MLContextException: Exception occurred while executing runtime program
>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:390)
>     at org.apache.sysml.api.mlcontext.ScriptExecutor.execute(ScriptExecutor.java:298)
>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:303)
>     ... 14 more
> Caused by: org.apache.sysml.runtime.DMLRuntimeException: org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in program block generated from statement block between lines 1 and 2 -- Error evaluating instruction: CP°uak+°X·MATRIX·DOUBLE°_Var0·SCALAR·STRING°24
>     at org.apache.sysml.runtime.controlprogram.Program.execute(Program.java:130)
>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:388)
>     ... 16 more
> ...
>
>
>
> On Fri, Dec 22, 2017 at 5:48 AM, Matthias Boehm <mboehm7@gmail.com> wrote:
>
>> well, let's do the following to figure this out:
>>
>> 1) If the schema is indeed [label: Integer, features: SparseVector],
>> please change the third line to val y = input_data.select("label").
>>
>> 2) For debugging, I would recommend using a simple script like
>> "print(sum(X));" and trying to convert X and y separately to isolate the
>> problem.
>>
>> 3) If it's still failing, it would be helpful to know (a) whether it's an
>> issue of converting X, y, or both, as well as (b) the full stacktrace.
>>
>> 4) As a workaround, you might also call our internal converter directly via:
>> RDDConverterUtils.dataFrameToBinaryBlock(jsc, df, mc, containsID, isVector),
>> where jsc is the Java Spark context, df is the dataset, mc is the matrix
>> characteristics (if unknown, simply use new MatrixCharacteristics()),
>> containsID indicates whether the dataset contains a column "__INDEX" with
>> the row indexes, and isVector indicates whether the passed dataset contains
>> vectors or basic types such as double.
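>>
>> For example, a rough sketch of that call, using the classes just
>> mentioned (variable names are illustrative; df holds the vector column):
>>
>>     val jsc = new JavaSparkContext(spark.sparkContext)
>>     // dimensions unknown here, so pass empty matrix characteristics;
>>     // containsID = false (no "__INDEX" column), isVector = true
>>     val blocks = RDDConverterUtils.dataFrameToBinaryBlock(
>>       jsc, df, new MatrixCharacteristics(), false, true)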
>>
>>
>> Regards,
>> Matthias
>>
>>
>> On 12/22/2017 12:00 AM, Anthony Thomas wrote:
>>
>>> Hi SystemML folks,
>>>
>>> I'm trying to pass some data from Spark to a DML script via the MLContext
>>> API. The data is derived from a parquet file containing a dataframe with
>>> the schema: [label: Integer, features: SparseVector]. I am doing the
>>> following:
>>>
>>>         val input_data = spark.read.parquet(inputPath)
>>>         val x = input_data.select("features")
>>>         val y = input_data.select("y")
>>>         val x_meta = new MatrixMetadata(DF_VECTOR)
>>>         val y_meta = new MatrixMetadata(DF_DOUBLES)
>>>         val script = dmlFromFile(s"${script_path}/script.dml").
>>>                 in("X", x, x_meta).
>>>                 in("Y", y, y_meta)
>>>         ...
>>>
>>> However, this results in an error from SystemML:
>>> java.lang.ArrayIndexOutOfBoundsException: 0
>>> I'm guessing this has something to do with Spark ML being zero-indexed and
>>> SystemML being one-indexed. Is there something I should be doing differently
>>> here? Note that I also tried converting the dataframe to a CoordinateMatrix
>>> and then creating an RDD[String] in IJV format. That too resulted in
>>> "ArrayIndexOutOfBoundsExceptions." I'm guessing there's something simple
>>> I'm doing wrong here, but I haven't been able to figure out exactly what.
>>> Please let me know if you need more information (I can send along the full
>>> error stacktrace if that would be helpful)!
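>>>
>>> For reference, that IJV conversion was roughly along these lines (sketch
>>> only; SystemML's IJV text format expects 1-based "i j value" lines, while
>>> MatrixEntry indices in a CoordinateMatrix are 0-based, hence the +1 shift):
>>>
>>>         // coo is the CoordinateMatrix built from the data
>>>         val ijv = coo.entries.map(e => s"${e.i + 1} ${e.j + 1} ${e.value}")
>>>         // the resulting RDD[String] is then bound to the script together
>>>         // with MatrixMetadata carrying the matrix dimensions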
>>>
>>> Thanks,
>>>
>>> Anthony
>>>
>>>
>
