flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Michael Huelfenhaus <m.huelfenh...@davengo.com>
Subject java.lang.ClassNotFoundException when deploying streaming jar locally
Date Thu, 06 Aug 2015 09:08:20 GMT
Hello everybody

I am trying to build a very simple streaming application with the nightly build of Flink 0.10;
my code runs fine in Eclipse.

But when I build and deploy the jar locally I always get java.lang.ClassNotFoundException:
com.otter.ist.flink.DaoJoin$1

There is also no plan visible in the web interface.

I start the local Flink 0.10 with start-local-streaming.sh after building it from the git
code.

Below you will find the complete error, my code, and the pom.xml. Any help is appreciated.

Cheers Michael


error log from web interface:
An error occurred while invoking the program:

The main method caused an error.


org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:364)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.LeaderSessionMessages$$anonfun$receive$1.applyOrElse(LeaderSessionMessages.scala:40)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:101)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate
user function.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207)
	at org.apache.flink.streaming.runtime.tasks.OutputHandler.createChainedCollector(OutputHandler.java:173)
	at org.apache.flink.streaming.runtime.tasks.OutputHandler.createChainedCollector(OutputHandler.java:159)
	at org.apache.flink.streaming.runtime.tasks.OutputHandler.<init>(OutputHandler.java:107)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:99)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:523)
	... 1 more
Caused by: java.lang.ClassNotFoundException: com.otter.ist.flink.DaoJoin$1
	at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:344)
	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:205)
	... 6 more

my code:

package com.otter.ist.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Minimal Flink streaming job: reads a text file line by line, parses every line into a
 * {@code (String, String, Integer)} tuple and either writes the tuples to a text file or
 * prints them to standard output.
 *
 * <p>Usage: {@code DaoJoin <text path> [<result path>]}
 */
public class DaoJoin {

	/**
	 * Program entry point.
	 *
	 * @param args {@code <text path>} and, optionally, {@code <result path>}
	 * @throws Exception if building or executing the streaming job fails
	 */
	public static void main(String[] args) throws Exception {
		// *************************************************************************
		// PROGRAM
		// *************************************************************************

		if (!parseParameters(args)) {
			return;
		}

		// Set up the execution environment.
		// getExecutionEnvironment() picks the environment from the context the program runs in
		// (IDE, command-line client, web interface). A hard-coded createLocalEnvironment()
		// ignores that context, so a jar deployed to a running Flink instance would not be
		// executed with the environment (and user-code class loader) prepared for it.
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// get input data
		DataStream<String> text = getTextDataStream(env);

		// Parse "<string> <string> <int>" lines. An anonymous class (compiled to DaoJoin$1) is
		// used for the mapper.
		DataStream<Tuple3<String, String, Integer>> epcs = text.map(
				new MapFunction<String, Tuple3<String, String, Integer>>() {

			private static final long serialVersionUID = -7889264579632622427L;

			@Override
			public Tuple3<String, String, Integer> map(String line) throws Exception {
				String[] fields = line.split(" ");
				if (fields.length < 3) {
					// Fail with a descriptive message instead of a bare ArrayIndexOutOfBoundsException.
					throw new IllegalArgumentException(
							"Expected at least 3 space-separated fields but found " + fields.length
									+ " in line: '" + line + "'");
				}

				return new Tuple3<>(fields[0], fields[1], Integer.parseInt(fields[2]));
			}

		});

		// emit result
		if (fileOutput) {
			epcs.writeAsText(outputPath);
		} else {
			epcs.print();
		}
		System.out.println(env.getExecutionPlan());

		// execute program
		env.execute("DaoJoin");
	}

	// *************************************************************************
	// UTIL METHODS
	// *************************************************************************

	/** Whether results are written to {@link #outputPath} instead of being printed. */
	private static boolean fileOutput = false;
	/** Path of the text file to read; always set once {@link #parseParameters} returns true. */
	private static String textPath;
	/** Path of the result file; only set when {@link #fileOutput} is true. */
	private static String outputPath;

