flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Creating a custom operator
Date Mon, 09 May 2016 10:24:02 GMT
Hi Simone,

sorry for the delayed answer. I have a few questions regarding your
requirements and some ideas that might be helpful (depending on the

1) Starting / stopping of H2O nodes from Flink
- You wrote you'd like to "instantiate a H2O's node in every task manager".
This reads a bit like you want to start H2O in the TM's JVM, but I would
assume that an H2O node runs as a separate process. So should it be started
inside the TM JVM or as an external process next to each TM? Also, do you
want to start one H2O node per TM slot or per TM?
- You wrote you'd like to "handle the lifecycle of the node according to
the taskmanager and the execution graph". A TM can execute multiple jobs,
each with its own execution graph. Do you want to start the H2O node for
each job and shut it down when the job finishes, or start the H2O node when
the TM is started and kill it when the TM is brought down?
- "keep the H2O's nodes alive through multiple tasks" The first option
(starting for each job) would allow sharing the H2O node among all tasks of
a job. This could be done using two MapPartition operators. The first
mapper is put in front of the first task requiring H2O and starts an H2O
service before the first record is forwarded. The second mapper is put
after the last H2O task and shuts the service down after the last element
was forwarded. The mappers themselves do nothing but forward elements and
start or stop the service. If you would like to share H2O nodes across
jobs, we might need another hook to start the process.
- "move the data locally from Flink to H2O", do you mean host local or JVM
local? I think it should not be hard to keep the data host local.
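
A minimal sketch of the two pass-through mappers described above, written
in plain Java so it runs without Flink on the classpath. The PartitionMapper
interface only mirrors the shape of Flink's
MapPartitionFunction#mapPartition(Iterable, Collector), with a Consumer
standing in for the Collector. ServiceHandle, StartingMapper, StoppingMapper
and LifecycleSketch are hypothetical names for illustration; a real
ServiceHandle would start and stop an actual H2O node.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for an external service such as an H2O node.
// It only records lifecycle events so the ordering can be inspected.
class ServiceHandle {
    final List<String> events = new ArrayList<>();
    void start() { events.add("start"); }
    void stop() { events.add("stop"); }
}

// Assumption: same parameter shape as Flink's MapPartitionFunction,
// with Consumer replacing Flink's Collector.
interface PartitionMapper<T> {
    void mapPartition(Iterable<T> values, Consumer<T> out);
}

// Starts the service before the first record is forwarded.
class StartingMapper<T> implements PartitionMapper<T> {
    private final ServiceHandle service;
    StartingMapper(ServiceHandle service) { this.service = service; }

    @Override
    public void mapPartition(Iterable<T> values, Consumer<T> out) {
        service.start();
        for (T value : values) {
            out.accept(value); // forward unchanged
        }
    }
}

// Stops the service after the last record was forwarded.
class StoppingMapper<T> implements PartitionMapper<T> {
    private final ServiceHandle service;
    StoppingMapper(ServiceHandle service) { this.service = service; }

    @Override
    public void mapPartition(Iterable<T> values, Consumer<T> out) {
        for (T value : values) {
            out.accept(value); // forward unchanged
        }
        service.stop();
    }
}

class LifecycleSketch {
    // Chains start mapper -> (simulated work using the service) -> stop mapper.
    static List<String> run(List<String> input, ServiceHandle service) {
        List<String> afterStart = new ArrayList<>();
        new StartingMapper<String>(service).mapPartition(input, afterStart::add);

        // Placeholder for the tasks in between that need the running service.
        for (String record : afterStart) {
            service.events.add("use:" + record);
        }

        List<String> result = new ArrayList<>();
        new StoppingMapper<String>(service).mapPartition(afterStart, result::add);
        return result;
    }

    public static void main(String[] args) {
        ServiceHandle service = new ServiceHandle();
        System.out.println(run(Arrays.asList("a", "b"), service));
        System.out.println(service.events);
    }
}
```

In a real job each parallel instance of the mappers would run this logic
once per partition, so whether that means one service per slot or per TM
depends on the answers to the questions above.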

2) "Allow the developer to code everything inside Flink".
- The Table API which you are referring to in your example is built on top
of the DataSet and DataStream APIs. I think it should be possible to add
another API similar to the Table API. You should be aware that the Table
API is currently quite actively developed and should not be considered to
be a stable interface. So certain things might change in the next versions.
With 1.0 we stabilized the DataSet API and I would rather put a new API on
top of it than on the Table API.
- Regarding the transformation into H2O structures and calling H2O
operations, I think this might again be done in MapPartition operators. In
general, MapPartition gives you a lot of freedom because it provides an
iterator over all elements of a partition. So you can do things before the
first and after the last element and group data as you like. You can use
partitionByHash() or rebalance() to shuffle data and sortPartition() to
locally sort the data in a partition. Please note that MapPartition
operators do not support chaining and therefore come with a certain
serialization overhead. Whenever possible you should use a MapFunction or
FlatMapFunction, which are a bit more lightweight.
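
To illustrate the "group data as you like" point, here is a minimal sketch
in plain Java (no Flink dependency) of a mapPartition-style method that
collects the elements of one partition into fixed-size batches and flushes
the remainder after the last element. The method takes an Iterable like
Flink's MapPartitionFunction#mapPartition does, but the Consumer replacing
the Collector, the batchSize parameter and the PartitionBatcher name are
assumptions made for this example. Each batch could then be converted into
an H2O structure, which is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class PartitionBatcher {
    // Groups the elements of one partition into lists of at most batchSize.
    // Assumption: Consumer stands in for Flink's Collector.
    static <T> void mapPartition(Iterable<T> values, int batchSize,
                                 Consumer<List<T>> out) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        // Any setup that must happen before the first element goes here.
        List<T> batch = new ArrayList<>(batchSize);
        for (T value : values) {
            batch.add(value);
            if (batch.size() == batchSize) {
                out.accept(batch);                   // emit a full batch
                batch = new ArrayList<>(batchSize);  // start a new one
            }
        }
        // After the last element: flush the incomplete batch, if any.
        if (!batch.isEmpty()) {
            out.accept(batch);
        }
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = new ArrayList<>();
        mapPartition(Arrays.asList(1, 2, 3, 4, 5), 2, batches::add);
        System.out.println(batches); // prints [[1, 2], [3, 4], [5]]
    }
}
```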

Hope this helps,

2016-05-03 15:13 GMT+02:00 Simone Robutti <simone.robutti@radicalbit.io>:

> I'm not sure this is the right way to do it but we were exploring all the
> possibilities and this one is the most obvious. We also spent some time
> studying how to do it to gain a better understanding of Flink's internals.
> What we want to do though is to integrate Flink with another distributed
> system that builds its own nodes and coordinates through the network with
> its own logic. This software is H2O (a Machine Learning platform) and the
> integration consists of two big tasks: the first is to instantiate a H2O's
> node in every task manager and handle the lifecycle of the node according
> to the taskmanager and the execution graph. The second is to allow the
> developer to code everything inside Flink, converting from and to H2O's
> data structures (distributed tabular data) and triggering the execution of
> algorithms on H2O with a uniform API.
> Here's a simple example (assuming that we will use the TableAPI):
> val env = ExecutionEnvironment.getExecutionEnvironment
> val h2oEnv = H2OEnviroment.getEnvironment(env)
> val myData: Table = ...
> val someOtherData: Table = ...
> val myH2OFrame = myData.select(...).toH2OFrame(h2oEnv)
> val linearRegressionModel = h2oEnv.linearRegression(myH2OFrame)
> val predictions:Table=linearRegressionModel(someOtherData)
> predictions.select(...)
> A good solution should allow the system to keep the H2O's nodes alive
> through multiple tasks and the possibility to move the data locally from
> Flink to H2O. The latter is not achieved in H2O's integration with Spark
> but we still hope to do it.
> That said, I'm still not sure if it is really required to implement a
> custom runtime operator but given the complexity of the integration of two
> distributed systems, we assumed that more control would allow more
> flexibility and possibilities to achieve an ideal solution.
> 2016-05-03 13:29 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>> Hi Simone,
>> you are right, the interfaces you extend are not considered to be public,
>> user-facing API.
>> Adding custom operators to the DataSet API touches many parts of the
>> system and is not straightforward.
>> The DataStream API has better support for custom operators.
>> Can you explain what kind of operator you would like to add?
>> Maybe the functionality can be achieved with the existing operators.
>> Best, Fabian
>> 2016-05-03 12:54 GMT+02:00 Simone Robutti <simone.robutti@radicalbit.io>:
>>> Hello Fabian,
>>> we dug deeper starting from the input you gave us, but a question arose.
>>> We always assumed that runtime operators were open for extension without
>>> modifying anything inside Flink but it looks like this is not the case and
>>> the documentation assumes that the developer is working on a contribution
>>> to Flink. So I would like to know if our understanding is correct and
>>> custom runtime operators are not supposed to be implemented outside of
>>> Flink.
>>> Thanks,
>>> Simone
>>> 2016-04-29 21:32 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>>>> Hi Simone,
>>>> the GraphCreatingVisitor transforms the common operator plan into a
>>>> representation that is translated by the optimizer.
>>>> You have to implement an OptimizerNode and OperatorDescriptor to
>>>> describe the operator.
>>>> Depending on the semantics of the operator, there are a few more places
>>>> to touch to make the integration work, such as driver strategies, the
>>>> cost model, etc. I would recommend having a look at previous changes that
>>>> added an operator such as PartitionOperator, SortPartitionOperator,
>>>> OuterJoin, etc.
>>>> The respective commits should give you an idea which parts of the code
>>>> need to be touched. You should find the commit IDs in the JIRA issues for
>>>> these extensions.
>>>> Cheers, Fabian
>>>> 2016-04-29 15:32 GMT+02:00 Simone Robutti <simone.robutti@radicalbit.io>:
>>>>> Hello,
>>>>> I'm trying to create a custom operator to explore the internals of
>>>>> Flink. Actually the one I'm working on is rather similar to Union and
>>>>> I'm trying to mimic it for now. When I run my job, though, this error arises:
>>>>> Exception in thread "main" java.lang.IllegalArgumentException: Unknown operator type: MyOperator - My Operator
>>>>> at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:237)
>>>>> at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:82)
>>>>> at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:279)
>>>>> at org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:223)
>>>>> at org.apache.flink.api.common.Plan.accept(Plan.java:348)
>>>>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:454)
>>>>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>>>>> at org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:213)
>>>>> at org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:107)
>>>>> at io.radicalbit.flinkh2o.Job$.main(Job.scala:50)
>>>>> at io.radicalbit.flinkh2o.Job.main(Job.scala)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>>>>> I looked at the location of the error but it's not clear to me how to
>>>>> make my operator recognizable by the optimizer.
>>>>> Thanks,
>>>>> Simone
