flink-user mailing list archives

From Tarandeep Singh <tarand...@gmail.com>
Subject Re: Unable to submit flink job that uses Avro data
Date Wed, 23 Mar 2016 16:35:55 GMT
> On Wed, 23.03.2016 06:59, Chesnay Schepler wrote
> Could you be missing the call to execute()?

Yes, that was it. Can't believe I missed that!
Thank you, Chesnay.

Best,
Tarandeep
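For anyone who finds this thread later: in the DataSet API, write() only declares a sink in the job's plan; nothing is built or submitted until execute() is called on the ExecutionEnvironment, which is why the client exited with no plan to fetch. The lazy behaviour can be illustrated with a small self-contained sketch; the Pipeline class below is a toy stand-in written for this example, not Flink API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy stand-in for a lazily-evaluated pipeline: transformations and sinks
// are only recorded until execute() is called, mirroring why a Flink job
// whose main() never calls env.execute() produces no plan and no output.
public class LazyPipelineDemo {

    static class Pipeline {
        private final List<String> input;
        private Predicate<String> filter = s -> true;
        private final List<String> sink = new ArrayList<>();
        private boolean executed = false;

        Pipeline(List<String> input) { this.input = input; }

        // Like DataSet.filter(): just records the transformation.
        Pipeline filter(Predicate<String> p) { this.filter = p; return this; }

        // Like DataSet.write(): declares a sink but runs nothing.
        Pipeline write() { return this; }

        // Like ExecutionEnvironment.execute(): actually runs the plan.
        void execute() {
            for (String s : input) {
                if (filter.test(s)) sink.add(s);
            }
            executed = true;
        }

        List<String> results() { return sink; }
        boolean wasExecuted() { return executed; }
    }

    public static void main(String[] args) {
        Pipeline p = new Pipeline(List.of("Mozilla/5.0", "curl/7.64", "Mozilla/4.0"))
                .filter(ua -> ua.contains("Mozilla"))
                .write();
        System.out.println(p.wasExecuted()); // false: sink declared, nothing ran yet
        p.execute();                         // the step the original job was missing
        System.out.println(p.results().size()); // 2
    }
}
```

In the actual job quoted below, the fix is simply adding env.execute("GrepAvro"); as the last statement of main().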


On 23.03.2016 01:25, Tarandeep Singh wrote:
>> Hi,
>>
>> I wrote a simple Flink job that uses the Avro input format to read Avro
>> files and save the results in Avro format. The job does not get
>> submitted and the job client exits immediately. The same thing happens
>> whether I run the program in the IDE or submit it via the command line.
>>
>> Here is the program-
>>
>> import com.styleseat.flinkpractice.avro.PageTrackingRecord;
>> import org.apache.flink.api.common.functions.FilterFunction;
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.api.java.io.AvroInputFormat;
>> import org.apache.flink.api.java.io.AvroOutputFormat;
>> import org.apache.flink.core.fs.Path;
>>
>> public class GrepAvro {
>>
>>     public static void main(String[] args) {
>>         final String keyword = args[0];
>>         final Path inputPath = new Path(args[1]);
>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>         final AvroInputFormat<PageTrackingRecord> inputFormat =
>>                 new AvroInputFormat<PageTrackingRecord>(inputPath, PageTrackingRecord.class);
>>         DataSet<PageTrackingRecord> dataSet = env.createInput(inputFormat);
>>         dataSet.filter(new FilterFunction<PageTrackingRecord>() {
>>             @Override
>>             public boolean filter(PageTrackingRecord pageTrackingRecord) throws Exception {
>>                 String userAgent = pageTrackingRecord.getUserAgent().toString();
>>                 return (userAgent != null && userAgent.contains(keyword));
>>             }
>>         }).write(new AvroOutputFormat<PageTrackingRecord>(PageTrackingRecord.class), args[2]);
>>     }
>> }
>>
>> The Avro files are stored in HDFS and I used the HDFS paths
>> (hdfs:///user/flink/data/...).
>> There isn't any error in the log file; however, when I ran the job via the
>> web interface, I got this error:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program plan could not be fetched - the program aborted pre-maturely.
>>
>> System.err: (none)
>>
>> System.out: (none)
>>     at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:102)
>>     at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:215)
>>     at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:95)
>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:50)
>>     at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:135)
>>     at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:112)
>>     at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:60)
>>     at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>     at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
>>     at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
>>     at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
>>     at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>     at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:158)
>>     at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
>>     at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>>     at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
>>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>>     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>     at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> This is my pom.xml file:
>>
>> <?xml version="1.0" encoding="UTF-8"?>
>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>>                              http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>     <modelVersion>4.0.0</modelVersion>
>>     <groupId>com.xyz</groupId>
>>     <artifactId>flink-practice</artifactId>
>>     <version>1.0-SNAPSHOT</version>
>>
>>     <properties>
>>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>         <flink.version>1.0.0</flink.version>
>>     </properties>
>>
>>     <repositories>
>>         <repository>
>>             <id>apache.snapshots</id>
>>             <name>Apache Development Snapshot Repository</name>
>>             <url>https://repository.apache.org/content/repositories/snapshots/</url>
>>             <releases> <enabled>false</enabled> </releases>
>>             <snapshots> <enabled>true</enabled> </snapshots>
>>         </repository>
>>     </repositories>
>>
>>     <build>
>>         <plugins>
>>             <plugin>
>>                 <groupId>org.apache.avro</groupId>
>>                 <artifactId>avro-maven-plugin</artifactId>
>>                 <version>1.8.0</version>
>>                 <executions>
>>                     <execution>
>>                         <phase>generate-sources</phase>
>>                         <goals> <goal>schema</goal> </goals>
>>                         <configuration>
>>                             <fieldVisibility>PRIVATE</fieldVisibility>
>>                             <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
>>                             <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
>>                         </configuration>
>>                     </execution>
>>                 </executions>
>>             </plugin>
>>             <plugin>
>>                 <groupId>org.apache.maven.plugins</groupId>
>>                 <artifactId>maven-compiler-plugin</artifactId>
>>                 <configuration>
>>                     <source>1.6</source>
>>                     <target>1.6</target>
>>                 </configuration>
>>             </plugin>
>>         </plugins>
>>     </build>
>>
>>     <dependencies>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-java</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-clients_2.11</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-avro_2.11</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.avro</groupId>
>>             <artifactId>avro</artifactId>
>>             <version>1.8.0</version>
>>         </dependency>
>>     </dependencies>
>> </project>
>>
>> Any idea what I might be doing wrong? I was able to run Flink jobs on
>> text data, so Flink itself is working.
>>
>> Thanks,
>> Tarandeep
