spark-issues mailing list archives

From "Sean Owen (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-16409) regexp_extract with optional groups causes NPE
Date Fri, 05 Aug 2016 05:24:20 GMT

    [ https://issues.apache.org/jira/browse/SPARK-16409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15408894#comment-15408894 ]

Sean Owen commented on SPARK-16409:
-----------------------------------

It's not quite my area, but I might know the answer. As to it occurring "randomly": I suspect you mean that your example triggers it immediately, whereas in a large program, where an action isn't invoked until much later, the failure only shows up when that action finally executes this statement. That's normal for Spark programs, given the lazy-evaluation architecture.
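
To illustrate, here is a minimal Scala sketch (assuming a local SparkSession; it mirrors the Python reproduction quoted below): the select is only a lazy transformation, so nothing is evaluated, and nothing can fail, until an action such as collect() forces execution.

{code}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.regexp_extract

val spark = SparkSession.builder().master("local[*]").appName("SPARK-16409-repro").getOrCreate()
import spark.implicits._

val df = Seq("aaaac").toDF("s")
// Building the plan is lazy: regexp_extract is not evaluated here, so no error is raised yet.
val projected = df.select(regexp_extract($"s", "(a+)(b)?(c)", 2))
// The action forces execution of the generated code; the NPE surfaces only at this point.
projected.collect()
{code}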

In this case the matching group is missing. Looking at the implementation, the problem seems to be present already in RegExpExtract.nullSafeEval:

{code}
  override def nullSafeEval(s: Any, p: Any, r: Any): Any = {
    if (!p.equals(lastRegex)) {
      // regex value changed
      lastRegex = p.asInstanceOf[UTF8String].clone()
      pattern = Pattern.compile(lastRegex.toString)
    }
    val m = pattern.matcher(s.toString)
    if (m.find) {
      val mr: MatchResult = m.toMatchResult
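      // Note: if the requested group is optional and did not participate in the match,
      // mr.group(...) returns null here, and UTF8String.fromString(null) is also null.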
      UTF8String.fromString(mr.group(r.asInstanceOf[Int]))
    } else {
      UTF8String.EMPTY_UTF8
    }
  }
{code}

mr.group() returns null in this case, and so the whole method returns null. It seems like it's not supposed to do that. Shall I open a PR?
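
For reference, here's a minimal sketch of one possible fix (not necessarily what the eventual PR will look like): treat a non-participating group as an empty match instead of letting the null propagate into the unsafe row writer.

{code}
    if (m.find) {
      val mr: MatchResult = m.toMatchResult
      val group = mr.group(r.asInstanceOf[Int])
      // An optional group that did not participate in the match comes back as null;
      // return the empty string instead of propagating the null.
      if (group == null) UTF8String.EMPTY_UTF8 else UTF8String.fromString(group)
    } else {
      UTF8String.EMPTY_UTF8
    }
{code}

Returning null instead, and marking the expression nullable, would match the reporter's first suggestion; either way the NPE in UnsafeRowWriter goes away.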

> regexp_extract with optional groups causes NPE
> ----------------------------------------------
>
>                 Key: SPARK-16409
>                 URL: https://issues.apache.org/jira/browse/SPARK-16409
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.0.0
>            Reporter: Max Moroz
>
> df = sqlContext.createDataFrame([['aaaac']], ['s'])
> df.select(F.regexp_extract('s', r'(a+)(b)?(c)', 2)).collect()
> causes an NPE. Worse, in a large program it doesn't cause the NPE instantly; it actually works fine until some unpredictable (and inconsistent) moment in the future when (presumably) the invalid memory access occurs, and then it fails. For this reason, it took several hours to debug this.
> Suggestion: either fill the group with null, or raise an exception immediately after examining the argument, with a message that optional groups are not allowed.
> Traceback:
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-8-825292b569fc> in <module>()
> ----> 1 df.select(F.regexp_extract('s', r'(a+)(b)?(c)', 2)).collect()
> C:\Users\me\Downloads\spark-2.0.0-preview-bin-hadoop2.7\python\pyspark\sql\dataframe.py in collect(self)
>     294         """
>     295         with SCCallSiteSync(self._sc) as css:
> --> 296             port = self._jdf.collectToPython()
>     297         return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
>     298 
> C:\Users\me\Downloads\spark-2.0.0-preview-bin-hadoop2.7\python\lib\py4j-0.10.1-src.zip\py4j\java_gateway.py in __call__(self, *args)
>     931         answer = self.gateway_client.send_command(command)
>     932         return_value = get_return_value(
> --> 933             answer, self.gateway_client, self.target_id, self.name)
>     934 
>     935         for temp_arg in temp_args:
> C:\Users\me\Downloads\spark-2.0.0-preview-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
>      55     def deco(*a, **kw):
>      56         try:
> ---> 57             return f(*a, **kw)
>      58         except py4j.protocol.Py4JJavaError as e:
>      59             s = e.java_exception.toString()
> C:\Users\me\Downloads\spark-2.0.0-preview-bin-hadoop2.7\python\lib\py4j-0.10.1-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     310                 raise Py4JJavaError(
>     311                     "An error occurred while calling {0}{1}{2}.\n".
> --> 312                     format(target_id, ".", name), value)
>     313             else:
>     314                 raise Py4JError(
> Py4JJavaError: An error occurred while calling o51.collectToPython.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NullPointerException
> 	at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:210)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$7$$anon$1.hasNext(WholeStageCodegenExec.scala:357)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
> 	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> 	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
> 	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:112)
> 	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
> 	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:112)
> 	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
> 	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:112)
> 	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:883)
> 	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:883)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1889)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1889)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:85)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> 	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
> 	at scala.Option.foreach(Option.scala:257)
> 	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
> 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1863)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1876)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1889)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
> 	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:883)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:357)
> 	at org.apache.spark.rdd.RDD.collect(RDD.scala:882)
> 	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
> 	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2417)
> 	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2417)
> 	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2417)
> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
> 	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2436)
> 	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2416)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
> 	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> 	at py4j.Gateway.invoke(Gateway.java:280)
> 	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
> 	at py4j.commands.CallCommand.execute(CallCommand.java:79)
> 	at py4j.GatewayConnection.run(GatewayConnection.java:211)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
> 	at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:210)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$7$$anon$1.hasNext(WholeStageCodegenExec.scala:357)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
> 	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> 	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
> 	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:112)
> 	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
> 	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:112)
> 	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
> 	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:112)
> 	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:883)
> 	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:883)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1889)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1889)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:85)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	... 1 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
