flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Fabian Hueske <fhue...@gmail.com>
Subject Re: Help with table UDF
Date Thu, 31 Aug 2017 20:36:27 GMT
Hi Flavio,

you're using the TableFunction not correctly. The documentation shows how
to call it in a join() method.
But I agree, the error message should be better.

Best, Fabian


2017-08-31 18:53 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:

> Hi all,
> I'm using Flink 1.3.1 and I'm trying to register an UDF but there's
> something wrong.
> I always get the following exception:
>
> java.lang.UnsupportedOperationException: org.apache.flink.table.expressions.TableFunctionCall
> cannot be transformed to RexNode
> at org.apache.flink.table.expressions.Expression.
> toRexNode(Expression.scala:53)
> at org.apache.flink.table.expressions.Alias.toRexNode(
> fieldExpression.scala:79)
> at org.apache.flink.table.plan.logical.Project$$anonfun$
> construct$1.apply(operators.scala:94)
> at org.apache.flink.table.plan.logical.Project$$anonfun$
> construct$1.apply(operators.scala:94)
> at scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:244)
> at scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at scala.collection.generic.TraversableForwarder$class.
> foreach(TraversableForwarder.scala:32)
> at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at org.apache.flink.table.plan.logical.Project.construct(
> operators.scala:94)
> at org.apache.flink.table.plan.logical.LogicalNode.toRelNode(
> LogicalNode.scala:77)
> at org.apache.flink.table.api.Table.getRelNode(table.scala:94)
> at
>
> -------------------------------------------------
> This is my Program:
>
> final ExecutionEnvironment env = DatalinksExecutionEnvironment.
> getExecutionEnv();
> final BatchTableEnvironment tEnv = TableEnvironment.
> getTableEnvironment(env);
>
> DataSet<Row> dataSet = null;
> dataSet = env.fromElements("{\"test\":\"val\"}").map(new
> MapFunction<String, Row>() {
>
>         @Override
>         public Row map(String value) throws Exception {
>           Row ret = new Row(1);
>           ret.setField(0, value);
>           return ret;
>         }
>       }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO));
>
> tEnv.registerFunction("myFunc", new MyTableFunction());
> Table test = tEnv.fromDataSet(dataSet, "field1");
>  Table res = test.select("field1,myFunc(recon)");
>  dataSet = tEnv.toDataSet(res,
>            new RowTypeInfo(
>                    BasicTypeInfo.STRING_TYPE_INFO,
>                    BasicTypeInfo.STRING_TYPE_INFO));
>   dataSet.print();
>
>
> MyTableFunction is something like:
>
> public class MyTableFunction extends TableFunction<String> {
>   public String eval(String str) {
>     return "XXX";
>   }
> }
>
>
> What I'm doing wrong here?
>
> Thanks in advance,
> Flavio
>

Mime
View raw message