flink-user mailing list archives

From Leonard Xu <xbjt...@gmail.com>
Subject Re: SQL Expression to Flink FilterFunction?
Date Fri, 05 Jun 2020 02:17:31 GMT
Hi, Theo

Currently, as far as I understand, it's hard to do this in a DataStream application: the conversion
of a SQL expression into Flink operators happens inside the table planner (more precisely, in its
code-generation phase), and the planner does not expose an interface for it to users. That is also
why you cannot assign an operator name or operator ID (uid) to the resulting operator.
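If the set of expressions you need is small, one possible workaround is to parse the WHERE string yourself into a serializable predicate and apply it in an ordinary DataStream filter, where uid and name stay under your control. This is not something Flink provides; it is a minimal sketch that assumes a row can be viewed as a Map<String, Object> of column values. WhereParser and Row are hypothetical names, and only comparisons (=, <>, <, <=, >, >=), AND/OR, parentheses, single-quoted strings and integer literals are handled. REGEXP, NOT and IS NULL are not supported.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical hand-rolled parser (not a Flink or Calcite API) for a tiny WHERE subset:
 *   or := and (OR and)* ; and := factor (AND factor)* ; factor := ( or ) | column op literal
 * op: = <> < <= > >= ; literal: 'string' (no escaping) or integer. No REGEXP, NOT, IS NULL.
 */
final class WhereParser {
    public interface Row extends Predicate<Map<String, Object>>, Serializable {} // Serializable for Flink closures
    private static final Pattern TOKEN =
            Pattern.compile("'[^']*'|[A-Za-z_]\\w*|-?\\d+|<=|>=|<>|[=<>()]");
    private final List<String> tokens = new ArrayList<>();
    private int pos = 0;

    private WhereParser(String input) { // tokenize, skipping whitespace
        Matcher m = TOKEN.matcher(input);
        int i = 0;
        while (i < input.length()) {
            if (Character.isWhitespace(input.charAt(i))) { i++; continue; }
            m.region(i, input.length());
            if (!m.lookingAt()) throw new IllegalArgumentException("Unexpected character at " + i);
            tokens.add(m.group());
            i = m.end();
        }
    }

    /** Parses the whole expression; throws IllegalArgumentException on unsupported syntax. */
    public static Row parse(String where) {
        WhereParser p = new WhereParser(where);
        Row result = p.orExpr();
        if (p.pos < p.tokens.size()) throw new IllegalArgumentException("Unexpected " + p.tokens.get(p.pos));
        return result;
    }
    private String peek() { return pos < tokens.size() ? tokens.get(pos) : ""; }
    private String next() {
        if (pos >= tokens.size()) throw new IllegalArgumentException("Unexpected end of expression");
        return tokens.get(pos++);
    }
    private Row orExpr() { // OR binds more loosely than AND
        Row left = andExpr();
        while (peek().equalsIgnoreCase("OR")) {
            next();
            Row l = left, r = andExpr();
            left = row -> l.test(row) || r.test(row);
        }
        return left;
    }
    private Row andExpr() {
        Row left = factor();
        while (peek().equalsIgnoreCase("AND")) {
            next();
            Row l = left, r = factor();
            left = row -> l.test(row) && r.test(row);
        }
        return left;
    }
    private Row factor() {
        if (peek().equals("(")) {
            next();
            Row inner = orExpr();
            if (!next().equals(")")) throw new IllegalArgumentException("Expected ')'");
            return inner;
        }
        String column = next(), op = next(), tok = next();
        if (!column.matches("[A-Za-z_]\\w*") || !op.matches("<>|<=|>=|[=<>]")) {
            throw new IllegalArgumentException("Expected <column> <op> <literal> near " + column);
        }
        final Object literal;
        if (tok.startsWith("'")) literal = tok.substring(1, tok.length() - 1);
        else if (tok.matches("-?\\d+")) literal = Long.parseLong(tok);
        else throw new IllegalArgumentException("Expected a literal, got " + tok);
        return row -> compare(row.get(column), op, literal);
    }

    private static boolean compare(Object value, String op, Object literal) {
        if (value == null) return false; // missing values never match (loosely like SQL NULL)
        int c;
        if (value instanceof Number && literal instanceof Long) {
            c = Long.compare(((Number) value).longValue(), (Long) literal);
        } else if (value instanceof String && literal instanceof String) {
            c = ((String) value).compareTo((String) literal);
        } else throw new IllegalArgumentException("Type mismatch: " + value + " vs " + literal);
        switch (op) {
            case "=":  return c == 0;
            case "<>": return c != 0;
            case "<":  return c < 0;
            case "<=": return c <= 0;
            case ">":  return c > 0;
            default:   return c >= 0; // ">=", since factor() rejects any other operator
        }
    }
}
```

Because Row is Serializable and the lambdas do not capture the parser itself, the parsed predicate could be captured by a Flink filter lambda, roughly stream.filter(pojo -> where.test(toMap(pojo))).uid("where-filter").name("WHERE user = 'pitter' AND age > 10"), where toMap is a hypothetical static helper that copies the SomePOJO fields into a map. The trade-off is completeness: anything beyond this small grammar (REGEXP, NULL semantics, type coercion) would have to be added by hand.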

Leonard Xu 

> On Jun 5, 2020, at 00:18, Theo Diefenthal <theo.diefenthal@scoop-software.de> wrote:
> Hi there,
> I have a full DataStream pipeline (i.e. no higher-level APIs like Table or SQL are involved).
> In the middle of the pipeline, I now need to filter on a SQL WHERE expression, e.g.
> "user = 'pitter' AND age > 10", which can include REGEXP and arbitrarily nested AND/OR constructs.
> I was wondering if I could somehow transform a SQL WHERE expression into a Flink FilterFunction?
> My approach right now is to register my stream as a table, run a SQL query on it, and convert
> the result back into a DataStream, like so:
> StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
> env.setParallelism(1);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> List<SomePOJO> data = createPOJOTestData();
> DataStream<SomePOJO> stream = env.fromCollection(data);
> //final Table asTable = tEnv.fromDataStream(stream);
> //Table filteredTable = asTable.where("user === 'pitter' && age > 10"); // NO SQL-style 'AND' possible here...
> tEnv.registerDataStream("SAMPLE", stream);
> // USER is a reserved keyword in Flink SQL, so the column name is quoted with backticks
> Table filteredTable = tEnv.sqlQuery("SELECT * FROM SAMPLE WHERE `user` = 'pitter' AND age > 10");
> stream = tEnv.toAppendStream(filteredTable, SomePOJO.class);
> List<SomePOJO> list = IteratorUtils.toList(DataStreamUtils.collect(stream));
> //... test assertions
> It feels a bit odd that I have to go all the way up to the SQL API and register a table "just"
> to apply a WHERE clause, and that, once I leave the DataStream world, I can no longer assign a
> uid or operator name to this operator.
> Is the way I wrote it the best approach, or do you have a better idea? Are there any caveats
> here? Note that I didn't assign the event-time column on purpose: I know it's just a WHERE
> without any windowing etc., and I wanted to test that it still works without an explicit
> time column :)
> Best regards
> Theo
