flink-user mailing list archives

From Felipe Gutierrez <felipe.o.gutier...@gmail.com>
Subject Re: How would I use OneInputStreamOperator to deal with data skew?
Date Tue, 16 Apr 2019 16:49:38 GMT
Hi Kurt,

How do you make the finishBundle
<https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/MapBundleFunction.java#L59>
method return the combined tuples? I saw that there is a method
"List<String> getOutputs()" which is never called.

I did an implementation
<https://github.com/felipegutierrez/explore-flink/tree/master/src/main/java/org/sense/flink/examples/stream/operators>
based on the example that you suggested. The MapBundleFunctionImpl
<https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operators/MapBundleFunctionImpl.java#L53>
class has the method finishBundle, which iterates over all the combined
tuples and returns them. However, my application does not continue to
receive tuples after the transform method
<https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedCombinerByKeySkewedDAG.java#L86>.
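For reference, here is a dependency-free sketch of the bundle pattern as I understand it from the linked MapBundleFunction: the combined tuples are not returned from finishBundle but pushed downstream through a Collector. All class and method names below are illustrative stand-ins, not the real Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the bundle pattern: inputs are combined per key
// into a buffer, and finishBundle emits the combined results through a
// collector callback instead of returning a list.
public class BundleSketch {

    // Minimal stand-in for Flink's Collector interface.
    interface Collector<T> {
        void collect(T record);
    }

    static class CountBundle {
        // TreeMap only so the emitted order is deterministic in this sketch.
        private final Map<String, Long> buffer = new TreeMap<>();

        // Combine the incoming record with the value buffered for its key.
        void addInput(String key) {
            buffer.merge(key, 1L, Long::sum);
        }

        // Emit every combined (key, count) pair downstream, then reset.
        void finishBundle(Collector<String> out) {
            buffer.forEach((k, v) -> out.collect(k + "=" + v));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        CountBundle bundle = new CountBundle();
        bundle.addInput("sensor-a");
        bundle.addInput("sensor-a");
        bundle.addInput("sensor-b");

        List<String> emitted = new ArrayList<>();
        bundle.finishBundle(emitted::add);
        System.out.println(emitted); // [sensor-a=2, sensor-b=1]
    }
}
```

If the operator calls finishBundle with a Collector wired to its output, nothing needs to be "returned" at all, which may explain why getOutputs() is never called.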

Thanks,
Felipe

*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*


On Tue, Apr 16, 2019 at 3:10 AM Kurt Young <ykt836@gmail.com> wrote:

> I think you can simply copy the source code to your project if the Maven
> dependency cannot be used.
>
> Best,
> Kurt
>
>
> On Mon, Apr 15, 2019 at 9:47 PM Felipe Gutierrez <
> felipe.o.gutierrez@gmail.com> wrote:
>
>> Hi again Kurt,
>>
>> could you please help me with the pom.xml file? I have included all Table
>> ecosystem dependencies and the flink-table-runtime-blink as well. However,
>> the class org.apache.flink.table.runtime.context.ExecutionContext is still
>> not found. I guess I am missing some dependency, but I do not know which
>> one. This is my pom.xml file.
>>
>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>> http://maven.apache.org/xsd/maven-4.0.0.xsd">
>> <modelVersion>4.0.0</modelVersion>
>>
>> <groupId>org.sense.flink</groupId>
>> <artifactId>explore-flink</artifactId>
>> <version>0.0.1-SNAPSHOT</version>
>> <packaging>jar</packaging>
>>
>> <name>explore-flink</name>
>> <url>http://maven.apache.org</url>
>>
>> <properties>
>> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>> <jdk.version>1.8</jdk.version>
>> <scala.binary.version>2.11</scala.binary.version>
>> <!-- <flink.version>1.8.0</flink.version> -->
>> <flink.version>1.9-SNAPSHOT</flink.version>
>> <junit.version>4.12</junit.version>
>> </properties>
>>
>> <dependencies>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-java</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-clients_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-metrics-dropwizard</artifactId>
>> <version>${flink.version}</version>
>> <scope>provided</scope>
>> </dependency>
>>
>> <!-- Table ecosystem -->
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>>
>> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>>
>> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-table-runtime-blink</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>>
>> <dependency>
>> <groupId>org.fusesource.mqtt-client</groupId>
>> <artifactId>mqtt-client</artifactId>
>> <version>1.15</version>
>> <!-- <scope>provided</scope> -->
>> </dependency>
>>
>> <dependency>
>> <groupId>org.slf4j</groupId>
>> <artifactId>slf4j-api</artifactId>
>> <version>1.7.26</version>
>> </dependency>
>> <dependency>
>> <groupId>org.slf4j</groupId>
>> <artifactId>slf4j-log4j12</artifactId>
>> <version>1.7.26</version>
>> </dependency>
>>
>> <dependency>
>> <groupId>junit</groupId>
>> <artifactId>junit</artifactId>
>> <version>${junit.version}</version>
>> </dependency>
>> </dependencies>
>> <build>
>> <finalName>explore-flink</finalName>
>> <plugins>
>> <!-- download source code in Eclipse, best practice -->
>> <plugin>
>> <groupId>org.apache.maven.plugins</groupId>
>> <artifactId>maven-eclipse-plugin</artifactId>
>> <version>2.10</version>
>> <configuration>
>> <downloadSources>true</downloadSources>
>> <downloadJavadocs>false</downloadJavadocs>
>> </configuration>
>> </plugin>
>>
>> <!-- Set a compiler level -->
>> <plugin>
>> <groupId>org.apache.maven.plugins</groupId>
>> <artifactId>maven-compiler-plugin</artifactId>
>> <version>3.8.0</version>
>> <configuration>
>> <source>${jdk.version}</source>
>> <target>${jdk.version}</target>
>> </configuration>
>> </plugin>
>>
>> <!-- Maven Shade Plugin -->
>> <plugin>
>> <groupId>org.apache.maven.plugins</groupId>
>> <artifactId>maven-shade-plugin</artifactId>
>> <version>3.2.0</version>
>> <!-- Run shade goal on package phase -->
>> <executions>
>> <execution>
>> <phase>package</phase>
>> <goals>
>> <goal>shade</goal>
>> </goals>
>> <configuration>
>> <artifactSet>
>> <excludes>
>> <exclude>org.apache.flink:*</exclude>
>> <!-- Also exclude very big transitive dependencies of Flink WARNING:
>> You have to remove these excludes if your code relies on other versions
>> of
>> these dependencies. -->
>> <exclude>org.slf4j:*</exclude>
>> <exclude>log4j:*</exclude>
>> <exclude>com.typesafe:config:*</exclude>
>> <exclude>junit:junit:*</exclude>
>> <exclude>com.codahale.metrics:*</exclude>
>> </excludes>
>> </artifactSet>
>> <filters>
>> <filter>
>> <artifact>org.apache.flink:*</artifact>
>> <excludes>
>> <!-- exclude shaded google but include shaded curator -->
>> <exclude>org/apache/flink/shaded/com/**</exclude>
>> <exclude>web-docs/**</exclude>
>> </excludes>
>> </filter>
>> <filter>
>> <!-- Do not copy the signatures in the META-INF folder. Otherwise,
>> this might cause SecurityExceptions when using the JAR. -->
>> <artifact>*:*</artifact>
>> <excludes>
>> <exclude>META-INF/*.SF</exclude>
>> <exclude>META-INF/*.DSA</exclude>
>> <exclude>META-INF/*.RSA</exclude>
>> </excludes>
>> </filter>
>> <filter>
>> <artifact>*:*</artifact>
>> <includes>
>> <include>org/apache/calcite/**</include>
>> <include>org/apache/flink/calcite/shaded/**</include>
>> <include>org/apache/flink/table/**</include>
>> <include>org.codehaus.commons.compiler.properties</include>
>> <include>org/codehaus/janino/**</include>
>> <include>org/codehaus/commons/**</include>
>> </includes>
>> </filter>
>> </filters>
>> <!-- If you want to use ./bin/flink run <quickstart jar> uncomment
>> the following lines. This will add a Main-Class entry to the manifest
>> file -->
>> <transformers>
>> <transformer
>>
>> implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>> <mainClass>org.sense.flink.App</mainClass>
>> </transformer>
>> </transformers>
>> <createDependencyReducedPom>false</createDependencyReducedPom>
>> </configuration>
>> </execution>
>> </executions>
>> </plugin>
>> </plugins>
>> </build>
>> </project>
>>
>>
>> Thanks
>>
>>
>>
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> <https://felipeogutierrez.blogspot.com>*
>>
>>
>> On Mon, Apr 15, 2019 at 3:25 PM Felipe Gutierrez <
>> felipe.o.gutierrez@gmail.com> wrote:
>>
>>> oh, yes. I just saw. I will use 1.9 then. thanks
>>>
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>>
>>> On Mon, Apr 15, 2019 at 3:23 PM Kurt Young <ykt836@gmail.com> wrote:
>>>
>>>> It's because the Blink code is not shipped with 1.8.0; it is currently
>>>> only available in 1.9-SNAPSHOT.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Mon, Apr 15, 2019 at 7:18 PM Felipe Gutierrez <
>>>> felipe.o.gutierrez@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> what are the artifacts that I have to import on maven in order to use
>>>>> Blink Api?
>>>>>
>>>>> I am using Flink 1.8.0 and I am trying to import Blink code to use its
>>>>> ExecutionContext
>>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/context/ExecutionContext.java>.
>>>>> I want to do this in order to implement my own operator like it is
>>>>> implemented here
>>>>> <https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java>.
>>>>> I guess if I import flink-table everything should come inside the same
>>>>> jar as it is done here
>>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/pom.xml>.
>>>>> However, I cannot import "flink-table-runtime-blink". Eclipse says that
>>>>> it is a missing artifact.
>>>>>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-planner_2.11</artifactId>
>>>>> <version>1.8.0</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-api-java-bridge_2.11</artifactId>
>>>>> <version>1.8.0</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-streaming-scala_2.11</artifactId>
>>>>> <version>1.8.0</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-common</artifactId>
>>>>> <version>1.8.0</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table</artifactId>
>>>>> <version>1.8.0</version>
>>>>> <type>pom</type>
>>>>> <scope>provided</scope>
>>>>> </dependency>
>>>>> <dependency> <!-- THIS IS NOT POSSIBLE TO IMPORT -->
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-runtime-blink</artifactId>
>>>>> <version>1.8.0</version>
>>>>> </dependency>
>>>>>
>>>>>
>>>>> *--*
>>>>> *-- Felipe Gutierrez*
>>>>>
>>>>> *-- skype: felipe.o.gutierrez*
>>>>> *--* *https://felipeogutierrez.blogspot.com
>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>
>>>>>
>>>>> On Mon, Apr 15, 2019 at 9:49 AM Felipe Gutierrez <
>>>>> felipe.o.gutierrez@gmail.com> wrote:
>>>>>
>>>>>> Cool, thanks Kurt!
>>>>>> *-*
>>>>>> *- Felipe Gutierrez*
>>>>>>
>>>>>> *- skype: felipe.o.gutierrez*
>>>>>> *- **https://felipeogutierrez.blogspot.com
>>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 15, 2019 at 6:06 AM Kurt Young <ykt836@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> You can checkout the bundle operator which used in Blink to perform
>>>>>>> similar thing you mentioned:
>>>>>>> https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java
>>>>>>>
>>>>>>> Best,
>>>>>>> Kurt
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Apr 12, 2019 at 8:05 PM Felipe Gutierrez <
>>>>>>> felipe.o.gutierrez@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I was trying to implement a better way to handle data skew using
>>>>>>>> Flink and I found this talk from #FlinkForward SF 2017: "Cliff
>>>>>>>> Resnick & Seth Wiesman - From Zero to Streaming
>>>>>>>> <https://youtu.be/mSLesPzWplA?t=835>" [1], which says that they
>>>>>>>> used OneInputStreamOperator [2]. Through it, they could implement
>>>>>>>> the "combiner" from Hadoop (execute part of the reduce tasks in
>>>>>>>> the Map phase, before shuffling).
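As an aside, the shuffle-volume saving that such a combiner gives on skewed data can be sketched with plain Java; no Flink API is used here, and the event counts and key names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch: a map-side "combiner" pre-aggregates per key before records are
// shuffled by key, so a hot key sends one partial sum per upstream task
// instead of one record per event.
public class CombinerSketch {
    public static void main(String[] args) {
        // 1,000 events, heavily skewed toward the key "hot".
        List<String> events = new ArrayList<>();
        for (int i = 0; i < 990; i++) events.add("hot");
        for (int i = 0; i < 10; i++) events.add("cold");

        // Without a combiner: every event crosses the network to the reducer.
        int shuffledWithout = events.size();

        // With a combiner: pre-aggregate locally, ship one partial per key.
        Map<String, Long> partials = new TreeMap<>();
        for (String e : events) partials.merge(e, 1L, Long::sum);
        int shuffledWith = partials.size();

        System.out.println(shuffledWithout + " -> " + shuffledWith); // 1000 -> 2
        System.out.println(partials); // {cold=10, hot=990}
    }
}
```

The downstream reducer then only has to sum a handful of partials per key, which is what takes the pressure off the task that owns the hot key.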
>>>>>>>>
>>>>>>>> I need some help here. What are some of the Flink source-code
>>>>>>>> operators that I can look at to implement my own operator that
>>>>>>>> deals with data skew? Or maybe, does someone have an example of a
>>>>>>>> use case similar to this?
>>>>>>>>
>>>>>>>> [1] https://youtu.be/mSLesPzWplA?t=835
>>>>>>>> [2]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.html
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> Felipe
>>>>>>>>
>>>>>>>> *--*
>>>>>>>> *-- Felipe Gutierrez*
>>>>>>>>
>>>>>>>> *-- skype: felipe.o.gutierrez*
>>>>>>>> *--* *https://felipeogutierrez.blogspot.com
>>>>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>>>>
>>>>>>>
