flink-user mailing list archives

From Timo Walther <twal...@apache.org>
Subject Re: UDAF Flink-SQL return null would lead to checkpoint fails
Date Wed, 30 Jan 2019 13:05:48 GMT
Hi Henry,

Could you share a small reproducible example? From what I see, you are
using a custom aggregate function with a case class inside, right?
Flink's case class serializer does not support null field values, because
using `null` is also not very idiomatic Scala.
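The failure can be reproduced without Flink. Below is a simplified Java sketch (not Flink's actual code) of a serializer that, like the `CaseClassSerializer` → `LongSerializer` frames in the trace, writes each field with a per-type writer and has no null handling of its own. Auto-unboxing a null `Long` into `DataOutputStream.writeLong(long)` throws a `NullPointerException`, which the sketch then reports per field, much as the trace's `NullFieldException` wraps the `NullPointerException`. The class name `NaiveCaseClassSerializer` and the `Long[]` representation of the case class fields are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical simplification of a case class serializer whose fields are all
// java.lang.Long (given here in declaration order). Each field is handed to
// DataOutputStream.writeLong(long); nothing checks for null first.
final class NaiveCaseClassSerializer {
    static byte[] serialize(Long[] fields) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        try {
            for (int i = 0; i < fields.length; i++) {
                try {
                    // Auto-unboxing a null Long throws NullPointerException here.
                    out.writeLong(fields[i]);
                } catch (NullPointerException npe) {
                    // Report which field was null, similar to the
                    // NullFieldException seen in the stack trace.
                    throw new IllegalStateException(
                        "Field " + i + " is null, but expected to hold a value.", npe);
                }
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

`serialize(new Long[]{42L, 1000L})` returns 16 bytes, while `serialize(new Long[]{null, 1000L})` fails with "Field 0 is null, but expected to hold a value."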

Use a `Row` type to support nulls properly.
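Why `Row` behaves differently: as far as I know, Flink's `RowSerializer` first writes a null mask (one flag per field) and then serializes only the non-null fields. A minimal Java sketch of that null-mask idea follows, assuming `Long` fields only and prefixing the arity so the bytes are self-describing (the real serializer takes the arity from the type information instead). `NullMaskSerializer` is a hypothetical name, not Flink's API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical null-mask serializer for a row of Long fields.
final class NullMaskSerializer {
    // Layout: arity (int), one boolean per field (true = null), then the
    // non-null values in field order.
    static byte[] serialize(Long[] fields) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(fields.length);
            for (Long f : fields) out.writeBoolean(f == null);
            for (Long f : fields) {
                if (f != null) out.writeLong(f); // only non-null values are written
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Long[] deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int arity = in.readInt();
            boolean[] isNull = new boolean[arity];
            for (int i = 0; i < arity; i++) isNull[i] = in.readBoolean();
            Long[] fields = new Long[arity];
            for (int i = 0; i < arity; i++) {
                // Null fields are restored from the mask; no bytes were written for them.
                fields[i] = isNull[i] ? null : in.readLong();
            }
            return fields;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For `{null, 1000L}` the output is 14 bytes (4 for the arity, 2 null flags, 8 for the single non-null long), and deserializing it restores the null in field 0.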

Hope this helps.

Timo


On 30.01.19 at 12:35, 徐涛 wrote:
> Hi Experts,
> In my self-defined UDAF, I found that returning a null value causes
> the checkpoint to fail. The error log is below.
> I think returning a null value from a UDAF is quite a common case,
> because sometimes no value can be determined. Why does Flink have such a
> limitation on UDAF return values?
> Thanks a lot!
>
>
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 4 for operator groupBy: (DRAFT_ORDER_ID), select: (DRAFT_ORDER_ID, latest_value_long_test($f1, LAST_UPDATE_TIME) AS tt) -> select: (CAST(DRAFT_ORDER_ID) AS EXPR$0, _UTF-16LE'order' AS EXPR$1, tt, _UTF-16LE'111' AS EXPR$3) -> to: Tuple2 -> Sink: Unnamed (2/4).
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 4 for operator groupBy: (DRAFT_ORDER_ID), select: (DRAFT_ORDER_ID, latest_value_long_test($f1, LAST_UPDATE_TIME) AS tt) -> select: (CAST(DRAFT_ORDER_ID) AS EXPR$0, _UTF-16LE'order' AS EXPR$1, tt, _UTF-16LE'111' AS EXPR$3) -> to: Tuple2 -> Sink: Unnamed (2/4).
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> 	... 6 common frames omitted
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.types.NullFieldException: Field 0 is null, but expected to hold a value.
> 	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> 	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
> 	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> 	... 5 common frames omitted
> Caused by: org.apache.flink.types.NullFieldException: Field 0 is null, but expected to hold a value.
> 	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:116)
> 	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> 	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:161)
> 	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:47)
> 	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.lambda$getKeyGroupWriter$0(CopyOnWriteStateTableSnapshot.java:148)
> 	at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResult.writeStateInKeyGroup(KeyGroupPartitioner.java:261)
> 	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.callInternal(HeapKeyedStateBackend.java:757)
> 	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.callInternal(HeapKeyedStateBackend.java:724)
> 	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
> 	... 7 common frames omitted
> Caused by: java.lang.NullPointerException: null
> 	at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:69)
> 	at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:32)
> 	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> 	... 17 common frames omitted
>
>
>
> Best
> Henry
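The trace fails while snapshotting keyed state (`CopyOnWriteStateTableSnapshot` → `RowSerializer` → `CaseClassSerializer`), which suggests the null sits in field 0 of the case-class accumulator rather than in the returned value itself; this is a reading of the log, not something confirmed in the thread. Besides switching to `Row`, another option (an assumption, not suggested above) is to keep the accumulator null-free with a presence flag and produce null only when computing the result. The plain-Java sketch below has no Flink dependency; `LatestValueAcc` and `LatestValueLong` are hypothetical names that only mirror the shape of a latest-value aggregate like `latest_value_long_test`. In a real UDAF these methods would belong to a subclass of Flink's `AggregateFunction`, and whether a null result is accepted downstream depends on the declared result type.

```java
// Hypothetical accumulator for a "latest value by timestamp" aggregate.
// All fields are primitives, so the accumulator itself never holds null.
final class LatestValueAcc {
    long value;
    long time;
    boolean hasValue; // false until the first non-null input arrives
}

final class LatestValueLong {
    static LatestValueAcc createAccumulator() {
        return new LatestValueAcc();
    }

    // Keeps the value with the largest timestamp; on equal timestamps the
    // later call wins. Null inputs are ignored instead of being stored.
    static void accumulate(LatestValueAcc acc, Long value, Long time) {
        if (value == null || time == null) return;
        if (!acc.hasValue || time >= acc.time) {
            acc.value = value;
            acc.time = time;
            acc.hasValue = true;
        }
    }

    // Null is produced only here, when no value could be determined.
    static Long getValue(LatestValueAcc acc) {
        return acc.hasValue ? acc.value : null;
    }
}
```

An empty accumulator yields `null`; after inputs `(10, 100)`, `(20, 50)` and `(null, 200)` the result is `10`, because the older and the null input are skipped.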


