flink-user mailing list archives

From Sridhar Chellappa <flinken...@gmail.com>
Subject Re: Null Pointer Exception on Trying to read a message from Kafka
Date Tue, 29 Aug 2017 01:13:17 GMT
1.3.0

On Mon, Aug 28, 2017 at 10:09 PM, Ted Yu <yuzhihong@gmail.com> wrote:

> Which Flink version are you using (so that line numbers can be matched
> with the source code)?
>
> On Mon, Aug 28, 2017 at 9:16 AM, Sridhar Chellappa <flinkenthu@gmail.com>
> wrote:
>
>> DataStream<MyKafkaMessage> MyKafkaMessageDataStream = env.addSource(
>>                 getStreamSource(env, parameterTool)
>>         );
>>
>>
>>
>>         public RichParallelSourceFunction<MyKafkaMessage>
>> getStreamSource(StreamExecutionEnvironment env, ParameterTool
>> parameterTool) {
>>
>>             // MyKafkaMessage is a ProtoBuf message
>>             env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class,
>> ProtobufSerializer.class);
>>
>>             KafkaDataSource<MyKafkaMessage> flinkCepConsumer =
>>                     new KafkaDataSource<MyKafkaMessage>(parameterTool,
>> new MyKafkaMessageSerDeSchema());
>>
>>             return flinkCepConsumer;
>>         }
>>
>>
>> public class KafkaDataSource<T> extends FlinkKafkaConsumer010<T> {
>>
>>     public KafkaDataSource(ParameterTool parameterTool,
>> DeserializationSchema<T> deserializer) {
>>         super(
>>                 Arrays.asList(parameterTool.getRequired("topic").split(",")),
>>                 deserializer,
>>                 parameterTool.getProperties()
>>         );
>>
>>     }
>>
>> }
>>
>> public class MyKafkaMessageSerDeSchema implements
>> DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage>
>> {
>>
>>     @Override
>>     public MyKafkaMessage deserialize(byte[] message) throws IOException {
>>         MyKafkaMessage myKafkaMessage = null;
>>         try {
>>             myKafkaMessage = MyKafkaMessage.parseFrom(message);
>>         } catch (InvalidProtocolBufferException e) {
>>             e.printStackTrace();
>>         } finally {
>>             return myKafkaMessage;
>>         }
>>     }
>>
>>     @Override
>>     public boolean isEndOfStream(MyKafkaMessage nextElement) {
>>         return false;
>>     }
>>
>>     @Override
>>     public TypeInformation<MyKafkaMessage> getProducedType() {
>>         return null;
>>     }
>>
>>     @Override
>>     public byte[] serialize(MyKafkaMessage element) {
>>         return new byte[0];
>>     }
>> }
>>
>> On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>>
>>> Which versions of Flink and Kafka are you using?
>>>
>>> Can you show the snippet of code where you create the DataStream?
>>>
>>> Cheers
>>>
>>> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <flinkenthu@gmail.com
>>> > wrote:
>>>
>>>> Folks,
>>>>
>>>> I have a KafkaConsumer that I am trying to read messages from. When I
>>>> try to create a DataStream from the KafkaConsumer (env.addSource()), I get
>>>> the following exception:
>>>>
>>>> Any idea how this can happen?
>>>>
>>>> java.lang.NullPointerException
>>>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>>> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>>>> 	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>>>> 	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>>>> 	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>>>> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>>>> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>>> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>>> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>>>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>> 	at java.lang.Thread.run(Thread.java:748)
>>>>
>>>>
>>>
>>
>
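
The deserialize method quoted above returns from a finally block, so any parse failure is discarded and null is handed downstream. That is one plausible, though unconfirmed, way for a null to reach CopyingChainingOutput.pushToOperator. The sketch below is a minimal, self-contained illustration of that Java behavior next to a stricter alternative. DeserializeSketch, ParseException and parseFrom are hypothetical stand-ins for the poster's ProtoBuf classes; they are not Flink or ProtoBuf APIs.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class DeserializeSketch {

    /** Hypothetical stand-in for InvalidProtocolBufferException (also an IOException). */
    static class ParseException extends IOException {
        ParseException(String msg) {
            super(msg);
        }
    }

    /** Hypothetical stand-in for MyKafkaMessage.parseFrom(byte[]): rejects empty payloads. */
    static String parseFrom(byte[] message) throws ParseException {
        if (message == null || message.length == 0) {
            throw new ParseException("empty payload");
        }
        return new String(message, StandardCharsets.UTF_8);
    }

    /** Pattern from the thread: the return in finally turns every failure into null. */
    @SuppressWarnings("finally")
    static String deserializeSwallowing(byte[] message) {
        String parsed = null;
        try {
            parsed = parseFrom(message);
        } catch (ParseException e) {
            // The original prints the stack trace here and carries on.
        } finally {
            return parsed; // also discards any unchecked exception thrown above
        }
    }

    /** Alternative: let the IOException propagate so a bad record fails visibly. */
    static String deserializeStrict(byte[] message) throws IOException {
        return parseFrom(message);
    }

    public static void main(String[] args) throws IOException {
        byte[] good = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserializeStrict(good));
        System.out.println(deserializeSwallowing(new byte[0]));
        try {
            deserializeStrict(new byte[0]);
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Separately, the quoted getProducedType() returns null. Returning TypeInformation.of(MyKafkaMessage.class) there may be worth trying, since the source's output serializer is derived from that type information, but the thread does not confirm which null triggers the exception.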
