flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ted Yu <yuzhih...@gmail.com>
Subject Re: Null pointer exception while trying to serialize a protobuf message
Date Fri, 04 Aug 2017 14:37:05 GMT
Can you show how CustomerMessage is defined ?

Thanks

On Fri, Aug 4, 2017 at 7:22 AM, Sridhar Chellappa <flinkenthu@gmail.com>
wrote:

> Folks,
>
> I wrote a custom Data source to test me CEP logic. The custom data source
> looks like :
>
> public class CustomerDataSource extends RichParallelSourceFunction<Customer> {
>     private boolean running = true;
>     private final Random random;
>
>     public CustomerDataSource() {
>         this.random = new Random();
>     }
>
>     @Override
>     public void run(SourceContext<CustomerMessage> ctx) throws Exception {
>         while (running) {
>             new CustomerDataGen().generateMessages().
>                     forEach(element -> ctx.collect(element));
>
>             Thread.sleep(10000);
>         }
>     }
>
>     @Override
>     public void cancel() {
>         running = false;
>     }
> }
>
> public class CustomerDataGen {
>
>     public CustomerDataGen() {
>         this.random = new Random();
>     }
>     @Override
>     public List<CustomerMessage> generateMessages() throws InterruptedException
{
>         List<CustomerMessage> messages = new ArrayList<CustomerMessage>();
>
>         messages.add(getMessage());
>         return messages;
>     }
>
>     private CustomerMessage getMessage() {
>         Instant time = Instant.now();
>         Timestamp eventTimeStamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(0).build();
>         Timestamp creationTimeStamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond()
-1).setNanos(0).build();
>         return CustomerMessage.newBuilder().
>                 setName("SomeCustomer").
>                 setEventTimestamp(eventTimeStamp).
>                 setCustomerId("01234").
>                 addAllRegisteredPhoneNumbers(Arrays.asList("+9185216741972")).
>                 setEmail("customer@foo.com").
>                 build();
>     }
> }
>
> In my Main program :
>
> .........
> env.getConfig().registerTypeWithKryoSerializer(CustomerMessage.class, ProtobufSerializer.class);
> env.addSource(new CustomerDataSource());
>
> env.execute();
>
>
> When I run the program, I get the following exception :
>
>
> Caused by: java.lang.NullPointerException
> 	at shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
> 	at java.util.AbstractList.add(AbstractList.java:108)
> 	at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
> 	at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
> 	at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
> 	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> 	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
> 	at com.sample.datasources.custom.CustomertDataSource.lambda$run$0(CustomerDataSource.java:24)
> 	at java.util.ArrayList.forEach(ArrayList.java:1249)
> 	at com.sample.datasources.custom.CustomertDataSource.run(CustomerDataSource.java:24)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> 	at java.lang.Thread.run(Thread.java:745)
>
> I am having a tough time figuring out why. Can someone help me out as to where am I going
wrong?
>
>
>

Mime
View raw message