Hi Sideny,

>> I'd prefer not to consume messages I don't plan on actually handling. 

It depends on your design. If you produce different types into different partitions, then it's easy to filter different types from the Kafka consumer(only consume partial partition).

If you do not distinguish different types in the partitions of the Kafka topic. You can filter messages based on type in Flink job.

>> I MUST consume the messages, count those I want to filter out and then simply not handle them?

I did not say "you MUST", I said "you can".

Actually, there are serval solutions.

1) I described in the last mail;
2) filter in flink source;
3) filter via flink filter transform function
4) side output/split, selet

Choosing one solution that suite your scene. 

The key thing in my last mail is to describe the problem of your reflection problem.


Sidney Feiner <sidney.feiner@startapp.com> 于2019年12月16日周一 下午9:31写道:
You are right with everything you say!
The solution you propose is actually what I'm trying to avoid. I'd prefer not to consume messages I don't plan on actually handling. 
But from what you say it sounds I have no other choice. Am I right? I MUST consume the messages, count those I want to filter out and then simply not handle them?
Which means I must filter them in the task itself and I have no way of filtering them directly from the data source?

Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skypesidney.feiner.startapp

From: vino yang <yanghua1127@gmail.com>
Sent: Monday, December 16, 2019 7:56 AM
To: Sidney Feiner <sidney.feiner@startapp.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Fw: Metrics based on data filtered from DataStreamSource
Hi Sidney,

Firstly, the `open` method of UDF's instance is always invoked when the task thread starts to run.

From the second code snippet image that you provided, I guess you are trying to get a dynamic handler with reflection technology, is that correct? I also guess that you want to get a dynamic instance of a handler in the runtime, correct me if I am wrong.

IMO, you may misunderstand the program you write and the runtime of Task, the purpose of your program is used to build the job graph. The business logic in UDF is used to describe the user's business logic.

For your scene, if many types of events exist in one topic, you can consume them and group by the type then count them?


Sidney Feiner <sidney.feiner@startapp.com> 于2019年12月16日周一 上午12:54写道:
I have a question about using metrics based on filtered data.
Basically, I have handlers for many types of events I get from my data source (in my case, Kafka), and every handler has it's own filter function.
That given handler also has a Counter, incrementing every time it filters out an event (as part of the FilterFunction). 

Problem arrises when I use that FilterFunction on the DataSourceStream - the handler's open() function hasn't been called and thus the metrics have never been initiated.
Do I have a way of making this work? Or any other way of counting events that have been filtered out from the DataStreamSource?

public abstract class Handler extends RichMapFunction<Event, String> {
private transient Counter filteredCounter;
private boolean isInit = false;

public void open(Configuration parameters) throws Exception {
if (!isInit) {
MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup(getClass().getSimpleName());
filteredCounter = metricGroup.counter(CustomMetricsManager.getFilteredSuffix());
isInit = true;

public final FilterFunction getFilter() {
return (FilterFunction<Event>) event -> {
boolean res = filter(event);
if (!res) {
return res;

abstract protected boolean filter(Event event);

And when I init the DataStreamSource:
Handler handler = (Handler) Class.forName(handlerName).getConstructor().newInstance();
dataStream = dataStreamSource.filter(handler.getFilter()).map(handler);

Any help would be much appreciated!

Thanks 🙂