flink-user mailing list archives

From: Theo Diefenthal <theo.diefent...@scoop-software.de>
Subject: Re: Passing parameters to filter function (in DataStreams)
Date: Thu, 10 Oct 2019 12:13:39 GMT

From your original post, it looks like "computeThreshold" doesn't require any parameters but
is just an operation that is expensive to compute.

In that case, you can inherit from "RichFilterFunction" instead of "FilterFunction". With
"RichFilterFunction", you can override the "open" method, perform your operation there just
once, and store the result, e.g. in a transient field. That way, the computed value is not
serialized and sent over the network. The open method is called once per parallel instance of
the operator, before the first call to "filter" is made.
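
The "compute once in open" idea can be sketched in plain Java without a Flink dependency. This is only an illustration under assumptions: OpenableFilter is a hypothetical stand-in for RichFilterFunction, run() is a hypothetical driver that imitates the call order described above, and computeThreshold() returns a placeholder value. In a real job the class would extend RichFilterFunction and override its open method instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OpenOnceSketch {

    /** Hypothetical stand-in for the RichFilterFunction lifecycle: open() first, then filter() per element. */
    abstract static class OpenableFilter<T> {
        void open() {}
        abstract boolean filter(T value);
    }

    /** Counts how often the (pretend) expensive computation runs. */
    static int computeCalls = 0;

    static int computeThreshold() {
        computeCalls++;
        return 30; // placeholder; the real computation is not shown in the thread
    }

    static class ThresholdFilter extends OpenableFilter<Integer> {
        // transient: the value is rebuilt in open() rather than shipped with the function
        private transient int threshold;

        @Override
        void open() {
            threshold = computeThreshold(); // expensive work happens here, once
        }

        @Override
        boolean filter(Integer temperature) {
            return temperature > threshold; // cheap comparison per element
        }
    }

    /** Hypothetical driver: calls open() once, then filter() for each element. */
    static <T> List<T> run(OpenableFilter<T> f, List<T> input) {
        f.open();
        List<T> out = new ArrayList<>();
        for (T v : input) {
            if (f.filter(v)) {
                out.add(v);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> hot = run(new ThresholdFilter(), Arrays.asList(12, 31, 45, 30));
        System.out.println(hot + " after " + computeCalls + " call(s) to computeThreshold");
    }
}
```

The counter is there only to make the point visible: the threshold is computed when the function is opened, not once per element.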

The pattern of passing arguments through the constructor is in general totally fine. I often
pass e.g. a connection string as a parameter to my RichFunction, and within the open method of
the function I establish the connection to some remote system.
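
A rough plain-Java sketch of why this split works, assuming the function object is shipped via Java serialization: the constructor argument travels with the object, while a transient field does not and has to be rebuilt in open(). LookupFilter, FakeConnection, roundTrip() and the connection string "host:1234" are all hypothetical names for illustration; no Flink classes are used here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ConstructorArgSketch {

    /** Hypothetical, non-serializable client for some remote system. */
    static class FakeConnection {
        final String target;
        FakeConnection(String target) { this.target = target; }
        boolean isAllowed(String key) { return !key.isEmpty(); }
    }

    static class LookupFilter implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String connectionString;        // serialized with the function
        private transient FakeConnection connection;  // skipped by serialization, built in open()

        LookupFilter(String connectionString) { this.connectionString = connectionString; }

        String connectionString() { return connectionString; }

        boolean isOpen() { return connection != null; }

        void open() { connection = new FakeConnection(connectionString); }

        boolean filter(String key) { return connection.isAllowed(key); }
    }

    /** Serializes and deserializes the function, imitating shipping it to a worker. */
    static LookupFilter roundTrip(LookupFilter f) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(f);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LookupFilter) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        LookupFilter shipped = roundTrip(new LookupFilter("host:1234"));
        System.out.println("connection string: " + shipped.connectionString());
        System.out.println("open before open(): " + shipped.isOpen());
        shipped.open();
        System.out.println("filter(\"a\") after open(): " + shipped.filter("a"));
    }
}
```

Keeping only small, serializable configuration in constructor fields and creating heavyweight or non-serializable resources in open() avoids serialization errors for objects such as live connections.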

Best regards 

From: "Komal Mariam" <komal.mariam@gmail.com>
To: "Chesnay Schepler" <chesnay@apache.org>
CC: "user" <user@flink.apache.org>
Sent: Thursday, 10 October 2019 04:00:46
Subject: Re: Passing parameters to filter function (in DataStreams)

Thank you @Chesnay! 

I also managed to pass arguments to a RichFilterFunction by defining a constructor,
MyFilterFunc(Integer threshold), and creating it with new MyFilterFunc(threshold).
If there's a better way to pass arguments I'd appreciate it if you let me know. 

On Tue, 8 Oct 2019 at 19:58, Chesnay Schepler <chesnay@apache.org> wrote:

You can compute the threshold ahead of time and reference it directly in the filter function.

(Below are 2 examples, depending on whether you like lambdas or not) 
// Anonymous class
final int threshold = computeThreshold();
temperatureStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer temperature) {
        return temperature > threshold;
    }
});

// Lambda
final int threshold = computeThreshold();
temperatureStream.filter(temperature -> temperature > threshold);

On 08/10/2019 12:46, Komal Mariam wrote: 


Hi everyone, 

Suppose I have to compute a filter condition 

Integer threshold = computeThreshold();

If I write:

temperatureStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer temperature) throws Exception {
        Integer threshold = computeThreshold();
        return temperature > threshold;
    }
});

would this mean I compute the threshold over and over again, for every new element in the
stream?

My threshold does not change once it is computed, and I don't want to recompute it for every
new element. Is there a way I can pass it as a parameter to the filter function?

