Hi Henry,

could you share a little reproducible example? From what I see you are using a custom aggregate function with a case class inside, right? Flink's case class serializer does not support null because the usage of `null` is also not very Scala like.

Use a `Row` type for supporting nulls properly.

Hope this helps.

Timo


Am 30.01.19 um 12:35 schrieb 徐涛:
Hi Experts,
In my self-defined UDAF, I found if I return a null value in UDAF, would cause checkpoint fails, the following is the error log:
I think it is quite a common case to return a null value in UDAF, because sometimes no value could be determined, why Flink has such a limitation for UDAF return value?
Thanks a lot!


org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 4 for operator groupBy: (DRAFT_ORDER_ID), select: (DRAFT_ORDER_ID, latest_value_long_test($f1, LAST_UPDATE_TIME) AS tt) -> select: (CAST(DRAFT_ORDER_ID) AS EXPR$0, _UTF-16LE'order' AS EXPR$1, tt, _UTF-16LE'111' AS EXPR$3) -> to: Tuple2 -> Sink: Unnamed (2/4).
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 4 for operator groupBy: (DRAFT_ORDER_ID), select: (DRAFT_ORDER_ID, latest_value_long_test($f1, LAST_UPDATE_TIME) AS tt) -> select: (CAST(DRAFT_ORDER_ID) AS EXPR$0, _UTF-16LE'order' AS EXPR$1, tt, _UTF-16LE'111' AS EXPR$3) -> to: Tuple2 -> Sink: Unnamed (2/4).
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
	... 6 common frames omitted
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.types.NullFieldException: Field 0 is null, but expected to hold a value.
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
	... 5 common frames omitted
Caused by: org.apache.flink.types.NullFieldException: Field 0 is null, but expected to hold a value.
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:116)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:161)
	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:47)
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.lambda$getKeyGroupWriter$0(CopyOnWriteStateTableSnapshot.java:148)
	at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResult.writeStateInKeyGroup(KeyGroupPartitioner.java:261)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.callInternal(HeapKeyedStateBackend.java:757)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.callInternal(HeapKeyedStateBackend.java:724)
	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
	... 7 common frames omitted
Caused by: java.lang.NullPointerException: null
	at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:69)
	at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:32)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
	... 17 common frames omitted



Best
Henry