flink-user mailing list archives

From Kurt Young <ykt...@gmail.com>
Subject Re: How would I use OneInputStreamOperator to deal with data skew?
Date Tue, 16 Apr 2019 01:10:00 GMT
I think you can simply copy the source code into your project if the Maven
dependency cannot be used.

Best,
Kurt


On Mon, Apr 15, 2019 at 9:47 PM Felipe Gutierrez <
felipe.o.gutierrez@gmail.com> wrote:

> Hi again Kurt,
>
> could you please help me with the pom.xml file? I have included all Table
> ecosystem dependencies and the flink-table-runtime-blink as well. However
> the class org.apache.flink.table.runtime.context.ExecutionContext is still
> not found. I guess I am missing some dependency, but I do not know which.
> This is my pom.xml file.
>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/xsd/maven-4.0.0.xsd">
> <modelVersion>4.0.0</modelVersion>
>
> <groupId>org.sense.flink</groupId>
> <artifactId>explore-flink</artifactId>
> <version>0.0.1-SNAPSHOT</version>
> <packaging>jar</packaging>
>
> <name>explore-flink</name>
> <url>http://maven.apache.org</url>
>
> <properties>
> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
> <jdk.version>1.8</jdk.version>
> <scala.binary.version>2.11</scala.binary.version>
> <!-- <flink.version>1.8.0</flink.version> -->
> <flink.version>1.9-SNAPSHOT</flink.version>
> <junit.version>4.12</junit.version>
> </properties>
>
> <dependencies>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-java</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-clients_${scala.binary.version}</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-metrics-dropwizard</artifactId>
> <version>${flink.version}</version>
> <scope>provided</scope>
> </dependency>
>
> <!-- Table ecosystem -->
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
> <version>${flink.version}</version>
> </dependency>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-runtime-blink</artifactId>
> <version>${flink.version}</version>
> </dependency>
>
> <dependency>
> <groupId>org.fusesource.mqtt-client</groupId>
> <artifactId>mqtt-client</artifactId>
> <version>1.15</version>
> <!-- <scope>provided</scope> -->
> </dependency>
>
> <dependency>
> <groupId>org.slf4j</groupId>
> <artifactId>slf4j-api</artifactId>
> <version>1.7.26</version>
> </dependency>
> <dependency>
> <groupId>org.slf4j</groupId>
> <artifactId>slf4j-log4j12</artifactId>
> <version>1.7.26</version>
> </dependency>
>
> <dependency>
> <groupId>junit</groupId>
> <artifactId>junit</artifactId>
> <version>${junit.version}</version>
> </dependency>
> </dependencies>
> <build>
> <finalName>explore-flink</finalName>
> <plugins>
> <!-- download source code in Eclipse, best practice -->
> <plugin>
> <groupId>org.apache.maven.plugins</groupId>
> <artifactId>maven-eclipse-plugin</artifactId>
> <version>2.10</version>
> <configuration>
> <downloadSources>true</downloadSources>
> <downloadJavadocs>false</downloadJavadocs>
> </configuration>
> </plugin>
>
> <!-- Set a compiler level -->
> <plugin>
> <groupId>org.apache.maven.plugins</groupId>
> <artifactId>maven-compiler-plugin</artifactId>
> <version>3.8.0</version>
> <configuration>
> <source>${jdk.version}</source>
> <target>${jdk.version}</target>
> </configuration>
> </plugin>
>
> <!-- Maven Shade Plugin -->
> <plugin>
> <groupId>org.apache.maven.plugins</groupId>
> <artifactId>maven-shade-plugin</artifactId>
> <version>3.2.0</version>
> <!-- Run shade goal on package phase -->
> <executions>
> <execution>
> <phase>package</phase>
> <goals>
> <goal>shade</goal>
> </goals>
> <configuration>
> <artifactSet>
> <excludes>
> <exclude>org.apache.flink:*</exclude>
> <!-- Also exclude very big transitive dependencies of Flink WARNING:
> You have to remove these excludes if your code relies on other versions of
> these dependencies. -->
> <exclude>org.slf4j:*</exclude>
> <exclude>log4j:*</exclude>
> <exclude>com.typesafe:config:*</exclude>
> <exclude>junit:junit:*</exclude>
> <exclude>com.codahale.metrics:*</exclude>
> </excludes>
> </artifactSet>
> <filters>
> <filter>
> <artifact>org.apache.flink:*</artifact>
> <excludes>
> <!-- exclude shaded google but include shaded curator -->
> <exclude>org/apache/flink/shaded/com/**</exclude>
> <exclude>web-docs/**</exclude>
> </excludes>
> </filter>
> <filter>
> <!-- Do not copy the signatures in the META-INF folder. Otherwise,
> this might cause SecurityExceptions when using the JAR. -->
> <artifact>*:*</artifact>
> <excludes>
> <exclude>META-INF/*.SF</exclude>
> <exclude>META-INF/*.DSA</exclude>
> <exclude>META-INF/*.RSA</exclude>
> </excludes>
> </filter>
> <filter>
> <artifact>*:*</artifact>
> <includes>
> <include>org/apache/calcite/**</include>
> <include>org/apache/flink/calcite/shaded/**</include>
> <include>org/apache/flink/table/**</include>
> <include>org.codehaus.commons.compiler.properties</include>
> <include>org/codehaus/janino/**</include>
> <include>org/codehaus/commons/**</include>
> </includes>
> </filter>
> </filters>
> <!-- If you want to use ./bin/flink run <quickstart jar> uncomment
> the following lines. This will add a Main-Class entry to the manifest file
> -->
> <transformers>
> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
> <mainClass>org.sense.flink.App</mainClass>
> </transformer>
> </transformers>
> <createDependencyReducedPom>false</createDependencyReducedPom>
> </configuration>
> </execution>
> </executions>
> </plugin>
> </plugins>
> </build>
> </project>
>
>
> Thanks
>
>
>
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
>
> On Mon, Apr 15, 2019 at 3:25 PM Felipe Gutierrez <
> felipe.o.gutierrez@gmail.com> wrote:
>
>> oh, yes. I just saw. I will use 1.9 then. thanks
>>
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> <https://felipeogutierrez.blogspot.com>*
>>
>>
>> On Mon, Apr 15, 2019 at 3:23 PM Kurt Young <ykt836@gmail.com> wrote:
>>
>>> That's because the Blink code is not shipped with 1.8.0; it is currently
>>> only available in 1.9-SNAPSHOT.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Mon, Apr 15, 2019 at 7:18 PM Felipe Gutierrez <
>>> felipe.o.gutierrez@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> what are the artifacts that I have to import on maven in order to use
>>>> Blink Api?
>>>>
>>>> I am using Flink 1.8.0 and I am trying to import blink code to use its
>>>> ExecutionContext
>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/context/ExecutionContext.java>.
>>>> I want to do this in order to implement my own operator like it is
>>>> implemented here
>>>> <https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java>.
>>>> I guess if I import flink-table everything should come inside the same jar
>>>> as it is done here
>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/pom.xml>.
>>>> However, I cannot import "flink-table-runtime-blink". Eclipse says that it
>>>> is a missing artifact.
>>>>
>>>> <dependency>
>>>> <groupId>org.apache.flink</groupId>
>>>> <artifactId>flink-table-planner_2.11</artifactId>
>>>> <version>1.8.0</version>
>>>> </dependency>
>>>> <dependency>
>>>> <groupId>org.apache.flink</groupId>
>>>> <artifactId>flink-table-api-java-bridge_2.11</artifactId>
>>>> <version>1.8.0</version>
>>>> </dependency>
>>>> <dependency>
>>>> <groupId>org.apache.flink</groupId>
>>>> <artifactId>flink-streaming-scala_2.11</artifactId>
>>>> <version>1.8.0</version>
>>>> </dependency>
>>>> <dependency>
>>>> <groupId>org.apache.flink</groupId>
>>>> <artifactId>flink-table-common</artifactId>
>>>> <version>1.8.0</version>
>>>> </dependency>
>>>> <dependency>
>>>> <groupId>org.apache.flink</groupId>
>>>> <artifactId>flink-table</artifactId>
>>>> <version>1.8.0</version>
>>>> <type>pom</type>
>>>> <scope>provided</scope>
>>>> </dependency>
>>>> <dependency> <!-- THIS IS NOT POSSIBLE TO IMPORT -->
>>>> <groupId>org.apache.flink</groupId>
>>>> <artifactId>flink-table-runtime-blink</artifactId>
>>>> <version>1.8.0</version>
>>>> </dependency>
>>>>
>>>>
>>>> *--*
>>>> *-- Felipe Gutierrez*
>>>>
>>>> *-- skype: felipe.o.gutierrez*
>>>> *--* *https://felipeogutierrez.blogspot.com
>>>> <https://felipeogutierrez.blogspot.com>*
>>>>
>>>>
>>>> On Mon, Apr 15, 2019 at 9:49 AM Felipe Gutierrez <
>>>> felipe.o.gutierrez@gmail.com> wrote:
>>>>
>>>>> Cool, thanks Kurt!
>>>>> *-*
>>>>> *- Felipe Gutierrez*
>>>>>
>>>>> *- skype: felipe.o.gutierrez*
>>>>> *- **https://felipeogutierrez.blogspot.com
>>>>> <https://felipeogutierrez.blogspot.com>* *
>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>
>>>>>
>>>>> On Mon, Apr 15, 2019 at 6:06 AM Kurt Young <ykt836@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> You can check out the bundle operator, which is used in Blink to do
>>>>>> something similar to what you mentioned:
>>>>>> https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/bundle/BundleOperator.java
>>>>>>
>>>>>> Best,
>>>>>> Kurt
>>>>>>
>>>>>>
>>>>>> On Fri, Apr 12, 2019 at 8:05 PM Felipe Gutierrez <
>>>>>> felipe.o.gutierrez@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I was trying to implement a better way to handle data skew using
>>>>>>> Flink and I found this talk from #FlinkForward SF 2017: "Cliff
>>>>>>> Resnick & Seth Wiesman - From Zero to Streaming
>>>>>>> <https://youtu.be/mSLesPzWplA?t=835>" [1], which says that they used
>>>>>>> OneInputStreamOperator [2]. With it, they could implement the
>>>>>>> equivalent of a Hadoop "combiner" (executing part of the reduce work
>>>>>>> in the map phase, before shuffling).
>>>>>>>
>>>>>>> I need some help here. What are some of the operators in the Flink
>>>>>>> source code that I can look at to implement my own operator that
>>>>>>> deals with data skew? Or does anyone have an example of a similar
>>>>>>> use case?
>>>>>>>
>>>>>>> [1] https://youtu.be/mSLesPzWplA?t=835
>>>>>>> [2]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.html
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Felipe
>>>>>>>
>>>>>>> *--*
>>>>>>> *-- Felipe Gutierrez*
>>>>>>>
>>>>>>> *-- skype: felipe.o.gutierrez*
>>>>>>> *--* *https://felipeogutierrez.blogspot.com
>>>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>>>
>>>>>>
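
For readers following the thread: the "combiner" idea from the original question (partially reducing records per key before they are shuffled) can be sketched in plain Java as below. This is only an illustration under stated assumptions, not the code of Blink's BundleOperator and not a Flink API. The class name BundleCombiner, its constructor parameters, and the count-based flush trigger are all hypothetical. In a real OneInputStreamOperator the same logic would presumably live in processElement, and the buffer would likely also need to be flushed (or kept in state) around checkpoints and at end of input.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;

/**
 * Hypothetical helper (not a Flink or Blink class): buffers records per key,
 * combines values that share a key, and emits the partial results once a
 * fixed number of input records has been buffered.
 */
final class BundleCombiner<K, V> {
    private final Map<K, V> bundle = new HashMap<>();
    private final int maxBundleSize;            // flush after this many input records
    private final BinaryOperator<V> combine;    // e.g. Integer::sum for counting
    private final BiConsumer<K, V> downstream;  // stands in for emitting to the next operator
    private int buffered = 0;

    BundleCombiner(int maxBundleSize, BinaryOperator<V> combine, BiConsumer<K, V> downstream) {
        this.maxBundleSize = maxBundleSize;
        this.combine = combine;
        this.downstream = downstream;
    }

    /** Merge one record into the local bundle; flush when the bundle is full. */
    void processElement(K key, V value) {
        bundle.merge(key, value, combine);
        buffered++;
        if (buffered >= maxBundleSize) {
            flush();
        }
    }

    /** Emit one pre-aggregated record per buffered key and reset the bundle. */
    void flush() {
        bundle.forEach(downstream);
        bundle.clear();
        buffered = 0;
    }
}

final class BundleCombinerDemo {
    public static void main(String[] args) {
        // Count occurrences per key; "hot" is the skewed key in this toy input.
        BundleCombiner<String, Integer> combiner =
            new BundleCombiner<>(3, Integer::sum, (k, v) -> System.out.println(k + "=" + v));
        String[] input = {"hot", "hot", "cold", "hot", "hot", "cold"};
        for (String key : input) {
            combiner.processElement(key, 1);
        }
        combiner.flush(); // emit whatever is still buffered at end of input
    }
}
```

With a bundle size of 3, the six input records above leave the combiner as four partial counts, so a skewed key sends fewer records across the network. A larger bundle reduces shuffle traffic further at the cost of extra latency and memory.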
