beam-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Problems trying to run Beam on Flink on Yarn
Date Thu, 20 Jul 2017 08:31:46 GMT
Hi Niels,

This is unfortunate. I just double checked the Flink distribution jars and they don’t contain
any Duration.class, so that shouldn’t clash. My only explanation is that YARN has some other
dependencies in the class path that provide a Duration.class that has a different signature.
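
One way to test this hypothesis is to ask the JVM where it actually loaded the class from. The following is only a minimal sketch, not something from the Beam or Flink code base: the class name WhichJar and its helper methods are made up for illustration, and it only tells you something useful when it runs with the same class path as the failing program (for example when called from the job's main method).

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.security.CodeSource;

// Hypothetical diagnostic helper; not part of Beam, Flink or Joda-Time.
public class WhichJar {

    // Loads the class without running its static initializers.
    private static Class<?> load(String className) throws ClassNotFoundException {
        return Class.forName(className, false, WhichJar.class.getClassLoader());
    }

    // Returns the jar or directory the class was loaded from, or a short note.
    static String locate(String className) {
        try {
            CodeSource src = load(className).getProtectionDomain().getCodeSource();
            return src == null ? "(bootstrap or unknown source)" : src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on class path)";
        }
    }

    // True if the class has a public static method(long) that returns the class itself,
    // which is the shape the descriptor (J)Lorg/joda/time/Duration; asks for.
    static boolean hasStaticLongFactory(String className, String methodName) {
        try {
            Class<?> cls = load(className);
            Method m = cls.getMethod(methodName, long.class);
            return Modifier.isStatic(m.getModifiers()) && m.getReturnType() == cls;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0 ? args[0] : "org.joda.time.Duration";
        System.out.println(name + " loaded from: " + locate(name));
        System.out.println("has static millis(long): " + hasStaticLongFactory(name, "millis"));
    }
}
```

If the printed location is not your word-count-beam-0.1.jar, or the second line prints false, then a different Joda-Time earlier on the class path would be a plausible culprit.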

Just for testing, could you try placing your word-count-beam-0.1.jar directly in the Flink
lib folder and not specifying it with --filesToStage?
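
Since the jar would then sit next to everything else in that folder, it may also help to list which jars in a given directory bundle their own copy of the class. This is a rough sketch under my own assumptions: FindClassInJars is a made-up name, the default directory "lib" is only a placeholder, and the scan is not recursive. You could point it at the Flink lib folder or at a Hadoop/YARN library directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarFile;
import java.util.stream.Stream;

// Hypothetical helper; lists the jars in one directory that contain a given entry.
public class FindClassInJars {

    static List<Path> jarsContaining(Path dir, String entryName) throws IOException {
        List<Path> hits = new ArrayList<>();
        try (Stream<Path> files = Files.list(dir)) {
            for (Path p : (Iterable<Path>) files.sorted()::iterator) {
                if (!p.toString().endsWith(".jar")) {
                    continue; // only look at jar files
                }
                try (JarFile jar = new JarFile(p.toFile())) {
                    if (jar.getEntry(entryName) != null) {
                        hits.add(p);
                    }
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // Both defaults are placeholders; pass the real directory and entry as arguments.
        Path dir = Paths.get(args.length > 0 ? args[0] : "lib");
        String entry = args.length > 1 ? args[1] : "org/joda/time/Duration.class";
        for (Path jar : jarsContaining(dir, entry)) {
            System.out.println(jar);
        }
    }
}
```

Any jar it prints other than your own shaded jar is a candidate for the conflicting Duration.class.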

Best,
Aljoscha

> On 19. Jul 2017, at 14:28, Niels Basjes <Niels@basjes.nl> wrote:
> 
> I double checked the target/word-count-beam-0.1.jar and found that it includes org/joda/time/Duration.class
> which (according to "javap -s Duration.class" ) actually contains
> 
>   public static org.joda.time.Duration millis(long);
>     descriptor: (J)Lorg/joda/time/Duration;
> 
> I'm at a loss where it goes wrong.
> 
> Niels
> 
> 
> On Wed, Jul 19, 2017 at 12:34 PM, Niels Basjes <Niels@basjes.nl> wrote:
> Hi,
> 
> We have a (Kerberos secured) Yarn cluster on which we run (among lots of others) our Apache Flink applications.
> I'm having trouble getting even the simplest Beam application to run in this setup using Apache Flink as the runner.
> 
> For these applications we effectively run a command similar to this:
> 
> flink run -m yarn-cluster                  \
>      --yarnstreaming                       \
>      --yarncontainer                 1     \
>      --yarnslots                     10     \
>      --yarnjobManagerMemory          3100  \
>      --yarntaskManagerMemory         3100  \
>      --yarnname "My application" \
>      my-application.jar
> 
> In the flink-conf.yaml there are a few settings needed to make everything work (port numbers and such).
> 
> To get started I wanted to simply run the base 'word count' example on our cluster to get the plumbing of running a job right.
> 
> I start with the Java Quickstart ( https://beam.apache.org/get-started/quickstart-java/ )
> 
>  mvn archetype:generate \
>       -DarchetypeGroupId=org.apache.beam \
>       -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
>       -DarchetypeVersion=2.0.0 \
>       -DgroupId=org.example \
>       -DartifactId=word-count-beam \
>       -Dversion="0.1" \
>       -Dpackage=org.apache.beam.examples \
>       -DinteractiveMode=false
> 
> To ensure it knows the main class I change this:
> 
> diff --git pom.xml pom.xml
> index bcd9258..201d48f 100644
> --- pom.xml
> +++ pom.xml
> @@ -110,6 +110,9 @@
>                </filters>
>                <transformers>
>                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
> +                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
> +                  <mainClass>org.apache.beam.examples.WordCount</mainClass>
> +                </transformer>
>                </transformers>
>              </configuration>
>            </execution>
> 
> Now I build this with the flink-runner built in:
> 
> mvn clean package -Pflink-runner
> 
> In the output I see 
>      [INFO] Including org.apache.flink:flink-streaming-java_2.10:jar:1.2.1 in the shaded jar.
> 
> So I get the exact same Flink/Scala version for Hadoop 2.7.x (which we have)
>       http://archive.apache.org/dist/flink/flink-1.2.1/flink-1.2.1-bin-hadoop27-scala_2.10.tgz
> 
> I created a helloworld.txt, which I put on HDFS.
> 
> I then run this using
> 
> ./flink-1.2.1/bin/flink run \
>     -m yarn-cluster \
>     --yarncontainer 4 \
>     --yarnslots                     4     \
>     --yarnjobManagerMemory          2000  \
>     --yarntaskManagerMemory         2000  \
>     --yarnname "Word count on Beam on Flink on Yarn" \
>     target/word-count-beam-0.1.jar \
>     --runner=FlinkRunner \
>     --flinkMaster=[auto] \
>     --filesToStage=target/word-count-beam-0.1.jar \
>     --inputFile=helloworld.txt \
>     --output=hello-counts
> 
> The job gets submitted and fails with 
> 
> Starting execution of program
> 2017-07-19 10:29:45,664 INFO  org.apache.flink.yarn.YarnClusterClient                      - Starting program in interactive mode
> 
> ------------------------------------------------------------
>  The program finished with the following exception:
> 
> java.lang.NoSuchMethodError: org.joda.time.Duration.millis(J)Lorg/joda/time/Duration;
> 	at org.apache.beam.sdk.util.GcsUtil.<clinit>(GcsUtil.java:146)
> 	at java.lang.Class.forName0(Native Method)
> 	at java.lang.Class.forName(Class.java:264)
> 	at com.sun.proxy.$Proxy32.<clinit>(Unknown Source)
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> 	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> 	at org.apache.beam.sdk.util.InstanceBuilder.buildFromConstructor(InstanceBuilder.java:256)
> 	at org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:164)
> 	at org.apache.beam.sdk.options.ProxyInvocationHandler.as(ProxyInvocationHandler.java:217)
> 	at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:138)
> 	at com.sun.proxy.$Proxy45.as(Unknown Source)
> 	at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystemRegistrar.fromOptions(GcsFileSystemRegistrar.java:44)
> 	at org.apache.beam.sdk.io.FileSystems.verifySchemesAreUnique(FileSystems.java:483)
> 	at org.apache.beam.sdk.io.FileSystems.setDefaultPipelineOptions(FileSystems.java:473)
> 	at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:44)
> 	at org.apache.beam.sdk.Pipeline.create(Pipeline.java:141)
> 	at org.apache.beam.examples.WordCount.main(WordCount.java:175)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
> 	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
> 	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
> 	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
> 	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
> 	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1079)
> 	at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1126)
> 	at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1123)
> 	at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
> 	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
> 	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1122)
> 2017-07-19 10:29:46,362 INFO  org.apache.flink.yarn.YarnClusterClient                      - Sending shutdown request to the Application Master
> 
> 
> Yet during the build I saw 
>      [INFO] Including joda-time:joda-time:jar:2.4 in the shaded jar.
> and 
> 
> $ unzip -l target/word-count-beam-0.1.jar | fgrep org/joda/time/Duration.class
>      5290  2017-07-19 12:21   com/google/appengine/repackaged/org/joda/time/Duration.class
>      3748  2017-07-19 12:21   org/joda/time/Duration.class
> 
> What am I doing wrong?
> All help is appreciated!
> 
> Thanks.
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes
> 

