flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "huangwei (G)" <huangwei...@huawei.com>
Subject 答复: Some problems about Flink applications
Date Tue, 11 Aug 2015 12:47:34 GMT
Hi Stephan and Matthias,

Sorry for replying late.
I`ve double checked that this class StormSpoutWrapper really exists in my jar file.
And I ran into the same trouble when I ran the corresponding word-count-storm from
flink-storm-compatibility-examples.
The way I built my Throughput application was just adding it into flink-storm-compatibility-examples
and changing some configurations in the word-count-storm.xml.
Here is the entire POM file.


<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

	<modelVersion>4.0.0</modelVersion>

	<parent>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-storm-compatibility-parent</artifactId>
		<version>0.10-SNAPSHOT</version>
		<relativePath>..</relativePath>
	</parent>

	<artifactId>flink-storm-compatibility-examples</artifactId>
	<name>flink-storm-compatibility-examples</name>

	<packaging>jar</packaging>

	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-storm-compatibility-core</artifactId>
			<version>${project.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java-examples</artifactId>
			<version>${project.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-core</artifactId>
			<version>${project.version}</version>
			<scope>test</scope>
			<type>test-jar</type>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-test-utils</artifactId>
			<version>${project.version}</version>
			<scope>test</scope>
		</dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.4</version>
            <!-- keep storm out of the jar-with-dependencies -->
            <scope>provided</scope>
        </dependency>
	</dependencies>

	<build>
		<plugins>
			<!-- get default data from flink-java-examples package -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-dependency-plugin</artifactId>
				<version>2.9</version><!--$NO-MVN-MAN-VER$-->
				<executions>
					<execution>
						<id>unpack</id>
						<phase>prepare-package</phase>
						<goals>
							<goal>unpack</goal>
						</goals>
						<configuration>
							<artifactItems>
								<artifactItem>
									<groupId>org.apache.flink</groupId>
									<artifactId>flink-java-examples</artifactId>
									<version>${project.version}</version>
									<type>jar</type>
									<overWrite>false</overWrite>
									<outputDirectory>${project.build.directory}/classes</outputDirectory>
									<includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
								</artifactItem>
								<artifactItem>
									<groupId>org.apache.flink</groupId>
									<artifactId>flink-storm-compatibility-core</artifactId>
									<version>${project.version}</version>
									<type>jar</type>
									<overWrite>false</overWrite>
									<outputDirectory>${project.build.directory}/classes</outputDirectory>
								</artifactItem>
								<artifactItem>
									<groupId>org.apache.storm</groupId>
									<artifactId>storm-core</artifactId>
									<version>0.9.4</version>
									<type>jar</type>
									<overWrite>false</overWrite>
									<outputDirectory>${project.build.directory}/classes</outputDirectory>
									<!--<excludes>defaults.yaml</excludes>-->
								</artifactItem>
								<artifactItem>
									<groupId>com.googlecode.json-simple</groupId>
									<artifactId>json-simple</artifactId>
									<version>1.1</version>
									<type>jar</type>
									<overWrite>false</overWrite>
									<outputDirectory>${project.build.directory}/classes</outputDirectory>
								</artifactItem>
								<!-- snakeyaml -->
								<artifactItem>
									<groupId>org.yaml</groupId>
									<artifactId>snakeyaml</artifactId>
									<version>1.11</version>
									<type>jar</type>
									<overWrite>false</overWrite>
									<outputDirectory>${project.build.directory}/classes</outputDirectory>
								</artifactItem>
							</artifactItems>
						</configuration>
					</execution>
				</executions>
			</plugin>

			<!-- self-contained jars for each example -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-jar-plugin</artifactId>

				<executions>

					<!-- WordCount Spout source-->
					<execution>
						<id>WordCount-SpoutSource</id>
						<phase>package</phase>
						<goals>
							<goal>jar</goal>
						</goals>
						<configuration>
							<classifier>WordCountSpoutSource</classifier>

							<archive>
								<manifestEntries>
									<program-class>org.apache.flink.stormcompatibility.wordcount.SpoutSourceWordCount</program-class>
								</manifestEntries>
							</archive>

							<includes>
								<!-- from storm-core -->
								<include>backtype/storm/topology/*.class</include>
								<include>backtype/storm/spout/*.class</include>
								<include>backtype/storm/task/*.class</include>
								<include>backtype/storm/tuple/*.class</include>
								<include>backtype/storm/generated/*.class</include>
								<include>backtype/storm/metric/**/*.class</include>
								<include>org/apache/thrift7/**/*.class</include>
								<!-- Storm's recursive dependencies -->
								<include>org/json/simple/**/*.class</include>
								<!-- compatibility layer -->
								<include>org/apache/flink/stormcompatibility/api/*.class</include>
								<include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
								<!-- Word Count -->
								<include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.class</include>
								<include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount$*.class</include>
								<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/AbstractStormSpout.class</include>
								<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormFileSpout.class</include>
								<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormInMemorySpout.class</include>
								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
							</includes>
						</configuration>
					</execution>

					<!-- WordCount Bolt tokenizer-->
					<execution>
						<id>WordCount-BoltTokenizer</id>
						<phase>package</phase>
						<goals>
							<goal>jar</goal>
						</goals>
						<configuration>
							<classifier>WordCountBoltTokenizer</classifier>

							<archive>
								<manifestEntries>
									<program-class>org.apache.flink.stormcompatibility.wordcount.BoltTokenizerWordCount</program-class>
								</manifestEntries>
							</archive>

							<includes>
								<!-- from storm-core -->
								<include>backtype/storm/topology/*.class</include>
								<include>backtype/storm/spout/*.class</include>
								<include>backtype/storm/task/*.class</include>
								<include>backtype/storm/tuple/*.class</include>
								<include>backtype/storm/generated/*.class</include>
								<include>backtype/storm/metric/**/*.class</include>
								<include>org/apache/thrift7/**/*.class</include>
								<!-- Storm's recursive dependencies -->
								<include>org/json/simple/**/*.class</include>
								<!-- compatibility layer -->
								<include>org/apache/flink/stormcompatibility/api/*.class</include>
								<include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
								<!-- Word Count -->
								<include>org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.class</include>
								<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.class</include>
								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
							</includes>
						</configuration>
					</execution>

                    <!-- Throughput -->
                    <execution>
                        <id>Throughput</id>
                        <phase>package</phase>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                        <configuration>
                            <classifier>Throughput</classifier>

                            <archive>
                                <manifestEntries>
                                    <program-class>org.apache.flink.stormcompatibility.experiments.Throughput</program-class>
                                </manifestEntries>
                            </archive>

                            <includes>
                                <!-- from storm-core -->
                                <include>defaults.yaml</include>
                                <include>backtype/storm/*.class</include>
                                <include>backtype/storm/serialization/*.class</include>
                                <include>backtype/storm/topology/*.class</include>
                                <include>backtype/storm/topology/base/*.class</include>
                                <include>backtype/storm/utils/*.class</include>
                                <include>backtype/storm/spout/*.class</include>
                                <include>backtype/storm/task/*.class</include>
                                <include>backtype/storm/tuple/*.class</include>
                                <include>backtype/storm/generated/*.class</include>
                                <include>backtype/storm/metric/**/*.class</include>
                                <include>org/apache/storm/curator/*.class</include>
                                <include>org/apache/thrift7/**/*.class</include>
                                <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
                                <include>org/yaml/snakeyaml/**/*.class</include>
                                <!-- Storm's recursive dependencies -->
                                <include>org/json/simple/**/*.class</include>
                                <!-- compatibility layer -->
                                <include>org/apache/flink/stormcompatibility/api/*.class</include>
                                <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
                                <!-- Word Count -->
                                <include>org/apache/flink/stormcompatibility/experiments/Throughput.class</include>
                                <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.class</include>
                            </includes>
                        </configuration>
                    </execution>

					<execution>
						<goals>
							<goal>test-jar</goal>
						</goals>
					</execution>
				</executions>
			</plugin>

			<!-- WordCount Storm topology-->
			<!-- Cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar -->
			<plugin>
				<artifactId>maven-assembly-plugin</artifactId>
				<configuration>
					<descriptors>
						<descriptor>src/assembly/word-count-storm.xml</descriptor>
					</descriptors>
					<archive>
						<manifestEntries>
							<program-class>org.apache.flink.stormcompatibility.wordcount.StormWordCountRemoteBySubmitter</program-class>
						</manifestEntries>
					</archive>
				</configuration>

				<executions>
					<execution>
						<id>WordCountStorm</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>

		<pluginManagement>
			<plugins>
				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has
no influence on the Maven build itself.-->
				<plugin>
					<groupId>org.eclipse.m2e</groupId>
					<artifactId>lifecycle-mapping</artifactId>
					<version>1.0.0</version>
					<configuration>
						<lifecycleMappingMetadata>
							<pluginExecutions>
								<pluginExecution>
									<pluginExecutionFilter>
										<groupId>org.apache.maven.plugins</groupId>
										<artifactId>maven-dependency-plugin</artifactId>
										<versionRange>[2.9,)</versionRange>
										<goals>
											<goal>unpack</goal>
										</goals>
									</pluginExecutionFilter>
									<action>
										<ignore/>
									</action>
								</pluginExecution>
							</pluginExecutions>
						</lifecycleMappingMetadata>
					</configuration>
				</plugin>
			</plugins>
		</pluginManagement>

	</build>

</project>

-----邮件原件-----
发件人: ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] 代表 Stephan Ewen
发送时间: 2015年8月11日 1:55
收件人: dev@flink.apache.org
主题: Re: Some problems about Flink applications

It would also help if you could paste the entire POM file of your project.
Maybe something is amiss with the dependencies, or the scopes?

On Sat, Aug 8, 2015 at 7:27 PM, Matthias J. Sax < mjsax@informatik.hu-berlin.de> wrote:

> Hi Huang,
>
> about Storm compatibility. Did you double check, that the file that is 
> missing (StormSpoutWrapper) is contained in your jar. Looking at 
> pom.xml does not help here, because if you specify to include a file, 
> but maven cannot find it, it will just not add it to the jar, but 
> build will succeed. Thus, you need to check the jar file itself via command line:
>     unzip -l myjarfile.jar
> or
>     unzip -l myjarfile.jar | grep file-I-am-looking-for
>
> I guess your jar is not build correctly, ie, the file is not there...
>
> Did you have a look into pom.xml for 
> flink-storm-compatibililty-example
> and the corresponding word-count-storm.xml? This shows how to build a 
> jar correctly (it was recently fixed, so make sure you update to the 
> latest master)
>
> You can also have a look here how to package jars correctly (even if 
> this example is about Flink ML):
>
> https://stackoverflow.com/questions/31661900/maven-build-has-missing-p
> ackage-linkages/31662642#31662642
>
> -Matthias
>
> On 08/08/2015 11:15 AM, huangwei (G) wrote:
> > Hi,
> > I get some trouble in developing Flink applications.
> >
> > 1.
> > I want to test the performance between Storm and
> flink-storm-compatibility using the test program:
> https://github.com/project-flink/flink-perf/blob/master/storm-jobs/src
> /jvm/experiments/Throughput.java
> .
> > And there is a bit of my changes with this Throughput.java below:
> >
> >
> >
> > public static void main(String[] args) throws Exception {
> >                    ParameterTool pt = ParameterTool.fromArgs(args);
> >
> >                    int par = pt.getInt("para");
> >
> >                    final FlinkTopologyBuilder builder = new
> FlinkTopologyBuilder();
> >
> >                    builder.setSpout("source0", new Generator(pt),
> pt.getInt("sourceParallelism"));
> >
> >                    int i = 0;
> >                    for (; i < pt.getInt("repartitions", 1) - 1; i++) {
> >                             System.out.println("adding source" + i + "
> --> source" + (i + 1));
> >                             builder.setBolt("source" + (i + 1), new
> RepartPassThroughBolt(pt), pt.getInt("sinkParallelism"))
> >                                                .fieldsGrouping("source"
> + i, new Fields("id"));
> >                    }
> >
> >                    System.out.println("adding final source" + i + " 
> > -->
> sink");
> >
> >                    builder.setBolt("sink", new Sink(pt),
> pt.getInt("sinkParallelism")).fieldsGrouping("source" + i, new 
> Fields("id"));
> >
> >                    Config conf = new Config();
> >                    conf.setDebug(false); //System.exit(1);
> >
> >                    // execute program locally
> >                    final FlinkLocalCluster cluster =
> FlinkLocalCluster.getLocalCluster();
> >                    cluster.submitTopology("throughput", null,
> builder.createTopology());
> >
> >                    Utils.sleep(10 * 1000);
> >
> >                    // TODO kill does no do anything so far
> >                    cluster.killTopology("throughput");
> >                    cluster.shutdown();
> >          }
> >
> >
> > This program will run well in IDEA with flink-storm-compatibility.
> > However, when I packaged it into a jar file and run on the
> flink-0.10SNAPSHOT there is a problem in flink-client log file:
> >
> > java.lang.Exception: Call to registerInputOutput() of invokable failed
> >          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
> >          at java.lang.Thread.run(Thread.java:745)
> > Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
> Cannot instantiate user function.
> >          at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(St
> reamConfig.java:187)
> >          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutpu
> t(StreamTask.java:90)
> >          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:518)
> >          ... 1 more
> > Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper
> >          at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >          at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >          at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >          at java.lang.Class.forName0(Native Method)
> >          at java.lang.Class.forName(Class.java:348)
> >          at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.r
> esolveClass(InstantiationUtil.java:71)
> >          at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613
> )
> >          at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> >          at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:17
> 74)
> >          at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> >          at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:199
> 3)
> >          at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> >          at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:18
> 01)
> >          at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> >          at
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> >          at
> org.apache.flink.util.InstantiationUtil.deserializeObject(Instantiatio
> nUtil.java:302)
> >          at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(Instantia
> tionUtil.java:264)
> >          at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(St
> reamConfig.java:185)
> >          ... 3 more
> >
> >
> > And this class(StormSpoutWrapper) was exist in my packaged jar file.
> > As you can see part of my pom.xml:
> >
> > <!-- Throughput -->
> >                     <execution>
> >                         <id>Throughput</id>
> >                         <phase>package</phase>
> >                         <goals>
> >                             <goal>jar</goal>
> >                         </goals>
> >                         <configuration>
> >                             <classifier>Throughput</classifier>
> >
> >                             <archive>
> >                                 <manifestEntries>
> >
>  
> <program-class>org.apache.flink.stormcompatibility.experiments.Through
> put</program-class>
> >                                 </manifestEntries>
> >                             </archive>
> >
> >                             <includes>
> >                                 <!-- from storm-core -->
> >                                 <include>defaults.yaml</include>
> >                                 
> > <include>backtype/storm/*.class</include>
> >
>  <include>backtype/storm/serialization/*.class</include>
> >
>  <include>backtype/storm/topology/*.class</include>
> >
>  <include>backtype/storm/topology/base/*.class</include>
> >
>  <include>backtype/storm/utils/*.class</include>
> >
>  <include>backtype/storm/spout/*.class</include>
> >
>  <include>backtype/storm/task/*.class</include>
> >
>  <include>backtype/storm/tuple/*.class</include>
> >
>  <include>backtype/storm/generated/*.class</include>
> >
>  <include>backtype/storm/metric/**/*.class</include>
> >
>  <include>org/apache/storm/curator/*.class</include>
> >
>  <include>org/apache/thrift7/**/*.class</include>
> >
>  <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
> >
>  <include>org/yaml/snakeyaml/**/*.class</include>
> >                                 <!-- Storm's recursive dependencies 
> > -->
> >
>  <include>org/json/simple/**/*.class</include>
> >                                 <!-- compatibility layer -->
> >
>  <include>org/apache/flink/stormcompatibility/api/*.class</include>
> >
>  
> <include>org/apache/flink/stormcompatibility/wrappers/*.class</include
> >
> >                                 <!-- Word Count -->
> >
>  
> <include>org/apache/flink/stormcompatibility/experiments/Throughput.cl
> ass</include>
> >
>  
> <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.
> class</include>
> >                             </includes>
> >                         </configuration>
> >                     </execution>
> >
> > So how can I fix it?
> >
> >
> > 2.
> > There is a case following using operator join:
> >
> > DataStream<Tuple3<String, Integer, Long>> user = env.addSource(new
> sourceUserFunction());
> > DataStream<Tuple2<String, Integer>> area = env.addSource(new
> sourceAreaFunction());
> >
> > DataStream<Tuple2<String, Integer>> sink = user
> >       .join(area)
> >       .onWindow(15, TimeUnit.MINUTES)
> >       .where(0)
> >       .equalTo(0)
> >       .with(new JoinFunction<Tuple3<String, Integer, Long>,
> Tuple2<String, Integer>, Tuple2<String, Integer>>() {
> >          @Override
> >          public Tuple2<String, Integer> join(Tuple3<String, Integer,
> Long> first, Tuple2<String, Integer> second) throws Exception {
> >             if (first.f1 + second.f1 > 10){
> >                return new Tuple2<String, Integer>(first.f0, first.f1 
> > +
> second.f1);
> >             }
> >             return null;
> >          }
> >       });
> >
> > As you see, I don`t want to return null when the condition is not
> satisfied.
> > But there is not any JoinFunction with Collector.
> > I found a FlatJoinFunction which allows the Collector.
> > However, the FlatJoinFunction seem only can be used in DataSet 
> > instead
> DataStream.
> > Is there any other way to improve this case?
> >
> > PS. I`m sorry about this email. You may ignore me during the weekend.
> >
> > Greetings,
> > Huang Wei
> > 华为技术有限公司 Huawei Technologies Co., Ltd.
> >
> >
> > Tel:+86 18106512602
> > Email:huangwei111@huawei.com
> >
>
>
Mime
View raw message