flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a**** <18612537...@163.com>
Subject Re:Re: map不能返回null值吗
Date Sun, 29 Sep 2019 08:51:26 GMT



ok,我知道了。确定一下,之前没发现,跟了一下代码,所以问一下。多谢!





在 2019-09-29 16:44:53,"Qi Luo" <luoqi.bd@gmail.com> 写道:
>Hi Allan,
>
>map只能返回非null,你可以考虑使用flatMap。
>
>Qi
>
>On Sun, Sep 29, 2019 at 4:31 PM allan <18612537914@163.com> wrote:
>
>> Hi,
>> 发现了一个问题,就是在调用map函数后,返回的null值,map的返回值为tuple2,在下一步filter过滤null值是,作业报异常。
>>
>> java.lang.NullPointerException
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>         at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>>         at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>>         at
>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>         at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>>         at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>>         at
>> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>>         at org.apache.flink.streaming.runtime.io
>> .StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>         at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>         at java.lang.Thread.run(Thread.java:748)
>>
>>
>> 经过调试发现代码中 OperatorChain 类中的 pushToOperator 方法 ,参数
record为空的情况下会报异常。
>> 难道map 不能返回null值吗?
>>
>>
>> @Override
>>    protected <X> void pushToOperator(StreamRecord<X> record) {
>>       try {
>>          // we know that the given outputTag matches our OutputTag so the
>> record
>>          // must be of the type that our operator (and Serializer) expects.
>>          @SuppressWarnings("unchecked")
>>          StreamRecord<T> castRecord = (StreamRecord<T>) record;
>>
>>          numRecordsIn.inc();
>>          StreamRecord<T> copy =
>> castRecord.copy(serializer.copy(castRecord.getValue()));
>>          operator.setKeyContextElement1(copy);
>>          operator.processElement(copy);
>>       } catch (ClassCastException e) {
>>          if (outputTag != null) {
>>             // Enrich error message
>>             ClassCastException replace = new ClassCastException(
>>                String.format(
>>                   "%s. Failed to push OutputTag with id '%s' to operator.
>> " +
>>                      "This can occur when multiple OutputTags with
>> different types " +
>>                      "but identical names are being used.",
>>                   e.getMessage(),
>>                   outputTag.getId()));
>>
>>             throw new ExceptionInChainedOperatorException(replace);
>>          } else {
>>             throw new ExceptionInChainedOperatorException(e);
>>          }
>>       } catch (Exception e) {
>>          throw new ExceptionInChainedOperatorException(e);
>>       }
>>
>>    }
>> }
>>
>>
>>
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message