spark-issues mailing list archives

From "Andres Fernandez (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-27613) Caching an RDD composed of Row Objects produces some kind of key recombination
Date Wed, 01 May 2019 05:26:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-27613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andres Fernandez updated SPARK-27613:
-------------------------------------
    Description: 
(Code included at the bottom)

The function +create_dataframes_from_azure_responses_rdd+ receives *table_names* (a _list_ of _str_) and *responses_rdd* (an RDD of tuples <_str_, _str_, _int_, _tuple_, _azure.loganalytics.models.QueryResponse_>). It then iterates over the table names, creating one DataFrame per table by filtering the RDD on the first tuple element and on the response being valid.

So far so good.

A _QueryResponse_ object (from the azure.loganalytics package) essentially contains a list with one "_table_", which in turn has a "_columns_" and a "_rows_" field. Every response (fifth element of the tuple, [4]) for the same table name (first element of the tuple, [0]) has exactly the same columns in the same order (the order only matters for referencing the row data inside the same response anyway). The column types are stored in *column_types*, taking the first response as the sample.
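As a concrete illustration of the response shape described above, here is a minimal sketch with mocked objects (the mocked response is mine, not part of azure.loganalytics; only the .tables[0].columns/.rows shape matters) showing how *column_types* is derived from the first response:
{code:python}
from types import SimpleNamespace

# Mocked stand-in for azure.loganalytics.models.QueryResponse
# (hypothetical data; the real object has the same attribute shape).
response = SimpleNamespace(tables=[SimpleNamespace(
    columns=[SimpleNamespace(name="record_count", type="long"),
             SimpleNamespace(name="status", type="string")],
    rows=[[42, "ok"], [7, "n/a"]],
)])

# Same extraction as in the code below: column name -> azure type string,
# plus the extra WorkspaceId column added by hand.
column_types = {c.name: c.type for c in response.tables[0].columns}
column_types["WorkspaceId"] = "string"
{code}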

Now to the tricky part.

flatMap is called on *responses_rdd* with the function +tabularize_response_rdd+, which creates a Row object for every row (a _list_ of _str_) in the _QueryResponse_. A schema is built using a *type_map* from Azure types to spark.sql.types so it can be passed to the subsequent createDataFrame call. If the result of this flatMap, *table_tabular_rdd*, is *not* cached before creating the DataFrame from the Rows RDD, everything works smoothly. However, if *table_tabular_rdd* is cached before creating the DataFrame, the keys and values of the Row objects no longer match up.

It is worth pointing out that when a Row object is created from an unpacked dict, the code [here|https://github.com/apache/spark/blob/480357cc6d71c682fe703611c71c1e6a36e6ce9a/python/pyspark/sql/types.py#L1374-L1375] sorts the keys; is this behaviour somehow overridden by caching?
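To make the sorting referenced above concrete: in Spark 2.x, Row(**kwargs) sorts the field names alphabetically at construction time and aligns the values to the sorted names. A pure-Python sketch of that mechanism (mimicking pyspark, not using it):
{code:python}
# Mimic Spark 2.x pyspark.sql.Row(**kwargs): field names are sorted
# alphabetically and the values are re-aligned to the sorted names.
def make_row(**kwargs):
    names = sorted(kwargs)  # what types.py does on Row construction
    return names, tuple(kwargs[k] for k in names)

names, values = make_row(WorkspaceId="ws1", record_count=42, status="n/a")
# Field order is alphabetical (uppercase sorts first), regardless of the
# order in which the kwargs were passed.
{code}
If the schema's field order ever diverges from this sorted order, the positional values end up under the wrong fields, which is exactly the kind of mismatch reported here.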

Please let me know what I am doing wrong; is there a best practice or documented solution I am not following? I'm a beginner with Spark and would happily accept any suggestion. I hope I was clear enough, and I am open to providing any additional details that might be helpful. Thank you! (Code and error attached as well.)

The error looks as if it were related to casting, but it can be seen that the contents do not correspond to the key: the *record_count* field is actually a Long, but in the Row its value somehow got swapped with another key's value, in this case 'n/a'.
{code:python}
# Assumes `spark` (SparkSession), `type_map` (azure type name -> pyspark.sql.types
# constructor) and StructType/StructField from pyspark.sql.types are in scope.
def create_dataframes_from_azure_responses_rdd(table_names: list, responses_rdd: pyspark.RDD, verbose: bool = False) -> dict:
  ws_column_name = "WorkspaceId"
  def tabularize_response_rdd(x: tuple):
    import pyspark
    tn, wsid, count, interval, response = x
    ret = []
    if response.tables[0].rows:
      ret = [pyspark.sql.Row(**{ws_column_name:wsid, **{fi.name:r[i] for i,fi in enumerate(response.tables[0].columns)}}) for r in response.tables[0].rows]
    return ret
  data_frames = {}
  for tn in table_names:
    if verbose: print("Filtering RDD items for {}".format(tn))
    table_response_rdd = responses_rdd.filter(lambda x: x[0]==tn and x[4] is not None).cache()
    
    data_frames[tn] = None
    if verbose: print("Checking if RDD for {} has data".format(tn))
    if not table_response_rdd.isEmpty():
      if verbose: print("Getting column types for {} from azure response".format(tn))
      column_types = {f.name:f.type for f in table_response_rdd.take(1)[0][4].tables[0].columns}
      column_types[ws_column_name] = "string"
      if verbose: print("Generating pyspark.sql.Row RDD for {}".format(tn))
      table_tabular_rdd = table_response_rdd.flatMap(tabularize_response_rdd) #.cache() #Error with cache, no error without!
      if verbose: print("Getting sample row for {}".format(tn))
      row_fields = table_tabular_rdd.take(1)[0].asDict().keys()
      if verbose: print("Building schema for {} from sample row and column types".format(tn))
      current_schema = StructType([StructField(f, type_map[column_types[f]](), True) for f in row_fields])
      if verbose: print("Creating dataframe for {}".format(tn))
      table_df = spark.createDataFrame(table_tabular_rdd, schema=current_schema).cache()
      if verbose: print("Calculating expected count for {}".format(tn))
      expected_count = table_response_rdd.map(lambda x: (x[1],x[2])).distinct().map(lambda x: x[1]).sum()
      real_count = table_df.select("record_count").groupBy().sum().collect()[0][0]
      table_response_rdd.unpersist()
      #table_tabular_rdd.unpersist()
      if verbose: print("Expected count {} vs Real count {}".format(expected_count, real_count))
      data_frames[tn]=table_df
    else:
      if verbose: print("{} table was empty!".format(tn))
  return data_frames
{code}
{noformat}
Py4JJavaError Traceback (most recent call last) <command-2824384765475765> in <module>()       1 resrdds = get_data_for_timespan_accross_laws(wss, tns, 1, sta, container, sta_key, tid, creds, 5000, True)       2 resrdds.cache() ----> 3 dfs_raw = create_dataframes_from_azure_responses_rdd(tns, resrdds, True)       4 resrdds.unpersist() <command-2824384765475774> in create_dataframes_from_azure_responses_rdd(table_names, responses_rdd, verbose)      37 if verbose: print("Calculating expected count for {}".format(tn))      38 expected_count = table_response_rdd.map(lambda x: (x[1],x[2])).distinct().map(lambda x: x[1]).sum() ---> 39       real_count = table_df.select("record_count").groupBy().sum().collect()[0][0]      40 table_response_rdd.unpersist()      41 #table_tabular_rdd.unpersist() /databricks/spark/python/pyspark/sql/dataframe.py in collect(self)     546 # Default path used in OSS Spark / for non-DF-ACL clusters:     547 with SCCallSiteSync(self._sc) as css: --> 548             sock_info = self._jdf.collectToPython()     549 return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))     550 /databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)    1255 answer = self.gateway_client.send_command(command)    1256 return_value = get_return_value( -> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258    1259 for temp_arg in temp_args: /databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)      61 def deco(*a, **kw):      62 try: ---> 63             return f(*a, **kw)      64 except py4j.protocol.Py4JJavaError as e:      65 s = e.java_exception.toString() /databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)     326 raise Py4JJavaError(     327 "An error occurred while calling {0}{1}{2}.\n". --> 328                     format(target_id, ".", name), value)
    329 else:     330 raise Py4JError( Py4JJavaError: An error occurred while calling o700.collectToPython. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 58 in stage 141.0 failed 4 times, most recent failure: Lost task 58.3 in stage 141.0 (TID 76193, 10.139.64.12, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/databricks/spark/python/pyspark/worker.py", line 403, in main process() File "/databricks/spark/python/pyspark/worker.py", line 398, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/databricks/spark/python/pyspark/serializers.py", line 413, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper return f(*args, **kwargs) File "/databricks/spark/python/pyspark/sql/session.py", line 785, in prepare verify_func(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1370, in verify_struct verifier(v) File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1383, in verify_default verify_acceptable_types(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1278, in verify_acceptable_types % (dataType, obj, type(obj)))) TypeError: field record_count: LongType can not accept object 'n/a' in type <class 'str'> at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at 
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634) at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133) at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1170) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1161) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1096) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1161) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:883) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:351) at org.apache.spark.rdd.RDD.iterator(RDD.scala:302) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at org.apache.spark.scheduler.Task.run(Task.scala:112) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2100) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2088) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2087) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2087) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1076) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2319) at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2267) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2255) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:873) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2252) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2350) at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:234) at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:269) at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:69) at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:75) at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:497) at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:469) at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:312) at org.apache.spark.sql.Dataset$$anonfun$51.apply(Dataset.scala:3289) at org.apache.spark.sql.Dataset$$anonfun$51.apply(Dataset.scala:3288) at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:3423) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:99) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:228) at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:85) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:158) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3422) at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3288) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380) at py4j.Gateway.invoke(Gateway.java:295) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:251) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/databricks/spark/python/pyspark/worker.py", line 403, in main process() File "/databricks/spark/python/pyspark/worker.py", line 398, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/databricks/spark/python/pyspark/serializers.py", line 413, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper return f(*args, **kwargs) File "/databricks/spark/python/pyspark/sql/session.py", line 785, in prepare verify_func(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1370, in verify_struct verifier(v) File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1383, in verify_default verify_acceptable_types(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1278, in verify_acceptable_types % (dataType, obj, type(obj)))) TypeError: field record_count: LongType can not accept object 'n/a' in type <class 'str'> at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626) at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634) at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133) at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1170) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1161) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1096) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1161) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:883) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:351) at org.apache.spark.rdd.RDD.iterator(RDD.scala:302) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at org.apache.spark.scheduler.Task.run(Task.scala:112) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more
{noformat}
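A possible workaround (my suggestion, not a confirmed fix): instead of relying on Row's internal field ordering, map each row to a plain tuple in an explicit field order and pass that RDD to createDataFrame together with a StructType built in the same order. A pure-Python sketch of the reordering step (`to_ordered_tuple` is a hypothetical helper):
{code:python}
# Hypothetical helper: align a row dict to an explicit field order so the
# positional values always match the StructType passed to createDataFrame.
def to_ordered_tuple(row_dict, field_order):
    return tuple(row_dict[name] for name in field_order)

field_order = ["WorkspaceId", "record_count", "status"]
row = {"record_count": 42, "status": "n/a", "WorkspaceId": "ws1"}
ordered = to_ordered_tuple(row, field_order)
{code}
In the function above this would replace the Row construction in +tabularize_response_rdd+, e.g. something like table_tabular_rdd.map(lambda d: to_ordered_tuple(d, field_order)), making caching irrelevant to the field/value alignment.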

org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634) at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133) at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1170) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1161) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1096) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1161) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:883) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:351) at org.apache.spark.rdd.RDD.iterator(RDD.scala:302) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at org.apache.spark.scheduler.Task.run(Task.scala:112) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more
{noformat}


> Caching an RDD composed of Row Objects produces some kind of key recombination
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-27613
>                 URL: https://issues.apache.org/jira/browse/SPARK-27613
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.0
>            Reporter: Andres Fernandez
>            Priority: Major
>
> (Code included at the bottom)
> The function "+create_dataframes_from_azure_responses_rdd+" receives *table_names* (_list_ of _str_) and *responses_rdd* (an RDD of tuples <_str_, _str_, _int_, _tuple_, _azure.loganalytics.models.QueryResponse_>). It then iterates over the table names, creating a DataFrame for each by filtering the RDD on the first tuple element and on having a valid response.
> So far so good.
> The _QueryResponse_ object (from the azure.loganalytics package) essentially contains a list with one "_table_", which in turn has a "_columns_" and a "_rows_" field. Every response (fifth element of the tuple, [4]) for the same table name (first element of the tuple, [0]) has exactly the same columns in the same order (the order only matters for referencing row data within the same response). The types are stored in *column_types*, taking the first response as the sample.
> Now to the tricky part.
> flatMap is called on *responses_rdd* with the function "+tabularize_response_rdd+", which creates a Row object for every row (_list_ of _str_) in the _QueryResponse_. A schema is built via a *type_map* from Azure types to pyspark.sql.types so it can be passed to the subsequent createDataFrame call. If the result of this flatMap, *table_tabular_rdd*, is *not* cached before creating the DataFrame from the Row RDD, everything works smoothly. However, if *table_tabular_rdd* is cached before creating the DataFrame, a mismatch appears between the keys and values of the Row objects.
> It is worth pointing out that when a Row object is created from an unpacked dict, the code [here|https://github.com/apache/spark/blob/480357cc6d71c682fe703611c71c1e6a36e6ce9a/python/pyspark/sql/types.py#L1374-L1375] sorts the keys; is this behaviour somehow overridden by caching?
> Please let me know what I am doing wrong: is there a best practice or documented solution I am not following? I'm just a beginner when it comes to Spark and would happily accept any suggestion. I hope I was clear enough, and I am open to provide any additional details that might be helpful. Thank you! (Code and error attached as well.)
> The error looks as if it were related to casting, but it can be seen that the contents do not correspond to the key. The *record_count* key is actually a Long, but in the Row its value somehow got swapped with another key's value, in this case 'n/a'.
> {code:python}
> import pyspark
> from pyspark.sql.types import StructType, StructField
> # `type_map` (Azure type name -> pyspark.sql.types class) and `spark` (SparkSession) are assumed to be defined elsewhere.
> def create_dataframes_from_azure_responses_rdd(table_names: list, responses_rdd: pyspark.RDD, verbose: bool = False) -> dict:
>   ws_column_name = "WorkspaceId"
>   def tabularize_response_rdd(x: tuple):
>     import pyspark
>     tn, wsid, count, interval, response = x
>     ret = []
>     if response.tables[0].rows:
>       ret = [pyspark.sql.Row(**{ws_column_name:wsid, **{fi.name:r[i] for i,fi in enumerate(response.tables[0].columns)}}) for r in response.tables[0].rows]
>     return ret
>   data_frames = {}
>   for tn in table_names:
>     if verbose: print("Filtering RDD items for {}".format(tn))
>     table_response_rdd = responses_rdd.filter(lambda x, tn=tn: x[0] == tn and x[4] is not None).cache()
>     
>     data_frames[tn] = None
>     if verbose: print("Checking if RDD for {} has data".format(tn))
>     if not table_response_rdd.isEmpty():
>       if verbose: print("Getting column types for {} from azure response".format(tn))
>       column_types = {f.name:f.type for f in table_response_rdd.take(1)[0][4].tables[0].columns}
>       column_types[ws_column_name] = "string"
>       if verbose: print("Generating pyspark.sql.Row RDD for {}".format(tn))
>       table_tabular_rdd = table_response_rdd.flatMap(tabularize_response_rdd) #.cache() #Error with cache, no error without!
>       if verbose: print("Getting sample row for {}".format(tn))
>       row_fields = table_tabular_rdd.take(1)[0].asDict().keys()
>       if verbose: print("Building schema for {} from sample row and column types".format(tn))
>       current_schema = StructType([StructField(f, type_map[column_types[f]](), True) for f in row_fields])
>       if verbose: print("Creating dataframe for {}".format(tn))
>       table_df = spark.createDataFrame(table_tabular_rdd, schema=current_schema).cache()
>       if verbose: print("Calculating expected count for {}".format(tn))
>       expected_count = table_response_rdd.map(lambda x: (x[1],x[2])).distinct().map(lambda x: x[1]).sum()
>       real_count = table_df.select("record_count").groupBy().sum().collect()[0][0]
>       table_response_rdd.unpersist()
>       #table_tabular_rdd.unpersist()
>       if verbose: print("Expected count {} vs Real count {}".format(expected_count, real_count))
>       data_frames[tn]=table_df
>     else:
>       if verbose: print("{} table was empty!".format(tn))
>   return data_frames
> {code}
> {noformat}
> Py4JJavaError Traceback (most recent call last) <command-2824384765475765> in <module>()       1 resrdds = get_data_for_timespan_accross_laws(wss, tns, 1, sta, container, sta_key, tid, creds, 5000, True)       2 resrdds.cache() ----> 3 dfs_raw = create_dataframes_from_azure_responses_rdd(tns, resrdds, True)       4 resrdds.unpersist() <command-2824384765475774> in create_dataframes_from_azure_responses_rdd(table_names, responses_rdd, verbose)      37 if verbose: print("Calculating expected count for {}".format(tn))      38 expected_count = table_response_rdd.map(lambda x: (x[1],x[2])).distinct().map(lambda x: x[1]).sum() ---> 39       real_count = table_df.select("record_count").groupBy().sum().collect()[0][0]      40 table_response_rdd.unpersist()      41 #table_tabular_rdd.unpersist() /databricks/spark/python/pyspark/sql/dataframe.py in collect(self)     546 # Default path used in OSS Spark / for non-DF-ACL clusters:     547 with SCCallSiteSync(self._sc) as css: --> 548             sock_info = self._jdf.collectToPython()     549 return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))     550 /databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)    1255 answer = self.gateway_client.send_command(command)    1256 return_value = get_return_value( -> 1257             answer, self.gateway_client, self.target_id, self.name)
>    1258    1259 for temp_arg in temp_args: /databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)      61 def deco(*a, **kw):      62 try: ---> 63             return f(*a, **kw)      64 except py4j.protocol.Py4JJavaError as e:      65 s = e.java_exception.toString() /databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)     326 raise Py4JJavaError(     327 "An error occurred while calling {0}{1}{2}.\n". --> 328                     format(target_id, ".", name), value)
>     329 else:     330 raise Py4JError( Py4JJavaError: An error occurred while calling o700.collectToPython. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 58 in stage 141.0 failed 4 times, most recent failure: Lost task 58.3 in stage 141.0 (TID 76193, 10.139.64.12, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/databricks/spark/python/pyspark/worker.py", line 403, in main process() File "/databricks/spark/python/pyspark/worker.py", line 398, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/databricks/spark/python/pyspark/serializers.py", line 413, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper return f(*args, **kwargs) File "/databricks/spark/python/pyspark/sql/session.py", line 785, in prepare verify_func(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1370, in verify_struct verifier(v) File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1383, in verify_default verify_acceptable_types(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1278, in verify_acceptable_types % (dataType, obj, type(obj)))) TypeError: field record_count: LongType can not accept object 'n/a' in type <class 'str'> at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at 
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634) at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133) at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1170) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1161) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1096) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1161) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:883) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:351) at org.apache.spark.rdd.RDD.iterator(RDD.scala:302) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at org.apache.spark.scheduler.Task.run(Task.scala:112) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2100) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2088) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2087) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2087) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1076) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2319) at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2267) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2255) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:873) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2252) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2350) at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:234) at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:269) at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:69) at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:75) at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:497) at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:469) at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:312) at org.apache.spark.sql.Dataset$$anonfun$51.apply(Dataset.scala:3289) at org.apache.spark.sql.Dataset$$anonfun$51.apply(Dataset.scala:3288) at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:3423) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:99) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:228) at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:85) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:158) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3422) at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3288) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380) at py4j.Gateway.invoke(Gateway.java:295) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:251) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/databricks/spark/python/pyspark/worker.py", line 403, in main process() File "/databricks/spark/python/pyspark/worker.py", line 398, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/databricks/spark/python/pyspark/serializers.py", line 413, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper return f(*args, **kwargs) File "/databricks/spark/python/pyspark/sql/session.py", line 785, in prepare verify_func(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1370, in verify_struct verifier(v) File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1383, in verify_default verify_acceptable_types(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1278, in verify_acceptable_types % (dataType, obj, type(obj)))) TypeError: field record_count: LongType can not accept object 'n/a' in type <class 'str'> at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626) at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634) at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133) at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1170) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1161) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1096) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1161) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:883) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:351) at org.apache.spark.rdd.RDD.iterator(RDD.scala:302) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at org.apache.spark.scheduler.Task.run(Task.scala:112) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more
> {noformat}
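The key-sorting behaviour the reporter links to can be reproduced without a cluster. The sketch below is pure Python with illustrative column names; it only mimics the Spark 2.x Row(**kwargs) logic (it is not the PySpark implementation) and shows how sorted Row fields, paired against a schema built in a different order, mis-assign values and yield exactly a string like 'n/a' where a Long is expected. It also sketches an obvious workaround: emit plain tuples in one fixed field order.

```python
# Pure-Python sketch of what pyspark.sql.Row(**kwargs) does in Spark 2.x
# (the linked types.py sorts the keyword names): values end up stored in
# sorted-key order, not insertion order.
def make_row(**kwargs):
    names = sorted(kwargs)                    # Row sorts the field names
    values = tuple(kwargs[k] for k in names)  # values follow the sorted names
    return names, values

names, values = make_row(WorkspaceId="ws1", record_count=7, Status="n/a")
print(names)    # ['Status', 'WorkspaceId', 'record_count']

# A schema built in a different order (e.g. from a sample row's dict keys)
# then pairs fields with the wrong positional values:
schema_order = ["WorkspaceId", "record_count", "Status"]
mismatched = dict(zip(schema_order, values))
print(mismatched["record_count"])             # 'ws1' -- a str, not a Long

# Workaround sketch: emit plain tuples in one fixed field order that matches
# the schema, so Row's key sorting never comes into play.
field_order = ["WorkspaceId", "record_count", "Status"]
def row_as_tuple(values_by_name):
    return tuple(values_by_name.get(name) for name in field_order)

print(row_as_tuple({"record_count": 7, "Status": "n/a", "WorkspaceId": "ws1"}))
# ('ws1', 7, 'n/a')
```

This is consistent with the reporter's observation: the schema is built from `row_fields` while each Row stores its values in sorted-field order, so any path that re-serializes Rows positionally can surface the mismatch.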



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

