flink-issues mailing list archives

From "Luke Hutchison (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
Date Fri, 31 Mar 2017 08:13:41 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15950505#comment-15950505 ]

Luke Hutchison commented on FLINK-6115:

[~Zentol] In point (1), I wasn't referring to the overhead of serialization at all -- I was
referring to the memory and time overhead of wrapping types in {{Optional}}, as a workaround
for the lack of {{null}} value support in tuples. Specifically, using {{Optional}} types requires
an additional heap allocation per value, which incurs a 16-byte overhead on a 64-bit JVM, and comes
at a significant speed cost (memory allocations and garbage collection are a principal source of
inefficiency in JITted Java relative to native code). ({{Optional}} also adds a layer of indirection,
which adds a marginal additional performance cost.)

In other words, the point I was making in (1) is that it is circular reasoning to say "we
decided not to support null values in tuples for efficiency reasons -- so use {{Optional}}
instead if you need null values".
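To make the workaround concrete, here is a minimal, self-contained sketch (the {{Pair}} class is a hypothetical stand-in for Flink's {{Tuple2}}, just to keep the example free of Flink dependencies) showing the extra heap object that wrapping a field in {{Optional}} entails:

```java
import java.util.Optional;

public class OptionalWorkaround {
    // Hypothetical stand-in for Flink's Tuple2, so the sketch is self-contained.
    static final class Pair<A, B> {
        final A f0;
        final B f1;
        Pair(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
    }

    public static void main(String[] args) {
        // What users would like to write -- a plain null field -- is what
        // the tuple serializer currently rejects:
        Pair<String, Integer> direct = new Pair<>("key", null);

        // The workaround: wrap the field in Optional. Every present value now
        // costs one extra heap allocation, and every read goes through one
        // extra pointer indirection:
        Pair<String, Optional<Integer>> wrapped =
                new Pair<>("key", Optional.empty());

        System.out.println(wrapped.f1.isPresent());  // false
    }
}
```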

The other case to consider is the impact on code that never needs {{null}} values. That is
what my point (2) addresses: even for code that doesn't use {{null}}, the overhead of adding
a bitfield to the serialized format would have a negligible performance impact relative to
the time spent actually doing anything with the serialized form -- because inevitably, when
you serialize an object, you're going to store it or transmit it, which takes several orders
of magnitude longer than serializing or deserializing. And the memory overhead is one bit
per field, rounded up to the nearest byte.
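To illustrate the cost being discussed, here is a minimal sketch of such a bitfield scheme -- this is not Flink's actual {{TupleSerializer}}, just a hypothetical illustration of "one null-marker bit per field, rounded up to the nearest byte", written ahead of the non-null field values:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class NullMaskSketch {
    // Serialize an array of nullable ints: a null-mask prefix (one bit per
    // field, rounded up to whole bytes), then only the non-null payloads.
    static byte[] serialize(Integer[] fields) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);

        int maskBytes = (fields.length + 7) / 8;  // round up to whole bytes
        byte[] mask = new byte[maskBytes];
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] == null) {
                mask[i / 8] |= 1 << (i % 8);      // mark field i as null
            }
        }
        out.write(mask);                          // tiny fixed-size prefix
        for (Integer f : fields) {
            if (f != null) out.writeInt(f);       // nulls occupy no payload
        }
        return buf.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = serialize(new Integer[] { 1, null, 3 });
        // 1 mask byte + two 4-byte ints = 9 bytes
        System.out.println(bytes.length);
    }
}
```

For a three-field tuple, the whole cost of {{null}} support is a single prefix byte; the per-record work of setting and testing bits is trivial next to the I/O that follows serialization.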

So -- to summarize those two points more succinctly:

# Not supporting serialization of {{null}} imposes a major overhead on code that needs to
use {{null}} values, by requiring the use of {{Optional}} wrappers, which can carry a large
performance and memory penalty.
# Supporting serialization of {{null}} would add only a minimal relative overhead to code
that doesn't need to use {{null}} values.

In other words, so far efficiency has been the strongest argument made in favor of the status
quo -- yet it really doesn't hold water as a reason not to support {{null}} values in tuples.

Premature optimization is often a source of problems, which is why I asked if this has been
benchmarked before. I have to imagine that somebody has benchmark numbers somewhere comparing
different approaches.
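For what it's worth, even a deliberately naive micro-benchmark sketch along the following lines would expose the allocation cost in question (a proper comparison should use a harness like JMH to control for JIT warmup and GC; this is only an illustration, not a measurement to trust):

```java
import java.util.Optional;

public class OptionalCostSketch {
    // Sum the non-null values directly.
    static long sumDirect(Integer[] values) {
        long total = 0;
        for (Integer v : values) {
            if (v != null) total += v;
        }
        return total;
    }

    // Same computation, but routed through Optional: one extra allocation
    // per element plus an extra indirection on every read.
    static long sumWrapped(Integer[] values) {
        long total = 0;
        for (Integer v : values) {
            Optional<Integer> boxed = Optional.ofNullable(v);
            if (boxed.isPresent()) total += boxed.get();
        }
        return total;
    }

    public static void main(String[] args) {
        Integer[] values = new Integer[1_000_000];
        for (int i = 0; i < values.length; i++) values[i] = i % 100;

        long t0 = System.nanoTime();
        long a = sumDirect(values);
        long t1 = System.nanoTime();
        long b = sumWrapped(values);
        long t2 = System.nanoTime();

        System.out.println("direct:  " + (t1 - t0) + " ns");
        System.out.println("wrapped: " + (t2 - t1) + " ns");
        System.out.println(a == b);  // identical result either way
    }
}
```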

> Need more helpful error message when trying to serialize a tuple with a null field
> ----------------------------------------------------------------------------------
>                 Key: FLINK-6115
>                 URL: https://issues.apache.org/jira/browse/FLINK-6115
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
> When Flink tries to serialize a tuple with a null field, you get the following, which
has no information about where in the program the problem occurred (all the stack trace lines
are in Flink, not in user code).
> {noformat}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
> 	at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
> 	at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
> 	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
> 	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
> 	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
> 	at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
> 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113)
> 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
> 	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
> 	at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> 	at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
> 	at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> 	at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
> 	at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108)
> 	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
> 	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only thing I can tell from this is that it happened somewhere in a flatMap (but I
have dozens of them in my code). Surely there's a way to pull out the source file name and
line number from the program DAG node when errors like this occur?

This message was sent by Atlassian JIRA
