flink-issues mailing list archives

From "Luke Hutchison (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
Date Tue, 21 Mar 2017 04:15:41 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15934071#comment-15934071

Luke Hutchison commented on FLINK-6115:

It's quite easy to end up with null values in tuples, though -- it's currently entirely valid
to store null values in tuple fields, they just can't be serialized -- and you can't always predict
when tuples will be serialized. For example (I suspect), code may work fine for a while in RAM on a
single machine, but then suddenly break when you scale it up to run on a cluster, or even when the
runtime decides to spill to disk. This is very poor behavior.
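To illustrate the failure mode, here is a minimal, Flink-free sketch: `Pair` and `serialize` below are hypothetical stand-ins for Flink's `Tuple2` and the null-rejecting field serializers, not Flink code.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for a Flink Tuple2: fields are public and nullable.
class Pair {
    public String f0;
    public String f1;
    Pair(String f0, String f1) { this.f0 = f0; this.f1 = f1; }
}

public class NullFieldDemo {
    // Mimics Flink's null-rejecting behavior: writing a null field throws.
    static byte[] serialize(Pair p) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        for (String field : new String[] { p.f0, p.f1 }) {
            if (field == null) {
                throw new IllegalArgumentException("The record must not be null.");
            }
            out.writeUTF(field);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        Pair p = new Pair("key", null);
        // In-memory use is fine; nothing objects to the null field:
        System.out.println("f0 length: " + p.f0.length());
        // ...until the runtime decides to serialize, e.g. to ship or spill:
        try {
            serialize(p);
        } catch (IllegalArgumentException e) {
            System.out.println("serialize failed: " + e.getMessage());
        }
    }
}
```

The point is that the null is perfectly usable right up until serialization happens, which is an event the user does not control.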

Even worse, it is exceedingly hard to tell where the problem originates, as shown in the stack
trace I posted. Not only is the location where the null value is set separated from the location
where the problem is triggered on serialization, but the serialization trace doesn't tell you
anything about where in the program the serializer was running, other than what operation type
it was contained within.

Another common scenario in which null values get set in tuples is an outer join. Basically,
if the Flink policy is "we won't ever support nulls in tuples as valid", then an outer join
should not be able to produce a tuple as its result. More generally, you could simply throw
an exception when the constructor of a tuple is called with a null parameter, so that the
user is notified immediately of the invalid value, with the exception tied directly to the
place where the null was set. This would not be a perfect fix though, since the fields of a
tuple are not final, so it is still possible to simply set a field to null after construction.
I don't see any of these as good solutions to this issue. Really the best thing to do is find
a way to efficiently serialize null values in tuples. Why exactly is it slower to support
serializing null values in tuples than it is for a POJO or a {{Row}} object?

In theory, if you simply ran tuples through the POJO serializer, it should be able to serialize
them fine, with the same efficiency as regular POJOs (which are allowed to contain null values)
-- so I don't see how or why this would incur a performance penalty.
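For reference, null-tolerant serialization costs only a small marker per nullable field -- roughly the flag-then-value strategy a POJO-style serializer already pays for. A minimal sketch of that strategy (hypothetical helper names, not Flink's actual serializer code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class NullableFieldCodec {
    // One flag byte per field: true means "null", false means "a value follows".
    static void writeNullable(DataOutputStream out, String field) throws IOException {
        out.writeBoolean(field == null);
        if (field != null) {
            out.writeUTF(field);
        }
    }

    static String readNullable(DataInputStream in) throws IOException {
        return in.readBoolean() ? null : in.readUTF();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        writeNullable(out, "key");
        writeNullable(out, null);  // nulls round-trip instead of throwing

        DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        System.out.println(readNullable(in));  // prints "key"
        System.out.println(readNullable(in));  // prints "null"
    }
}
```

The overhead is one byte per nullable field on the wire (a bitmask over all fields would be even cheaper), which is the same order of cost a POJO serializer already accepts.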

> Need more helpful error message when trying to serialize a tuple with a null field
> ----------------------------------------------------------------------------------
>                 Key: FLINK-6115
>                 URL: https://issues.apache.org/jira/browse/FLINK-6115
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
> When Flink tries to serialize a tuple with a null field, you get the following, which
has no information about where in the program the problem occurred (all the stack trace lines
are in Flink, not in user code).
> {noformat}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
> 	at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
> 	at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
> 	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
> 	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
> 	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
> 	at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
> 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113)
> 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
> 	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
> 	at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> 	at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
> 	at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> 	at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
> 	at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108)
> 	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
> 	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only thing I can tell from this is that it happened somewhere in a flatMap (but I
have dozens of them in my code). Surely there's a way to pull out the source file name and
line number from the program DAG node when errors like this occur?

This message was sent by Atlassian JIRA
