Subject: Re: Creating a custom operator
From: Simone Robutti
To: user@flink.apache.org
Date: Tue, 3 May 2016 15:13:03 +0200

I'm not sure this is the right way to do it, but we were exploring all the
possibilities and this one is the most obvious. We also spent some time
studying how to do it, to get a better understanding of Flink's internals.

What we want to do, though, is to integrate Flink with another distributed
system that builds its own nodes and coordinates over the network with its
own logic. This software is H2O (a machine learning platform), and the
integration consists of two big tasks: the first is to instantiate an H2O
node in every TaskManager and handle the lifecycle of the node according to
the TaskManager and the execution graph. The second is to allow the developer
to code everything inside Flink, converting from and to H2O's data structures
(distributed tabular data) and triggering the execution of algorithms on H2O
with a uniform API.

Here's a simple example (assuming that we will use the Table API):

val env = ExecutionEnvironment.getExecutionEnvironment
val h2oEnv = H2OEnvironment.getEnvironment(env)

val myData: Table = ...
val someOtherData: Table = ...

val myH2OFrame = myData.select(...).toH2OFrame(h2oEnv)

val linearRegressionModel = h2oEnv.linearRegression(myH2OFrame)
val predictions: Table = linearRegressionModel(someOtherData)

predictions.select(...)

A good solution should allow the system to keep the H2O nodes alive across
multiple tasks, and should make it possible to move data from Flink to H2O
locally. The latter is not achieved in H2O's integration with Spark, but we
still hope to do it.

That said, I'm still not sure whether it is really required to implement a
custom runtime operator, but given the complexity of integrating two
distributed systems, we assumed that more control would give us more
flexibility and a better chance of reaching an ideal solution.
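For concreteness, here is a purely hypothetical Scala sketch of the wrapper
contract behind the example above. None of these types exist in Flink or
H2O; the names simply mirror the calls in the example, and the Flink imports
assume the 1.0-era Scala and Table API packages.

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.table.Table

// Placeholder types: a handle to a distributed H2O frame and a trained
// model that can score a Table. Neither exists under these names.
trait H2OFrame
trait H2OModel {
  def apply(data: Table): Table
}

// The contract implied by the example: frame conversion, algorithm
// triggering, and lifecycle teardown behind one uniform API.
trait H2OEnvironment {
  def toH2OFrame(table: Table): H2OFrame
  def linearRegression(frame: H2OFrame): H2OModel
  def shutdown(): Unit
}

object H2OEnvironment {
  // Would start (or attach to) an H2O node inside every TaskManager and tie
  // its lifecycle to the execution graph; deliberately left unimplemented,
  // since that is exactly the open design question in this thread.
  def getEnvironment(env: ExecutionEnvironment): H2OEnvironment = ???
}

object H2OSyntax {
  // Enrichment so that myData.select(...).toH2OFrame(h2oEnv) reads as in
  // the example above.
  implicit class RichTable(val table: Table) extends AnyVal {
    def toH2OFrame(env: H2OEnvironment): H2OFrame = env.toH2OFrame(table)
  }
}

Again, this is only a sketch of the proposed API surface, not an existing
implementation.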
2016-05-03 13:29 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:

> Hi Simone,
>
> you are right, the interfaces you extend are not considered to be public,
> user-facing API.
> Adding custom operators to the DataSet API touches many parts of the
> system and is not straightforward.
> The DataStream API has better support for custom operators.
>
> Can you explain what kind of operator you would like to add?
> Maybe the functionality can be achieved with the existing operators.
>
> Best, Fabian
>
> 2016-05-03 12:54 GMT+02:00 Simone Robutti <simone.robutti@radicalbit.io>:
>
>> Hello Fabian,
>>
>> we dug deeper, building on the input you gave us, but a question arose.
>> We had always assumed that runtime operators were open for extension
>> without modifying anything inside Flink, but it looks like this is not
>> the case: the documentation assumes that the developer is working on a
>> contribution to Flink. So I would like to know whether our understanding
>> is correct and custom runtime operators are not supposed to be
>> implemented outside of Flink.
>>
>> Thanks,
>>
>> Simone
>>
>> 2016-04-29 21:32 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>>
>>> Hi Simone,
>>>
>>> the GraphCreatingVisitor transforms the common operator plan into a
>>> representation that is translated by the optimizer.
>>> You have to implement an OptimizerNode and an OperatorDescriptor to
>>> describe the operator.
>>> Depending on the semantics of the operator, there are a few more places
>>> to touch to make the integration work, such as driver strategies, the
>>> cost model, etc.
>>>
>>> I would recommend having a look at previous changes that added an
>>> operator, such as PartitionOperator, SortPartitionOperator, OuterJoin,
>>> etc. The respective commits should give you an idea of which parts of
>>> the code need to be touched. You should find the commit IDs in the JIRA
>>> issues for these extensions.
>>>
>>> Cheers, Fabian
>>>
>>> 2016-04-29 15:32 GMT+02:00 Simone Robutti <simone.robutti@radicalbit.io>:
>>>
>>>> Hello,
>>>>
>>>> I'm trying to create a custom operator to explore the internals of
>>>> Flink. The one I'm working on is actually rather similar to Union, and
>>>> I'm trying to mimic it for now. When I run my job, though, this error
>>>> arises:
>>>>
>>>> Exception in thread "main" java.lang.IllegalArgumentException: Unknown
>>>> operator type: MyOperator - My Operator
>>>> at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:237)
>>>> at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:82)
>>>> at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:279)
>>>> at org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:223)
>>>> at org.apache.flink.api.common.Plan.accept(Plan.java:348)
>>>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:454)
>>>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>>>> at org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:213)
>>>> at org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:107)
>>>> at io.radicalbit.flinkh2o.Job$.main(Job.scala:50)
>>>> at io.radicalbit.flinkh2o.Job.main(Job.scala)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>>>>
>>>> I looked at the location of the error, but it's not clear to me how to
>>>> make my operator recognizable to the optimizer.
>>>>
>>>> Thanks,
>>>>
>>>> Simone
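A closing note on Fabian's remark that the functionality might be achievable
with existing operators: below is a minimal, purely illustrative Scala
sketch, assuming the H2O node can be started behind a JVM-wide singleton, of
how a rich function's open()/close() hooks (which run inside the TaskManager
JVM) could manage that node's lifecycle. All H2O-related names are
placeholders, not a real API, and this is not the approach the thread
settled on.

import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Placeholder for whatever handle H2O exposes to start and stop a node.
object LocalH2ONode {
  @volatile private var started = false

  def ensureStarted(): Unit = synchronized {
    if (!started) {
      // start the in-JVM H2O node here
      started = true
    }
  }

  def shutdown(): Unit = synchronized {
    started = false
  }
}

// A rich function whose lifecycle hooks run inside the TaskManager JVM, so
// a JVM-wide singleton can host the H2O node across the tasks running there.
class H2OBackedMapPartition[T] extends RichMapPartitionFunction[T, T] {

  override def open(parameters: Configuration): Unit = {
    // Called before the first record of each task: start or reuse the node.
    LocalH2ONode.ensureStarted()
  }

  override def mapPartition(values: java.lang.Iterable[T], out: Collector[T]): Unit = {
    // Here the partition's records would be handed to the local H2O node;
    // this sketch just forwards them unchanged.
    val it = values.iterator()
    while (it.hasNext) {
      out.collect(it.next())
    }
  }

  override def close(): Unit = {
    // Whether and when to stop the node (per task, per job, or per
    // TaskManager) is exactly the lifecycle question raised in the thread.
  }
}

This only makes Fabian's point concrete; it does not answer whether one node
per TaskManager, rather than per slot or per task, is achievable this way,
nor does it replace the custom runtime operator route discussed above.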