flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ted Yu <yuzhih...@gmail.com>
Subject Re: Null Pointer Exception on Trying to read a message from Kafka
Date Mon, 28 Aug 2017 16:39:52 GMT
Which Flink version are you using (so that line numbers can be matched with
source code) ?

On Mon, Aug 28, 2017 at 9:16 AM, Sridhar Chellappa <flinkenthu@gmail.com>
wrote:

> DataStream<MyKafkaMessage> MyKafkaMessageDataStream = env.addSource(
>                 getStreamSource(env, parameterTool);
>         );
>
>
>
>         public RichParallelSourceFunction<MyKafkaMessage> getStreamSource(StreamExecutionEnvironment
> env, ParameterTool parameterTool) {
>
>            // MyKAfkaMessage is a ProtoBuf message
>             env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class,
> ProtobufSerializer.class);
>
>             KafkaDataSource<MyKafkaMessage> flinkCepConsumer =
>                     new KafkaDataSource<MyKafkaMessage>(parameterTool,
> new MyKafkaMessageSerDeSchema());
>
>             return flinkCepConsumer;
>         }
>
>
> public class KafkaDataSource<T> extends FlinkKafkaConsumer010<T> {
>
>     public KafkaDataSource(ParameterTool parameterTool,
> DeserializationSchema<T> deserializer) {
>         super(
>                 Arrays.asList(parameterTool.getRequired("topic").split(","
> )),
>                 deserializer,
>                 parameterTool.getProperties()
>         );
>
>     }
>
> }
>
> public class MyKafkaMessageSerDeSchema implements DeserializationSchema<MyKafkaMessage>,
> SerializationSchema<MyKafkaMessage> {
>
>     @Override
>     public MyKafkaMessage deserialize(byte[] message) throws IOException {
>         MyKafkaMessage MyKafkaMessage = null;
>         try {
>             MyKafkaMessage =  MyKafkaMessage.parseFrom(message);
>         } catch (InvalidProtocolBufferException e) {
>             e.printStackTrace();
>         } finally {
>             return MyKafkaMessage;
>         }
>     }
>
>     @Override
>     public boolean isEndOfStream(MyKafkaMessage nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<MyKafkaMessage> getProducedType() {
>         return null;
>     }
>
>     @Override
>     public byte[] serialize(MyKafkaMessage element) {
>         return new byte[0];
>     }
> }
>
> On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>
>> Which version of Flink / Kafka are you using ?
>>
>> Can you show the snippet of code where you create the DataStream ?
>>
>> Cheers
>>
>> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <flinkenthu@gmail.com>
>> wrote:
>>
>>> Folks,
>>>
>>> I have a KafkaConsumer that I am trying to read messages from. When I
>>> try to create a DataStream from the KafkConsumer (env.addSource()) I get
>>> the following exception :
>>>
>>> Any idea on how can this happen?
>>>
>>> java.lang.NullPointerException
>>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>>> 	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>>> 	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>>> 	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>>> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>>> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>> 	at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>
>

Mime
View raw message