flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Sidney Feiner <sidney.fei...@startapp.com>
Subject Re: Fw: Metrics based on data filtered from DataStreamSource
Date Mon, 16 Dec 2019 13:31:26 GMT
You are right with everything you say!
The solution you propose is actually what I'm trying to avoid. I'd prefer not to consume messages
I don't plan on actually handling.
But from what you say it sounds I have no other choice. Am I right? I MUST consume the messages,
count those I want to filter out and then simply not handle them?
Which means I must filter them in the task itself and I have no way of filtering them directly
from the data source?

Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp


From: vino yang <yanghua1127@gmail.com>
Sent: Monday, December 16, 2019 7:56 AM
To: Sidney Feiner <sidney.feiner@startapp.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Fw: Metrics based on data filtered from DataStreamSource

Hi Sidney,

Firstly, the `open` method of UDF's instance is always invoked when the task thread starts
to run.

From the second code snippet image that you provided, I guess you are trying to get a dynamic
handler with reflection technology, is that correct? I also guess that you want to get a dynamic
instance of a handler in the runtime, correct me if I am wrong.

IMO, you may misunderstand the program you write and the runtime of Task, the purpose of your
program is used to build the job graph. The business logic in UDF is used to describe the
user's business logic.

For your scene, if many types of events exist in one topic, you can consume them and group
by the type then count them?


Sidney Feiner <sidney.feiner@startapp.com<mailto:sidney.feiner@startapp.com>>
于2019年12月16日周一 上午12:54写道:
I have a question about using metrics based on filtered data.
Basically, I have handlers for many types of events I get from my data source (in my case,
Kafka), and every handler has it's own filter function.
That given handler also has a Counter, incrementing every time it filters out an event (as
part of the FilterFunction).

Problem arrises when I use that FilterFunction on the DataSourceStream - the handler's open()
function hasn't been called and thus the metrics have never been initiated.
Do I have a way of making this work? Or any other way of counting events that have been filtered
out from the DataStreamSource?


public abstract class Handler extends RichMapFunction<Event, String> {
    private transient Counter filteredCounter;
    private boolean isInit = false;

    public void open(Configuration parameters) throws Exception {
        if (!isInit) {
            MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup(getClass().getSimpleName());
            filteredCounter = metricGroup.counter(CustomMetricsManager.getFilteredSuffix());
            isInit = true;

    public final FilterFunction getFilter() {
        return (FilterFunction<Event>) event -> {
            boolean res = filter(event);
            if (!res) {
            return res;

    abstract protected boolean filter(Event event);

And when I init the DataStreamSource:

Handler handler = (Handler) Class.forName(handlerName).getConstructor().newInstance();
dataStream = dataStreamSource.filter(handler.getFilter()).map(handler);

Any help would be much appreciated!

Thanks 🙂

View raw message