spark-issues mailing list archives

From "Andres Fernandez (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-27613) Caching an RDD composed of Row Objects produces some kind of key recombination
Date Wed, 01 May 2019 02:28:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-27613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andres Fernandez updated SPARK-27613:
-------------------------------------
    Component/s:     (was: Spark Core)
                 PySpark

> Caching an RDD composed of Row Objects produces some kind of key recombination
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-27613
>                 URL: https://issues.apache.org/jira/browse/SPARK-27613
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.0
>            Reporter: Andres Fernandez
>            Priority: Major
>
> (Code included at the bottom)
> The function "+create_dataframes_from_azure_responses_rdd+" receives *table_names* (a _list_ of _str_) and *responses_rdd* (an RDD of tuples <_str_, _str_, _int_, _tuple_, _azure.loganalytics.models.QueryResponse_>). It then iterates over the table names and builds one DataFrame per table, filtering the RDD by its first element (the table name) and by the presence of a valid response.
> So far so good.
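> To make the shape of *responses_rdd* concrete, here is a small sketch (not part of the original report; the concrete values are made up) of one tuple and of the per-table filter applied later:
> {code:python}
> # Each element of responses_rdd is assumed to look like:
> # (tn, wsid, count, interval, response)
> response = None  # stands in for an azure.loganalytics QueryResponse (None = no valid response)
> example_item = (
>     "SecurityEvent",               # x[0]: table name (str)
>     "workspace-guid",              # x[1]: workspace id (str)
>     5000,                          # x[2]: record count for that workspace (int)
>     ("2019-04-01", "2019-04-02"),  # x[3]: time interval (tuple)
>     response,                      # x[4]: the QueryResponse, or None
> )
> # The filter in the code below keeps, per table, only items with a valid response:
> keep = example_item[0] == "SecurityEvent" and example_item[4] is not None
> print(keep)  # False here, because the stand-in response is None
> {code}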
> A QueryResponse object (from the azure.loganalytics package) essentially contains a list with a single "_table_", which in turn has a "_columns_" and a "_rows_" field. Every response (fifth element of the tuple, [4]) for the same table name (first element of the tuple, [0]) has exactly the same columns in the same order (the order only matters for referencing the row data within the same response anyway). The types are stored in *column_types*, taking the first response as the sample.
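> For readers without the azure.loganalytics package at hand, here is a self-contained sketch (stand-in namedtuples, not the real QueryResponse classes) of the structure described above and of how *column_types* is derived from the first response:
> {code:python}
> from collections import namedtuple
>
> # Stand-ins mirroring only the fields used below: response.tables[0].columns / .rows
> Column = namedtuple("Column", ["name", "type"])
> Table = namedtuple("Table", ["columns", "rows"])
> Response = namedtuple("Response", ["tables"])
>
> response = Response(tables=[Table(
>     columns=[Column("TimeGenerated", "datetime"),
>              Column("record_count", "long"),
>              Column("Status", "string")],
>     rows=[["2019-04-30T00:00:00Z", 42, "n/a"]],
> )])
>
> # Same construction as in create_dataframes_from_azure_responses_rdd:
> column_types = {f.name: f.type for f in response.tables[0].columns}
> column_types["WorkspaceId"] = "string"  # ws_column_name is added manually
> {code}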
> Now to the tricky part.
> I call flatMap on *responses_rdd* with the function "+tabularize_response_rdd+", which basically creates a Row object for every row (a _list_ of _str_) in the _QueryResponse_. I also create the schema, based on a *type_map* from Azure types to spark.sql.types, so it can be passed to the subsequent createDataFrame call. If the result of this flatMap, *table_tabular_rdd*, is not cached before creating the DataFrame from the Rows RDD, everything works smoothly. However, if the result of the flatMap, *table_tabular_rdd*, is cached before creating the DataFrame, a mismatch shows up between the keys and the values of the Row objects.
> It is worth pointing out that when a Row object is created from an unpacked dict, the code [here|https://github.com/apache/spark/blob/480357cc6d71c682fe703611c71c1e6a36e6ce9a/python/pyspark/sql/types.py#L1374-L1375] sorts the keys; is this behaviour somehow overridden by caching?
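> As an illustration of the key sorting mentioned above (not taken from the report, and not a claim about what caching does internally): a Row built from unpacked keyword arguments stores its fields in alphabetically sorted order and, being a tuple, exposes its values in that same order to anything that consumes it positionally. A plain pickle round trip is used below only as a rough stand-in for the serialization that caching involves.
> {code:python}
> import pickle
> from pyspark.sql import Row
>
> # Row created from an unpacked dict: field names are sorted alphabetically (Spark 2.x),
> # regardless of the order in which they were passed.
> r = Row(**{"WorkspaceId": "ws-1", "record_count": 42, "Status": "n/a"})
> print(r.__fields__)  # ['Status', 'WorkspaceId', 'record_count']
> print(tuple(r))      # ('n/a', 'ws-1', 42) -- values follow the sorted field order
>
> # Because a Row is a tuple, a schema whose field order does not line up with this sorted
> # order can pair a LongType field (record_count) with a str value such as 'n/a', which is
> # the kind of TypeError shown in the attached stack trace.
>
> # Rough stand-in for a serialization round trip:
> r2 = pickle.loads(pickle.dumps(r))
> print(r2.__fields__, tuple(r2))  # field names and value order are preserved here
> {code}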
> Please let me know what I am doing wrong; is there any best practice / documented solution I am not following? I'm just a beginner when it comes to Spark and would happily accept any suggestion. I hope I was clear enough, and I am open to giving you any additional details that might be helpful. Thank you! (Code and error attached as well.)
> The error looks as if it were related to casting, but it can be seen that the contents do not correspond to the key. The *record_count* key is actually a Long, but in the Row it somehow got swapped for another key's value, in this case 'n/a'.
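> One possible way to avoid relying on Row keyword ordering altogether (a sketch only, not from the original report; *field_order*, *type_by_field* and *tabularize_positionally* are hypothetical names) is to emit plain tuples in one fixed field order and build the schema in that exact order, so fields and values are paired explicitly on both sides:
> {code:python}
> from pyspark.sql.types import StructType, StructField, StringType, LongType
>
> # Hypothetical, hard-coded example; in the real job the fields and types would come
> # from the Azure response columns and the type_map mentioned above.
> field_order = ["WorkspaceId", "TimeGenerated", "record_count"]
> type_by_field = {
>     "WorkspaceId": StringType(),
>     "TimeGenerated": StringType(),
>     "record_count": LongType(),
> }
>
> schema = StructType([StructField(f, type_by_field[f], True) for f in field_order])
>
> def tabularize_positionally(wsid, response):
>     """Yield one plain tuple per response row, always in field_order."""
>     names = [c.name for c in response.tables[0].columns]
>     for row in response.tables[0].rows:
>         by_name = {"WorkspaceId": wsid, **dict(zip(names, row))}
>         yield tuple(by_name[f] for f in field_order)
>
> # Sketch of how this would slot into the flatMap / createDataFrame calls below:
> # table_tabular_rdd = table_response_rdd.flatMap(lambda x: tabularize_positionally(x[1], x[4])).cache()
> # table_df = spark.createDataFrame(table_tabular_rdd, schema=schema)
> {code}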
> {code:python}
> def create_dataframes_from_azure_responses_rdd(table_names: list, responses_rdd: pyspark.rdd, verbose: bool = False) -> dict:
>   ws_column_name = "WorkspaceId"
>   def tabularize_response_rdd(x: tuple):
>     import pyspark
>     tn, wsid, count, interval, response = x
>     ret = []
>     if response.tables[0].rows:
>       ret = [pyspark.sql.Row(**{ws_column_name: wsid, **{fi.name: r[i] for i, fi in enumerate(response.tables[0].columns)}}) for r in response.tables[0].rows]
>     return ret
>   data_frames = {}
>   for tn in table_names:
>     if verbose: print("Filtering RDD items for {}".format(tn))
>     table_response_rdd = responses_rdd.filter(lambda x: x[0]==tn and x[4]!=None).cache()
>
>     data_frames[tn] = None
>     if verbose: print("Checking if RDD for {} has data".format(tn))
>     if not table_response_rdd.isEmpty():
>       if verbose: print("Getting column types for {} from azure response".format(tn))
>       column_types = {f.name:f.type for f in table_response_rdd.take(1)[0][4].tables[0].columns}
>       column_types[ws_column_name] = "string"
>       if verbose: print("Generating pyspark.sql.Row RDD for {}".format(tn))
>       table_tabular_rdd = table_response_rdd.flatMap(tabularize_response_rdd) #.cache() #Error with cache, no error without!
>       if verbose: print("Getting sample row for {}".format(tn))
>       row_fields = table_tabular_rdd.take(1)[0].asDict().keys()
>       if verbose: print("Building schema for {} from sample row and column types".format(tn))
>       current_schema = StructType([StructField(f, type_map[column_types[f]](), True) for f in row_fields])
>       if verbose: print("Creating dataframe for {}".format(tn))
>       table_df = spark.createDataFrame(table_tabular_rdd, schema=current_schema).cache()
>       if verbose: print("Calculating expected count for {}".format(tn))
>       expected_count = table_response_rdd.map(lambda x: (x[1],x[2])).distinct().map(lambda x: x[1]).sum()
>       real_count = table_df.select("record_count").groupBy().sum().collect()[0][0]
>       table_response_rdd.unpersist()
>       #table_tabular_rdd.unpersist()
>       if verbose: print("Expected count {} vs Real count {}".format(expected_count, real_count))
>       data_frames[tn]=table_df
>     else:
>       if verbose: print("{} table was empty!".format(tn))
>   return data_frames
> {code}
> {noformat}
> Py4JJavaError                             Traceback (most recent call last)
> <command-2824384765475765> in <module>()
>       1 resrdds = get_data_for_timespan_accross_laws(wss, tns, 1, sta, container, sta_key, tid, creds, 5000, True)
>       2 resrdds.cache()
> ----> 3 dfs_raw = create_dataframes_from_azure_responses_rdd(tns, resrdds, True)
>       4 resrdds.unpersist()
>
> <command-2824384765475774> in create_dataframes_from_azure_responses_rdd(table_names, responses_rdd, verbose)
>      37 if verbose: print("Calculating expected count for {}".format(tn))
>      38 expected_count = table_response_rdd.map(lambda x: (x[1],x[2])).distinct().map(lambda x: x[1]).sum()
> ---> 39 real_count = table_df.select("record_count").groupBy().sum().collect()[0][0]
>      40 table_response_rdd.unpersist()
>      41 #table_tabular_rdd.unpersist()
>
> /databricks/spark/python/pyspark/sql/dataframe.py in collect(self)
>     546 # Default path used in OSS Spark / for non-DF-ACL clusters:
>     547 with SCCallSiteSync(self._sc) as css:
> --> 548     sock_info = self._jdf.collectToPython()
>     549 return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
>     550
>
> /databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
>    1255 answer = self.gateway_client.send_command(command)
>    1256 return_value = get_return_value(
> -> 1257     answer, self.gateway_client, self.target_id, self.name)
>    1258
>    1259 for temp_arg in temp_args:
>
> /databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
>      61 def deco(*a, **kw):
>      62     try:
> ---> 63         return f(*a, **kw)
>      64     except py4j.protocol.Py4JJavaError as e:
>      65         s = e.java_exception.toString()
>
> /databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     326 raise Py4JJavaError(
>     327     "An error occurred while calling {0}{1}{2}.\n".
> --> 328     format(target_id, ".", name), value)
>     329 else:
>     330     raise Py4JError(
>
> Py4JJavaError: An error occurred while calling o700.collectToPython.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 58 in stage 141.0 failed 4 times, most recent failure: Lost task 58.3 in stage 141.0 (TID 76193, 10.139.64.12, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/databricks/spark/python/pyspark/worker.py", line 403, in main
>     process()
>   File "/databricks/spark/python/pyspark/worker.py", line 398, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/databricks/spark/python/pyspark/serializers.py", line 413, in dump_stream
>     vs = list(itertools.islice(iterator, batch))
>   File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
>     return f(*args, **kwargs)
>   File "/databricks/spark/python/pyspark/sql/session.py", line 785, in prepare
>     verify_func(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify
>     verify_value(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1370, in verify_struct
>     verifier(v)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify
>     verify_value(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1383, in verify_default
>     verify_acceptable_types(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1278, in verify_acceptable_types
>     % (dataType, obj, type(obj))))
> TypeError: field record_count: LongType can not accept object 'n/a' in type <class 'str'>
>
>   at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
>   at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626)
>   at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609)
>   at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444)
>   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634)
>   at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133)
>   at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
>   at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1170)
>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1161)
>   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1096)
>   at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1161)
>   at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:883)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:351)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:302)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>   at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
>   at org.apache.spark.scheduler.Task.run(Task.scala:112)
>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>
> Driver stacktrace:
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2100)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2088)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2087)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2087)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076)
>   at scala.Option.foreach(Option.scala:257)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1076)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2319)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2267)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2255)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:873)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2252)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2350)
>   at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:234)
>   at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:269)
>   at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:69)
>   at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:75)
>   at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:497)
>   at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:469)
>   at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:312)
>   at org.apache.spark.sql.Dataset$$anonfun$51.apply(Dataset.scala:3289)
>   at org.apache.spark.sql.Dataset$$anonfun$51.apply(Dataset.scala:3288)
>   at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:3423)
>   at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:99)
>   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:228)
>   at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:85)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:158)
>   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3422)
>   at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3288)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
>   at py4j.Gateway.invoke(Gateway.java:295)
>   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at py4j.GatewayConnection.run(GatewayConnection.java:251)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/databricks/spark/python/pyspark/worker.py", line 403, in main
>     process()
>   File "/databricks/spark/python/pyspark/worker.py", line 398, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/databricks/spark/python/pyspark/serializers.py", line 413, in dump_stream
>     vs = list(itertools.islice(iterator, batch))
>   File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
>     return f(*args, **kwargs)
>   File "/databricks/spark/python/pyspark/sql/session.py", line 785, in prepare
>     verify_func(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify
>     verify_value(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1370, in verify_struct
>     verifier(v)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify
>     verify_value(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1383, in verify_default
>     verify_acceptable_types(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1278, in verify_acceptable_types
>     % (dataType, obj, type(obj))))
> TypeError: field record_count: LongType can not accept object 'n/a' in type <class 'str'>
>   at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
>   at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626)
>   at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609)
>   at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444)
>   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634)
>   at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133)
>   at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
>   at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1170)
>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1161)
>   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1096)
>   at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1161)
>   at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:883)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:351)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:302)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>   at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
>   at org.apache.spark.scheduler.Task.run(Task.scala:112)
>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   ... 1 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)