flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Fabian Hueske <fhue...@gmail.com>
Subject Re: Creating a custom operator
Date Wed, 11 May 2016 09:49:50 GMT
2016-05-09 14:56 GMT+02:00 Simone Robutti <simone.robutti@radicalbit.io>:

> >- You wrote you'd like to "instantiate a H2O's node in every task
> manager". This reads a bit like you want to start H2O in the TM's JVM , but
> I would assume that a H2O node runs as a separate process. So should it be
> started inside the TM JVM or as an external process next to each TM. Also,
> do you want to start one H2O node per TM slot or per TM?
>
> My idea is to run it in the same process but there may be several good
> reasons not to do it, it's just the way I think of it right now. I'm
> thinking about replicating the structure of Sparkling Water and for my
> understanding, they run their H2O nodes in the same process.
>
> That should be possible by starting a thread from a MapPartition operator.
To make it one H2O node per TM, you would need a synchronized singleton to
avoid that each parallel task starts a new thread.


>
> >- You wrote you'd like to "handle the lifecycle of the node according to
> the taskmanager and the execution graph". A TM can execute multiple jobs
> each with its own execution graph. Do you want to start the H2O node for
> each job and shut it down when the job finishes or start the H2O when the
> TM is started and kill it when the TM is brought down?
>
> There are different trade-offs for both choices. I assume that there's
> nothing inside H2O that should be shared between different jobs for most
> use cases so it should follow the job's lifecycle. In the previous mail
> this was ambigous, my bad.
>

The approach with two MapPartition operators at the beginning and end of
the H2O section might work then.


> >- "move the data locally from Flink to H2O", do you mean host local or
> JVM local? I think it should not be hard to keep the data host local.
>
> JVM local. This is clearly not an issue flink-side but may be an issue on
> H2O's side. It's one of the many issues we will tackle as soon as we will
> talk with them (I hope soon).
>
> >- The Table API which you are referring to in your example is built on
> top of the DataSet and DataStream APIs. I think it should be possible to
> add another API similar to the Table API. You should be aware that the
> Table API is currently quite actively developed and should not be
> considered to be a stable interface. So certain things might change in the
> next versions. With 1.0 we stabilized the DataSet API and I would rather
> put a new API on top of it than on the Table API.
>
> We know but we will work mostly with the data abstractions of the Table
> API and not the operations. We take the risk to rework it if they change in
> the meantime.
>
> Your reply really helped: many questions helped us clear our mind on a few
> points. H2O's team showed interest in working on this integration or at
> least support us in the development. We are waiting for them to start a
> discussion and as soon as we will have a more clear idea on how to proceed,
> we will validate it with the stuff you just said. Your confidence in
> Flink's operators gives up hope to achieve a clean solution.
>
> Thanks a lot of your time,
>
Simone
>
> Sure :-)

Cheers, Fabian