	/**
	 * Parses the command-line arguments into {@link #textPath}, {@link #outputPath} and
	 * {@link #fileOutput}.
	 *
	 * <p>The input path is mandatory because the job has no built-in default data; the
	 * result path is optional (results are printed when it is omitted).
	 *
	 * @param args the raw command-line arguments
	 * @return {@code true} if the arguments are usable, {@code false} if the usage message
	 *         was printed and the program should exit
	 */
	private static boolean parseParameters(String[] args) {

		if (args.length < 1 || args.length > 2) {
			System.err.println("Usage: DaoJoin <text path> [<result path>]");
			return false;
		}

		// parse input arguments
		textPath = args[0];
		fileOutput = args.length == 2;
		outputPath = fileOutput ? args[1] : null;

		System.out.println("fileout: " + fileOutput);
		return true;
	}

	/**
	 * Creates the input stream of raw text lines.
	 *
	 * @param env the environment the source is registered with
	 * @return a stream with one element per line of the file at {@link #textPath}
	 */
	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
		// read the text file from given input path
		return env.readTextFile(textPath);
	}
}


the pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	
	<groupId>com.otter.ist.flink</groupId>
	<artifactId>flink-test</artifactId>
	<version>0.1</version>
	<packaging>jar</packaging>

	<name>DaoJoin</name>
	<url>http://www.otter.com</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<repositories>
		<repository>
			<id>apache.snapshots</id>
			<name>Apache Development Snapshot Repository</name>
			<url>https://repository.apache.org/content/repositories/snapshots/</url>
			<releases>
				<enabled>false</enabled>
			</releases>
			<snapshots>
				<enabled>true</enabled>
			</snapshots>
		</repository>
	</repositories>
	
	<!-- 
		
		Execute "mvn clean package -Pbuild-jar"
		to build a jar file out of this project!

		How to use the Flink Quickstart pom:

		a) Adding new dependencies:
			You can add dependencies to the list below.
			Please check if the maven-shade-plugin below is filtering out your dependency
			and remove the exclude from there.

		b) Build a jar for running on the cluster:
			There are two options for creating a jar from this project

			b.1) "mvn clean package" -> this will create a fat jar which contains all
					dependencies necessary for running the jar created by this pom in a cluster.
					The "maven-shade-plugin" excludes everything that is provided on a running Flink cluster.

			b.2) "mvn clean package -Pbuild-jar" -> This will also create a fat-jar, but with much
					nicer dependency exclusion handling. This approach is preferred and leads to
					much cleaner jar files.
	-->

	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>0.10-SNAPSHOT</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-core</artifactId>
			<version>0.10-SNAPSHOT</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>0.10-SNAPSHOT</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<!-- We use the maven-shade plugin to create a fat jar that contains all dependencies
			except Flink and its transitive dependencies. The resulting fat-jar can be executed
			on a cluster. Change the value of mainClass below if your program entry point changes. -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>2.3</version>
				<executions>
					<!-- Run shade goal on package phase -->
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<artifactSet>
								<excludes>
									<!-- This list contains all dependencies of flink-dist
									Everything else will be packaged into the fat-jar
									-->
									<exclude>org.apache.flink:flink-shaded-*</exclude>
									<exclude>org.apache.flink:flink-core</exclude>
									<exclude>org.apache.flink:flink-java</exclude>
									<exclude>org.apache.flink:flink-scala</exclude>
									<exclude>org.apache.flink:flink-runtime</exclude>
									<exclude>org.apache.flink:flink-optimizer</exclude>
									<exclude>org.apache.flink:flink-clients</exclude>
									<exclude>org.apache.flink:flink-spargel</exclude>
									<exclude>org.apache.flink:flink-avro</exclude>
									<exclude>org.apache.flink:flink-java-examples</exclude>
									<exclude>org.apache.flink:flink-scala-examples</exclude>
									<exclude>org.apache.flink:flink-streaming-examples</exclude>
									<exclude>org.apache.flink:flink-streaming-core</exclude>

									<!-- Also exclude very big transitive dependencies of Flink

									WARNING: You have to remove these excludes if your code relies on other
									versions of these dependencies.

									-->
									<exclude>org.scala-lang:scala-library</exclude>
									<exclude>org.scala-lang:scala-compiler</exclude>
									<exclude>org.scala-lang:scala-reflect</exclude>
									<exclude>com.amazonaws:aws-java-sdk</exclude>
									<exclude>com.typesafe.akka:akka-actor_*</exclude>
									<exclude>com.typesafe.akka:akka-remote_*</exclude>
									<exclude>com.typesafe.akka:akka-slf4j_*</exclude>
									<exclude>io.netty:netty-all</exclude>
									<exclude>io.netty:netty</exclude>
									<exclude>org.eclipse.jetty:jetty-server</exclude>
									<exclude>org.eclipse.jetty:jetty-continuation</exclude>
									<exclude>org.eclipse.jetty:jetty-http</exclude>
									<exclude>org.eclipse.jetty:jetty-io</exclude>
									<exclude>org.eclipse.jetty:jetty-util</exclude>
									<exclude>org.eclipse.jetty:jetty-security</exclude>
									<exclude>org.eclipse.jetty:jetty-servlet</exclude>
									<exclude>commons-fileupload:commons-fileupload</exclude>
									<exclude>org.apache.avro:avro</exclude>
									<exclude>commons-collections:commons-collections</exclude>
									<exclude>org.codehaus.jackson:jackson-core-asl</exclude>
									<exclude>org.codehaus.jackson:jackson-mapper-asl</exclude>
									<exclude>com.thoughtworks.paranamer:paranamer</exclude>
									<exclude>org.xerial.snappy:snappy-java</exclude>
									<exclude>org.apache.commons:commons-compress</exclude>
									<exclude>org.tukaani:xz</exclude>
									<exclude>com.esotericsoftware.kryo:kryo</exclude>
									<exclude>com.esotericsoftware.minlog:minlog</exclude>
									<exclude>org.objenesis:objenesis</exclude>
									<exclude>com.twitter:chill_*</exclude>
									<exclude>com.twitter:chill-java</exclude>
									<exclude>com.twitter:chill-avro_*</exclude>
									<exclude>com.twitter:chill-bijection_*</exclude>
									<exclude>com.twitter:bijection-core_*</exclude>
									<exclude>com.twitter:bijection-avro_*</exclude>
									<exclude>commons-lang:commons-lang</exclude>
									<exclude>junit:junit</exclude>
									<exclude>de.javakaffee:kryo-serializers</exclude>
									<exclude>joda-time:joda-time</exclude>
									<exclude>org.apache.commons:commons-lang3</exclude>
									<exclude>org.slf4j:slf4j-api</exclude>
									<exclude>org.slf4j:slf4j-log4j12</exclude>
									<exclude>log4j:log4j</exclude>
									<exclude>org.apache.commons:commons-math</exclude>
									<exclude>org.apache.sling:org.apache.sling.commons.json</exclude>
									<exclude>commons-logging:commons-logging</exclude>
									<exclude>org.apache.httpcomponents:httpclient</exclude>
									<exclude>org.apache.httpcomponents:httpcore</exclude>
									<exclude>commons-codec:commons-codec</exclude>
									<exclude>com.fasterxml.jackson.core:jackson-core</exclude>
									<exclude>com.fasterxml.jackson.core:jackson-databind</exclude>
									<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
									<exclude>org.codehaus.jettison:jettison</exclude>
									<exclude>stax:stax-api</exclude>
									<exclude>com.typesafe:config</exclude>
									<exclude>org.uncommons.maths:uncommons-maths</exclude>
									<exclude>com.github.scopt:scopt_*</exclude>
									<exclude>org.mortbay.jetty:servlet-api</exclude>
									<exclude>commons-io:commons-io</exclude>
									<exclude>commons-cli:commons-cli</exclude>
								</excludes>
							</artifactSet>
							<filters>
								<filter>
									<artifact>org.apache.flink:*</artifact>
									<excludes>
										<exclude>org/apache/flink/shaded/**</exclude>
										<exclude>web-docs/**</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers>
								<!-- add Main-Class to manifest file -->
								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<mainClass>com.otter.ist.flink.DaoJoin</mainClass>
								</transformer>
							</transformers>
							<createDependencyReducedPom>false</createDependencyReducedPom>
						</configuration>
					</execution>
				</executions>
			</plugin>
			
			<!-- Configure the jar plugin to add the main class as a manifest entry -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-jar-plugin</artifactId>
				<version>2.5</version>
				<configuration>
					<archive>
						<manifestEntries>
							<Main-Class>com.otter.ist.flink.DaoJoin</Main-Class>
						</manifestEntries>
					</archive>
				</configuration>
			</plugin>

			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.1</version>
				<configuration>
					<source>1.8</source> <!-- Java source level; currently set to Java 8 ("1.8") -->
					<target>1.8</target> <!-- Bytecode target level; currently set to Java 8 ("1.8") -->
				</configuration>
			</plugin>
		</plugins>
		
			
		<!-- If you want to use Java 8 Lambda Expressions uncomment the following lines -->
		<!--
		<pluginManagement>
			<plugins>
				<plugin>
					<artifactId>maven-compiler-plugin</artifactId>
					<configuration>
						<source>1.8</source>
						<target>1.8</target>
						<compilerId>jdt</compilerId>
					</configuration>
					<dependencies>
						<dependency>
							<groupId>org.eclipse.tycho</groupId>
							<artifactId>tycho-compiler-jdt</artifactId>
							<version>0.21.0</version>
						</dependency>
					</dependencies>
				</plugin>
		
				<plugin>
					<groupId>org.eclipse.m2e</groupId>
					<artifactId>lifecycle-mapping</artifactId>
					<version>1.0.0</version>
					<configuration>
						<lifecycleMappingMetadata>
							<pluginExecutions>
								<pluginExecution>
									<pluginExecutionFilter>
										<groupId>org.apache.maven.plugins</groupId>
										<artifactId>maven-assembly-plugin</artifactId>
										<versionRange>[2.4,)</versionRange>
										<goals>
											<goal>single</goal>
										</goals>
									</pluginExecutionFilter>
									<action>
										<ignore/>
									</action>
								</pluginExecution>
								<pluginExecution>
									<pluginExecutionFilter>
										<groupId>org.apache.maven.plugins</groupId>
										<artifactId>maven-compiler-plugin</artifactId>
										<versionRange>[3.1,)</versionRange>
										<goals>
											<goal>testCompile</goal>
											<goal>compile</goal>
										</goals>
									</pluginExecutionFilter>
									<action>
										<ignore/>
									</action>
								</pluginExecution>
							</pluginExecutions>
						</lifecycleMappingMetadata>
					</configuration>
				</plugin>
			</plugins>
		</pluginManagement>
		-->
		
	</build>
	<profiles>
		<profile>
			<!-- A profile that does everything correctly:
			We set the Flink dependencies to provided -->
			<id>build-jar</id>
			<activation>
				<activeByDefault>false</activeByDefault>
			</activation>
			<dependencies>
				<dependency>
					<groupId>org.apache.flink</groupId>
					<artifactId>flink-java</artifactId>
					<version>0.10-SNAPSHOT</version>
					<scope>provided</scope>
				</dependency>
				<dependency>
					<groupId>org.apache.flink</groupId>
					<artifactId>flink-streaming-core</artifactId>
					<version>0.10-SNAPSHOT</version>
					<scope>provided</scope>
				</dependency>
				<dependency>
					<groupId>org.apache.flink</groupId>
					<artifactId>flink-clients</artifactId>
					<version>0.10-SNAPSHOT</version>
					<scope>provided</scope>
				</dependency>
			</dependencies>
		</profile>
	</profiles>
</project>
Mime
View raw message