flink-user mailing list archives

From "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject Re: Kafka Producer - Null Pointer Exception when processing by element
Date Mon, 17 Jul 2017 07:33:05 GMT
earellano wrote:
> With task chaining as you're saying, could you help clarify how it works please?
Operators can be chained so that they are executed by a single task thread. See [1] for more details on task chaining.

Basically, when two operators are chained together, the output of the first operator is immediately
passed to the processElement of the next operator; it’s therefore just a consecutive invocation
of processElements on the chained operators. There is no thread-to-thread handover or buffering
in between.
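To make the "consecutive invocation" point concrete, here is a minimal sketch in plain Java (these are illustrative classes, not Flink's actual runtime types): emitting from a chained operator is simply a direct method call into the downstream operator on the same thread.

```java
import java.util.ArrayList;
import java.util.List;

// Each operator in a chain holds a direct reference to the next one.
interface ChainedOperator<IN> {
    void processElement(IN element);
}

// An example "map"-style operator chained to a downstream operator.
class UppercaseOperator implements ChainedOperator<String> {
    private final ChainedOperator<String> next; // downstream operator in the chain

    UppercaseOperator(ChainedOperator<String> next) {
        this.next = next;
    }

    @Override
    public void processElement(String element) {
        // No buffering, no thread handover: "emitting" a record is just
        // calling processElement on the next operator in the chain.
        next.processElement(element.toUpperCase());
    }
}

// A stand-in for a sink at the end of the chain (e.g. a Kafka write).
class CollectingSink implements ChainedOperator<String> {
    final List<String> written = new ArrayList<>();

    @Override
    public void processElement(String element) {
        written.add(element); // stands in for the actual external write
    }
}

public class ChainingSketch {
    public static void main(String[] args) {
        CollectingSink sink = new CollectingSink();
        UppercaseOperator op = new UppercaseOperator(sink);
        // One record flows through the entire chain before this call returns.
        op.processElement("hello");
        System.out.println(sink.written); // [HELLO]
    }
}
```

This is why, per record, the sink write happens as part of the same call stack as the upstream operator's processElement.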

> For example, a byte[] record can return from our parser a List of 10 SuccessEvents and 1
> ErrorEvent; we want to publish each Event immediately.
In that case, I would suggest using flatMap here, followed by chained splits and then sinks.

Using flatMap, you can collect elements as you iterate through the list (i.e. `collector.collect(...)`).
If the sinks are properly chained (which should be the case if there is no keyBy before the
sink and you haven’t explicitly configured otherwise [2]), then for each .collect(...) the
sink write will be invoked as part of the chain.

Effectively, this would then be writing to Kafka / Cassandra for every element as you iterate
through that list (happening in the same thread since everything is chained), and matches
what you have in mind.
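The flatMap pattern above can be sketched as follows. This is plain Java mirroring the shape of Flink's FlatMapFunction/Collector contract (the parser and event types here are hypothetical stand-ins, not the actual external library): each parsed event is collected individually, and with a chained sink every collect(...) would immediately trigger the sink write.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for Flink's Collector interface.
interface Collector<T> {
    void collect(T element);
}

// Hypothetical event type; the real code would use SuccessEvent/ErrorEvent.
class Event {
    final String type;    // "SUCCESS" or "ERROR"
    final String payload;

    Event(String type, String payload) {
        this.type = type;
        this.payload = payload;
    }
}

class EventFlatMap {
    // Hypothetical stand-in for the external parser: byte[] -> List<Event>.
    // Tokens prefixed with '!' are treated as errors, purely for illustration.
    static List<Event> parse(byte[] record) {
        List<Event> events = new ArrayList<>();
        for (String token : new String(record).split(",")) {
            events.add(token.startsWith("!")
                    ? new Event("ERROR", token.substring(1))
                    : new Event("SUCCESS", token));
        }
        return events;
    }

    // The flatMap body: emit per element instead of emitting the whole list.
    static void flatMap(byte[] record, Collector<Event> out) {
        for (Event e : parse(record)) {
            out.collect(e); // with a chained sink, the write happens right here
        }
    }
}

public class FlatMapSketch {
    public static void main(String[] args) {
        List<Event> sinkWrites = new ArrayList<>();
        EventFlatMap.flatMap("a,b,!oops".getBytes(), sinkWrites::add);
        System.out.println(sinkWrites.size()); // 3
    }
}
```

In an actual Flink job the same body would live inside a FlatMapFunction<byte[], Event>, with the downstream split and sinks chained so that no element waits for the rest of its list.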

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/concepts/runtime.html#tasks-and-operator-chains
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#task-chaining-and-resource-groups

On 17 July 2017 at 2:06:52 PM, earellano (eric.arellano@ge.com) wrote:


Tzu-Li (Gordon) Tai wrote  
> This seems odd. Are your events intended to be a list? If not, this  
> should be a `DataStream<SuperclassEvent>`.  
> From the code snippet you’ve attached in the first post, it seems like  
> you’ve initialized your source incorrectly.  
> `env.fromElements(List<...>)` will take the whole list as a single event,  
> thus your source is only emitting a single list as a record.  

Ah sorry for the confusion. So the original code snippet isn't our actual  
code - it's a simplified and generified version so that it would be easy to  
reproduce the Null Pointer Exception without having to show our whole code.  

To clarify, our input actually uses a Kafka Consumer that reads a byte[],  
which is then passed to our external library parser which takes a byte[] and  
converts it into a List<Events>. This is why we have to use  
DataStream<List<Events>>, rather than just DataStream<Event>. It's a
requirement from the parser we have to use, because each byte[] array record  
can produce SuccessEvent(s) and/or ErrorEvent(s).  

Our motivation for using the above map & for loop with conditional output  
logic was that we have to work with this whole List<Events> and not just  
individual Events, but don't want to wait for the whole list to be processed  
for the event at the beginning of the list to be outputted. For example, a  
byte[] record can return from our parser a List of 10 SuccessEvents and 1  
ErrorEvent; we want to publish each Event immediately. Low latency is  
extremely important to us.  


With task chaining as you're saying, could you help clarify how it works  
please? With each record of type List<Events> and calling the Split Operator  
followed by the sink operators, does that whole record/list have to be split  
before it can then go on to the sink? Or does task chaining mean it  
immediately gets outputted to the sink?  

Thanks so much for all this help by the way!  

View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Producer-Null-Pointer-Exception-when-processing-by-element-tp14288p14300.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
