flink-user mailing list archives

From Simone Robutti <simone.robu...@radicalbit.io>
Subject Re: Creating a custom operator
Date Tue, 03 May 2016 13:13:03 GMT
I'm not sure this is the right way to do it, but we were exploring all the
possibilities and this one is the most obvious. We also spent some time
studying how to do it, to gain a better understanding of Flink's internals.

What we actually want to do is integrate Flink with another distributed
system that builds its own nodes and coordinates them over the network with
its own logic. This software is H2O (a Machine Learning platform), and the
integration consists of two big tasks. The first is to instantiate an H2O
node in every task manager and manage the node's lifecycle according to the
task manager and the execution graph. The second is to let the developer
code everything inside Flink, converting to and from H2O's data structures
(distributed tabular data) and triggering the execution of algorithms on
H2O through a uniform API.

Here's a simple example (assuming that we will use the Table API):

val env = ExecutionEnvironment.getExecutionEnvironment
val h2oEnv = H2OEnvironment.getEnvironment(env)

val myData: Table = ...
val someOtherData: Table = ...

val myH2OFrame = myData.select(...).toH2OFrame(h2oEnv)

val linearRegressionModel = h2oEnv.linearRegression(myH2OFrame)

val predictions: Table = linearRegressionModel(someOtherData)


A good solution should keep the H2O nodes alive across multiple tasks and
make it possible to move data locally from Flink to H2O. The latter is not
achieved in H2O's integration with Spark, but we still hope to do it.
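
To make the lifecycle requirement concrete, here is a minimal sketch of a
per-JVM holder that starts a node lazily and keeps it alive until it is
shut down explicitly, so that several tasks in the same task manager would
share one node. Everything in it is an assumption for illustration:
ExternalNode, SharedNodeHolder and CountingNode are hypothetical names, and
no real H2O or Flink API is used.

```scala
// Hypothetical stand-in for an embedded H2O node; no real H2O API is used.
trait ExternalNode {
  def start(): Unit
  def stop(): Unit
}

// Hypothetical holder, meant to exist once per JVM (one per task manager).
// The node is started on first use and survives until shutdown() is called,
// so it is not tied to the lifetime of any single task.
class SharedNodeHolder(factory: () => ExternalNode) {
  private var node: Option[ExternalNode] = None

  // Returns the running node, starting it first if necessary.
  def getOrStart(): ExternalNode = synchronized {
    node.getOrElse {
      val n = factory()
      n.start()
      node = Some(n)
      n
    }
  }

  // Stops the node if it is running; a later getOrStart() starts a new one.
  def shutdown(): Unit = synchronized {
    node.foreach(_.stop())
    node = None
  }
}

// Trivial in-memory node that only counts calls, used to exercise the holder.
class CountingNode extends ExternalNode {
  var starts = 0
  var stops = 0
  def start(): Unit = starts += 1
  def stop(): Unit = stops += 1
}
```

In a real job, a task would presumably call getOrStart() when it is set up,
and shutdown() would be tied to the task manager going down; where exactly
those hooks belong in Flink is the open question of this thread.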

That said, I'm still not sure whether it is really necessary to implement a
custom runtime operator, but given the complexity of integrating two
distributed systems, we assumed that more control would give us more
flexibility and a better chance of reaching an ideal solution.

2016-05-03 13:29 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:

> Hi Simone,
> you are right, the interfaces you extend are not considered to be public,
> user-facing API.
> Adding custom operators to the DataSet API touches many parts of the
> system and is not straightforward.
> The DataStream API has better support for custom operators.
> Can you explain what kind of operator you would like to add?
> Maybe the functionality can be achieved with the existing operators.
> Best, Fabian
> 2016-05-03 12:54 GMT+02:00 Simone Robutti <simone.robutti@radicalbit.io>:
>> Hello Fabian,
>> we dug deeper starting from the input you gave us, but a question arose.
>> We always assumed that runtime operators were open for extension without
>> modifying anything inside Flink, but it looks like this is not the case and
>> the documentation assumes that the developer is working on a contribution
>> to Flink. So I would like to know if our understanding is correct and
>> custom runtime operators are not supposed to be implemented outside of
>> Flink.
>> Thanks,
>> Simone
>> 2016-04-29 21:32 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>>> Hi Simone,
>>> the GraphCreatingVisitor transforms the common operator plan into a
>>> representation that is translated by the optimizer.
>>> You have to implement an OptimizerNode and OperatorDescriptor to
>>> describe the operator.
>>> Depending on the semantics of the operator, there are a few more places
>>> to touch to make the integration work, such as driver strategies, the
>>> cost model, etc.
>>> I would recommend having a look at previous changes that added an
>>> operator such as PartitionOperator, SortPartitionOperator, OuterJoin, etc.
>>> The respective commits should give you an idea which parts of the code
>>> need to be touched. You should find the commit IDs in the JIRA issues for
>>> these extensions.
>>> Cheers, Fabian
>>> 2016-04-29 15:32 GMT+02:00 Simone Robutti <simone.robutti@radicalbit.io>:
>>>> Hello,
>>>> I'm trying to create a custom operator to explore the internals of
>>>> Flink. Actually the one I'm working on is rather similar to Union and I'm
>>>> trying to mimic it for now. When I run my job though, this error arises:
>>>> Exception in thread "main" java.lang.IllegalArgumentException: Unknown operator type: MyOperator - My Operator
>>>> at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:237)
>>>> at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:82)
>>>> at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:279)
>>>> at org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:223)
>>>> at org.apache.flink.api.common.Plan.accept(Plan.java:348)
>>>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:454)
>>>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>>>> at org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:213)
>>>> at org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:107)
>>>> at io.radicalbit.flinkh2o.Job$.main(Job.scala:50)
>>>> at io.radicalbit.flinkh2o.Job.main(Job.scala)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>>>> I looked at the location of the error but it's not clear to me how to
>>>> make my operator recognizable to the optimizer.
>>>> Thanks,
>>>> Simone
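
The "Unknown operator type" error in the first message of the thread is
consistent with a visitor that translates only a fixed set of operator
types into optimizer nodes and rejects everything else. The sketch below
illustrates that pattern only. It is a simplified, hypothetical model:
PlanOperator, MapOp, UnionOp, CustomOp, OptNode and ToyVisitor are invented
names and are not Flink's actual classes or code.

```scala
// Simplified, hypothetical model of a plan; these are NOT Flink's classes.
sealed trait PlanOperator { def name: String }
final case class MapOp(name: String) extends PlanOperator
final case class UnionOp(name: String) extends PlanOperator
final case class CustomOp(name: String) extends PlanOperator

// Hypothetical optimizer-side representation of the known operators.
sealed trait OptNode
final case class MapOptNode(op: MapOp) extends OptNode
final case class UnionOptNode(op: UnionOp) extends OptNode

object ToyVisitor {
  // Only operator types listed here can be translated. An operator defined
  // elsewhere has no branch of its own and falls through to the error case.
  def preVisit(op: PlanOperator): OptNode = op match {
    case m: MapOp   => MapOptNode(m)
    case u: UnionOp => UnionOptNode(u)
    case other =>
      throw new IllegalArgumentException(s"Unknown operator type: ${other.name}")
  }
}
```

With a dispatch of this shape, supporting a new operator means adding a
branch and a matching node type inside the visitor's own code base, which
fits the advice in the thread to implement an OptimizerNode and look at the
commits that added earlier operators.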
