flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Unable to submit flink job that uses Avro data
Date Wed, 23 Mar 2016 06:59:09 GMT
Could you be missing the call to execute()?
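
A minimal sketch of what the end of main() would then look like, reusing the 
names from the program quoted below (the job name string here is arbitrary):

    dataSet.filter(new FilterFunction<PageTrackingRecord>() {
        @Override
        public boolean filter(PageTrackingRecord record) throws Exception {
            String userAgent = record.getUserAgent().toString();
            return userAgent != null && userAgent.contains(keyword);
        }
    }).write(new AvroOutputFormat<PageTrackingRecord>(PageTrackingRecord.class), args[2]);

    // write() only declares a sink; the plan is not built or submitted
    // until the environment is executed.
    env.execute("GrepAvro");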

On 23.03.2016 01:25, Tarandeep Singh wrote:
> Hi,
>
> I wrote a simple Flink job that uses the Avro input format to read an 
> Avro file and saves the results in Avro format. The job does not get 
> submitted and the job client exits immediately. The same thing happens 
> whether I run the program in the IDE or submit it via the command line.
>
> Here is the program-
>
> import com.styleseat.flinkpractice.avro.PageTrackingRecord;
> import org.apache.flink.api.common.functions.FilterFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.AvroInputFormat;
> import org.apache.flink.api.java.io.AvroOutputFormat;
> import org.apache.flink.core.fs.Path;
>
> public class GrepAvro {
>
>     public static void main(String[] args) {
>         final String keyword = args[0];
>         final Path inputPath = new Path(args[1]);
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         final AvroInputFormat<PageTrackingRecord> inputFormat =
>                 new AvroInputFormat<PageTrackingRecord>(inputPath, PageTrackingRecord.class);
>         DataSet<PageTrackingRecord> dataSet = env.createInput(inputFormat);
>         dataSet.filter(new FilterFunction<PageTrackingRecord>() {
>             @Override
>             public boolean filter(PageTrackingRecord pageTrackingRecord) throws Exception {
>                 String userAgent = pageTrackingRecord.getUserAgent().toString();
>                 return (userAgent != null && userAgent.contains(keyword));
>             }
>         }).write(new AvroOutputFormat<PageTrackingRecord>(PageTrackingRecord.class), args[2]);
>     }
> }
>
> The Avro files are stored in HDFS and I used the hdfs paths 
> (hdfs:///user/flink/data/...).
> There isn't any error in the log file; however, when I run the job via 
> the web interface, I get this error-
>
> org.apache.flink.client.program.ProgramInvocationException: The program plan could not be fetched - the program aborted pre-maturely.
>
> System.err: (none)
>
> System.out: (none)
> 	at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:102)
> 	at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:215)
> 	at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:95)
> 	at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:50)
> 	at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:135)
> 	at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:112)
> 	at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:60)
> 	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> 	at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
> 	at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
> 	at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
> 	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> 	at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:158)
> 	at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
> 	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> 	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
> 	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> 	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> 	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> 	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> 	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> 	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
> 	at java.lang.Thread.run(Thread.java:745)
>
> This is my pom.xml file -
>
> <?xml version="1.0" encoding="UTF-8"?>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>     <modelVersion>4.0.0</modelVersion>
>     <groupId>com.xyz</groupId>
>     <artifactId>flink-practice</artifactId>
>     <version>1.0-SNAPSHOT</version>
>
>     <properties>
>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>         <flink.version>1.0.0</flink.version>
>     </properties>
>
>     <repositories>
>         <repository>
>             <id>apache.snapshots</id>
>             <name>Apache Development Snapshot Repository</name>
>             <url>https://repository.apache.org/content/repositories/snapshots/</url>
>             <releases><enabled>false</enabled></releases>
>             <snapshots><enabled>true</enabled></snapshots>
>         </repository>
>     </repositories>
>
>     <build>
>         <plugins>
>             <plugin>
>                 <groupId>org.apache.avro</groupId>
>                 <artifactId>avro-maven-plugin</artifactId>
>                 <version>1.8.0</version>
>                 <executions>
>                     <execution>
>                         <phase>generate-sources</phase>
>                         <goals><goal>schema</goal></goals>
>                         <configuration>
>                             <fieldVisibility>PRIVATE</fieldVisibility>
>                             <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
>                             <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
>                         </configuration>
>                     </execution>
>                 </executions>
>             </plugin>
>             <plugin>
>                 <groupId>org.apache.maven.plugins</groupId>
>                 <artifactId>maven-compiler-plugin</artifactId>
>                 <configuration>
>                     <source>1.6</source>
>                     <target>1.6</target>
>                 </configuration>
>             </plugin>
>         </plugins>
>     </build>
>
>     <dependencies>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-java</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-clients_2.11</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-avro_2.11</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.avro</groupId>
>             <artifactId>avro</artifactId>
>             <version>1.8.0</version>
>         </dependency>
>     </dependencies>
> </project>
>
> Any idea what I might be doing wrong? I was able to run Flink jobs on 
> text data, so Flink itself is working.
>
> Thanks,
> Tarandeep

