flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Fabian Hueske <fhue...@gmail.com>
Subject Re: Creating a custom operator
Date Fri, 29 Apr 2016 19:32:47 GMT
Hi Simone,

the GraphCreatingVisitor transforms the common operator plan into a
representation that is translated by the optimizer.
You have to implement an OptimizerNode and OperatorDescriptor to describe
the operator.
Depending on the semantics of the operator, there are a few more places to
make the integration working like driver strategies, cost model, etc.

I would recommend to have a look at previous changes that added an operator
such as PartitionOperator, SortPartitionOperator, OuterJoin, etc.
The respective commits should give you an idea which parts of the code need
to be touched. You should find the commit IDs in the JIRA issues for these

Cheers, Fabian

2016-04-29 15:32 GMT+02:00 Simone Robutti <simone.robutti@radicalbit.io>:

> Hello,
> I'm trying to create a custom operator to explore the internals of Flink.
> Actually the one I'm working on is rather similar to Union and I'm trying
> to mimick it for now. When I run my job though, this error arise:
> Exception in thread "main" java.lang.IllegalArgumentException: Unknown
> operator type: MyOperator - My Operator
> at
> org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:237)
> at
> org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:82)
> at
> org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:279)
> at
> org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:223)
> at org.apache.flink.api.common.Plan.accept(Plan.java:348)
> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:454)
> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
> at
> org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:213)
> at
> org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:107)
> at io.radicalbit.flinkh2o.Job$.main(Job.scala:50)
> at io.radicalbit.flinkh2o.Job.main(Job.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> I looked at the location of the error but it's not clear to me how to make
> my operator recognizable from the optimizer.
> Thank,
> Simone

View raw message