From: "Matthias J. Sax"
Date: Sat, 08 Aug 2015 19:27:25 +0200
To: dev@flink.apache.org
Subject: Re: Some problems about Flink applications

Hi Huang,

about Storm compatibility: did you double-check that the missing file
(StormSpoutWrapper) is contained in your jar? Looking at pom.xml does not
help here, because if you specify a file for inclusion but Maven cannot
find it, it will simply not add it to the jar, and the build will still
succeed. Thus, you need to check the jar file itself via the command line:

  unzip -l myjarfile.jar

or

  unzip -l myjarfile.jar | grep file-I-am-looking-for

I guess your jar is not built correctly, i.e., the file is not there...
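[Editorial sketch, not part of the original reply.] The same check can be done programmatically with java.util.zip, which is what `unzip -l` inspects. This self-contained example builds a tiny stand-in jar (a real project would pass the path of its own jar) and looks up an entry by its class-file path:

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipOutputStream;

public class JarCheck {
    // Returns true if the given entry (e.g. a .class path) exists in the jar.
    static boolean contains(String jarPath, String entryName) throws IOException {
        try (ZipFile zip = new ZipFile(jarPath)) {
            return zip.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Build a tiny stand-in jar so the example is self-contained.
        File jar = File.createTempFile("demo", ".jar");
        try (ZipOutputStream out = new ZipOutputStream(new FileOutputStream(jar))) {
            out.putNextEntry(new ZipEntry("org/example/Present.class"));
            out.closeEntry();
        }
        // A class that was packaged is found; one that was not is reported missing.
        System.out.println(contains(jar.getPath(), "org/example/Present.class"));  // true
        System.out.println(contains(jar.getPath(), "org/example/Missing.class"));  // false
        jar.delete();
    }
}
```

Note that entry names use `/` and the full package path (e.g. `org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.class`), exactly as `unzip -l` lists them.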
Did you have a look into pom.xml for flink-storm-compatibility-example and
the corresponding word-count-storm.xml? This shows how to build a jar
correctly (it was recently fixed, so make sure you update to the latest
master). You can also have a look here at how to package jars correctly
(even if this example is about Flink ML):
https://stackoverflow.com/questions/31661900/maven-build-has-missing-package-linkages/31662642#31662642

-Matthias

On 08/08/2015 11:15 AM, huangwei (G) wrote:
> Hi,
> I get some trouble in developing Flink applications.
>
> 1.
> I want to test the performance between Storm and flink-storm-compatibility
> using the test program:
> https://github.com/project-flink/flink-perf/blob/master/storm-jobs/src/jvm/experiments/Throughput.java
> And here is a bit of my changes to this Throughput.java:
>
> public static void main(String[] args) throws Exception {
>     ParameterTool pt = ParameterTool.fromArgs(args);
>
>     int par = pt.getInt("para");
>
>     final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
>
>     builder.setSpout("source0", new Generator(pt), pt.getInt("sourceParallelism"));
>
>     int i = 0;
>     for (; i < pt.getInt("repartitions", 1) - 1; i++) {
>         System.out.println("adding source" + i + " --> source" + (i + 1));
>         builder.setBolt("source" + (i + 1), new RepartPassThroughBolt(pt), pt.getInt("sinkParallelism"))
>                 .fieldsGrouping("source" + i, new Fields("id"));
>     }
>
>     System.out.println("adding final source" + i + " --> sink");
>
>     builder.setBolt("sink", new Sink(pt), pt.getInt("sinkParallelism"))
>             .fieldsGrouping("source" + i, new Fields("id"));
>
>     Config conf = new Config();
>     conf.setDebug(false);
>     //System.exit(1);
>
>     // execute program locally
>     final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
>     cluster.submitTopology("throughput", null, builder.createTopology());
>
>     Utils.sleep(10 * 1000);
>
>     // TODO kill does not do anything so far
>     cluster.killTopology("throughput");
>     cluster.shutdown();
> }
>
> This program runs well in IDEA with flink-storm-compatibility.
> However, when I packaged it into a jar file and ran it on flink-0.10-SNAPSHOT,
> there is a problem in the flink-client log file:
>
> java.lang.Exception: Call to registerInputOutput() of invokable failed
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:187)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:90)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:518)
>     ... 1 more
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:185)
>     ... 3 more
>
> And this class (StormSpoutWrapper) does exist in my packaged jar file.
> As you can see from this part of my pom.xml:
>
> <execution>
>     <id>Throughput</id>
>     <phase>package</phase>
>     <goals>
>         <goal>jar</goal>
>     </goals>
>     <configuration>
>         <finalName>Throughput</finalName>
>         <archive>
>             <manifest>
>                 <mainClass>org.apache.flink.stormcompatibility.experiments.Throughput</mainClass>
>             </manifest>
>         </archive>
>         <includes>
>             <include>defaults.yaml</include>
>             <include>backtype/storm/*.class</include>
>             <include>backtype/storm/serialization/*.class</include>
>             <include>backtype/storm/topology/*.class</include>
>             <include>backtype/storm/topology/base/*.class</include>
>             <include>backtype/storm/utils/*.class</include>
>             <include>backtype/storm/spout/*.class</include>
>             <include>backtype/storm/task/*.class</include>
>             <include>backtype/storm/tuple/*.class</include>
>             <include>backtype/storm/generated/*.class</include>
>             <include>backtype/storm/metric/**/*.class</include>
>             <include>org/apache/storm/curator/*.class</include>
>             <include>org/apache/thrift7/**/*.class</include>
>             <include>org/yaml/snakeyaml/**/*.class</include>
>             <include>org/json/simple/**/*.class</include>
>             <include>org/apache/flink/stormcompatibility/api/*.class</include>
>             <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
>             <include>org/apache/flink/stormcompatibility/experiments/Throughput.class</include>
>             <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.class</include>
>         </includes>
>     </configuration>
> </execution>
>
> So how can I fix it?
>
> 2.
> There is a case like the following using the join operator
> (the generic type parameters were stripped by the mail archive,
> shown here as <...>):
>
> DataStream<Tuple3<...>> user = env.addSource(new sourceUserFunction());
> DataStream<Tuple2<...>> area = env.addSource(new sourceAreaFunction());
>
> DataStream<Tuple2<...>> sink = user
>         .join(area)
>         .onWindow(15, TimeUnit.MINUTES)
>         .where(0)
>         .equalTo(0)
>         .with(new JoinFunction<Tuple3<...>, Tuple2<...>, Tuple2<...>>() {
>             @Override
>             public Tuple2<...> join(Tuple3<...> first, Tuple2<...> second) throws Exception {
>                 if (first.f1 + second.f1 > 10) {
>                     return new Tuple2(first.f0, first.f1 + second.f1);
>                 }
>                 return null;
>             }
>         });
>
> As you can see, I don't want to return null when the condition is not satisfied.
> But there is not any JoinFunction with a Collector.
> I found FlatJoinFunction, which allows a Collector.
> However, FlatJoinFunction seems to be usable only on DataSet, not on DataStream.
> Is there any other way to improve this case?
>
> PS. I'm sorry about this email. You may ignore me during the weekend.
>
> Greetings,
> Huang Wei
> 华为技术有限公司 Huawei Technologies Co., Ltd.
>
> Tel: +86 18106512602
> Email: huangwei111@huawei.com
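[Editorial sketch, not part of the original thread.] For point 2, one workaround is to keep the JoinFunction returning null and drop the nulls in a downstream filter step. The pattern is illustrated below in plain Java (a runnable Flink job needs a cluster, so ordinary collections stand in for the DataStream; the guard `sum > 10` mirrors `first.f1 + second.f1 > 10` from the question, and the final filter corresponds to a `DataStream.filter(t -> t != null)`):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class JoinFilterSketch {
    public static void main(String[] args) {
        // Stand-ins for the joined pairs: the sum of the two f1 fields.
        List<Integer> joined = Arrays.asList(12, 5, 20, 8);

        // Join step: emit a result only when the guard holds, else null...
        List<Integer> withNulls = joined.stream()
                .map(sum -> sum > 10 ? sum : null)
                .collect(Collectors.toList());

        // ...then drop the nulls, as a downstream filter(t -> t != null) would.
        List<Integer> result = withNulls.stream()
                .filter(Objects::nonNull)
                .collect(Collectors.toList());

        System.out.println(result); // [12, 20]
    }
}
```

The extra filter costs one pass over the joined records but keeps the join function itself simple, which is close in spirit to what a Collector-based FlatJoinFunction would allow in a single operator.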