> 2016-05-09 12:24 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>
>> Hi Simone,
>>
>> sorry for the delayed answer. I have a few questions regarding your
>> requirements and a some ideas that might be helpful (depending on the
>> requirements).
>>
>> 1) Starting / stopping of H2O nodes from Flink
>> - You wrote you'd like to "instantiate a H2O's node in every task
>> manager". This reads a bit like you want to start H2O in the TM's JVM , but
>> I would assume that a H2O node runs as a separate process. So should it be
>> started inside the TM JVM or as an external process next to each TM. Also,
>> do you want to start one H2O node per TM slot or per TM?
>> - You wrote you'd like to "handle the lifecycle of the node according to
>> the taskmanager and the execution graph". A TM can execute multiple jobs
>> each with its own execution graph. Do you want to start the H2O node for
>> each job and shut it down when the job finishes or start the H2O when the
>> TM is started and kill it when the TM is brought down?
>> - "keep the H2O's nodes alive through multiple tasks" The first option
>> (starting for each job) would allow to share the H2O node for all tasks of
>> a job. This could be done using two MapPartition operators, the first
>> Mapper is put in front of the first task requiring H2O starting an H2O
>> service before the first record is forwarded and the second task is put
>> after the last H2O task and shuts it down after the last element was
>> forwarded. The mappers itself do nothing than forwarding elements and
>> starting and stopping tasks. If you would like to share H2O nodes across
>> jobs, we might need another hook to start the process.
>> - "move the data locally from Flink to H2O", do you mean host local or
>> JVM local? I think it should not be hard to keep the data host local.
>>
>> 2) "Allow the developer to code everything inside Flink".
>> - The Table API which you are referring to in your example is built on
>> top of the DataSet and DataStream APIs. I think it should be possible to
>> add another API similar to the Table API. You should be aware that the
>> Table API is currently quite actively developed and should not be
>> considered to be a stable interface. So certain things might change in the
>> next versions. With 1.0 we stabilized the DataSet API and I would rather
>> put a new API on top of it than on the Table API.
>> - Regarding the transformation in H2O structures and calling H2O
>> operations, I think this might again be done in MapPartition operators. In
>> general, MapPartition gives you a lot of freedom because it provides an
>> iterator over all elements of a partition. So you can do things before the
>> first and after the last element and group data as you like. You can use
>> partitionByHash() or rebalace() to shuffle data and sortPartition to
>> locally sort the data in a partition. Please note that MapPartition
>> operators do not support chaining and come therefore with a certain
>> serialization overhead. Whenever possible you should use a MapFunction or
>> FlatMapFunction which are a bit more lightweight.
>>
>> Hope this helps,
>> Fabian
>>
>>
>> 2016-05-03 15:13 GMT+02:00 Simone Robutti <simone.robutti@radicalbit.io>:
>>
>>> I'm not sure this is the right way to do it but we were exploring all
>>> the possibilities and this one is the more obvious. We also spent some time
>>> to study how to do it to achieve a better understanding of Flink's
>>> internals.
>>>
>>> What we want to do though is to integrate Flink with another distributed
>>> system that builds its own nodes and coordinates through the network with
>>> its own logic. This software is H2O (a Machine Learning platform) and the
>>> integration consists of two big tasks: the first is to instantiate a H2O's
>>> node in every task manager and handle the lifecycle of the node according
>>> to the taskmanager and the execution graph. The second is to allow the
>>> developer to code everything inside Flink, converting from and to H2O's
>>> data structures (distributed tabular data) and triggering the execution of
>>> algorithms on H2O with a uniform API.
>>>
>>> Here's a simple example (assuming that we will use the TableAPI):
>>>
>>> val env = ExecutionEnvironment.getExecutionEnvironment
>>> val h2oEnv = H2OEnviroment.getEnvironment(env)
>>>
>>> val myData: Table = ...
>>> val someOtherData: Table = ...
>>>
>>> val myH2OFrame = myData.select(...).toH2OFrame(h2oEnv)
>>>
>>> val linearRegressionModel = h2oEnv.linearRegression(myH2OFrame)
>>>
>>> val predictions:Table=linearRegressionModel(someOtherData)
>>>
>>> predictions.select(...)
>>>
>>>
>>> A good solution should allow the system to keep the H2O's nodes alive
>>> through multiple tasks and the possibility to move the data locally from
>>> Flink to H2O. The latter is not achieved in H2O's integration with Spark
>>> but we still hope to do it.
>>>
>>> That said, I'm still not sure if it is really required to implement a
>>> custom runtime operator but given the complexity of the integration of two
>>> distribute systems, we assumed that more control would allow more
>>> flexibility and possibilities to achieve an ideal solution.
>>>
>>>
>>>
>>>
>>>
>>> 2016-05-03 13:29 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>>>
>>>> Hi Simone,
>>>>
>>>> you are right, the interfaces you extend are not considered to be
>>>> public, user-facing API.
>>>> Adding custom operators to the DataSet API touches many parts of the
>>>> system and is not straightforward.
>>>> The DataStream API has better support for custom operators.
>>>>
>>>> Can you explain what kind of operator you would like to add?
>>>> Maybe the functionality can be achieved with the existing operators.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2016-05-03 12:54 GMT+02:00 Simone Robutti <simone.robutti@radicalbit.io
>>>> >:
>>>>
>>>>> Hello Fabian,
>>>>>
>>>>> we delved more moving from the input you gave us but a question
>>>>> arised. We always assumed that runtime operators were open for extension
>>>>> without modifying anything inside Flink but it looks like this is not
the
>>>>> case and the documentation assumes that the developer is working to a
>>>>> contribution to Flink. So I would like to know if our understandment
is
>>>>> correct and custom runtime operators are not supposed to be implemented
>>>>> outside of Flink.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Simone
>>>>>
>>>>> 2016-04-29 21:32 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>>>>>
>>>>>> Hi Simone,
>>>>>>
>>>>>> the GraphCreatingVisitor transforms the common operator plan into
a
>>>>>> representation that is translated by the optimizer.
>>>>>> You have to implement an OptimizerNode and OperatorDescriptor to
>>>>>> describe the operator.
>>>>>> Depending on the semantics of the operator, there are a few more
>>>>>> places to make the integration working like driver strategies, cost
model,
>>>>>> etc.
>>>>>>
>>>>>> I would recommend to have a look at previous changes that added an
>>>>>> operator such as PartitionOperator, SortPartitionOperator, OuterJoin,
etc.
>>>>>> The respective commits should give you an idea which parts of the
>>>>>> code need to be touched. You should find the commit IDs in the JIRA
issues
>>>>>> for these extensions.
>>>>>>
>>>>>> Cheers, Fabian
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> 2016-04-29 15:32 GMT+02:00 Simone Robutti <
>>>>>> simone.robutti@radicalbit.io>:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I'm trying to create a custom operator to explore the internals
of
>>>>>>> Flink. Actually the one I'm working on is rather similar to Union
and I'm
>>>>>>> trying to mimick it for now. When I run my job though, this error
arise:
>>>>>>>
>>>>>>> Exception in thread "main" java.lang.IllegalArgumentException:
>>>>>>> Unknown operator type: MyOperator - My Operator
>>>>>>> at
>>>>>>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:237)
>>>>>>> at
>>>>>>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:82)
>>>>>>> at
>>>>>>> org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:279)
>>>>>>> at
>>>>>>> org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:223)
>>>>>>> at org.apache.flink.api.common.Plan.accept(Plan.java:348)
>>>>>>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:454)
>>>>>>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>>>>>>> at
>>>>>>> org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:213)
>>>>>>> at
>>>>>>> org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:107)
>>>>>>> at io.radicalbit.flinkh2o.Job$.main(Job.scala:50)
>>>>>>> at io.radicalbit.flinkh2o.Job.main(Job.scala)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> at
>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>> at
>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>> at
>>>>>>> com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>>>>>>>
>>>>>>> I looked at the location of the error but it's not clear to me
how
>>>>>>> to make my operator recognizable from the optimizer.
>>>>>>>
>>>>>>> Thank,
>>>>>>>
>>>>>>> Simone
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Mime
View raw message