flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Tarandeep Singh <tarand...@gmail.com>
Subject Unable to submit flink job that uses Avro data
Date Wed, 23 Mar 2016 00:25:16 GMT
Hi,

I wrote a simple Flink job that uses Avro input format to read avro file
and save the results in avro format. The job does not get submitted and job
client exist out immediately. Same thing happens if I run the program in
IDE or if I submit via command line.

Here is the program-

import com.styleseat.flinkpractice.avro.PageTrackingRecord;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.api.java.io.AvroOutputFormat;
import org.apache.flink.core.fs.Path;


public class GrepAvro {

    public static void main(String[] args) {
        final String keyword = args[0];
        final Path inputPath = new Path(args[1]);
        final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
        final AvroInputFormat<PageTrackingRecord> inputFormat = new
AvroInputFormat<PageTrackingRecord>(inputPath,
PageTrackingRecord.class);
        DataSet<PageTrackingRecord> dataSet = env.createInput(inputFormat);

        dataSet.filter(new FilterFunction<PageTrackingRecord>() {
            @Override
            public boolean filter(PageTrackingRecord
pageTrackingRecord) throws Exception {
                String userAgent = pageTrackingRecord.getUserAgent().toString();
                return (userAgent != null && userAgent.contains(keyword));
            }
        }).write(new
AvroOutputFormat<PageTrackingRecord>(PageTrackingRecord.class),
args[2]);
    }
}


The avro files are stored in HDFS and I used the hdfs paths
(hdfs:///user/flink/data/...)
There isn't any error in log file, however when I  ran the job via web
interface, I get this error-

org.apache.flink.client.program.ProgramInvocationException: The
program plan could not be fetched - the program aborted pre-maturely.

System.err: (none)

System.out: (none)
	at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:102)
	at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:215)
	at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:95)
	at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:50)
	at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:135)
	at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:112)
	at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:60)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
	at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
	at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
	at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
	at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:158)
	at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
	at java.lang.Thread.run(Thread.java:745)


This is my pom.xml file -

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xyz</groupId>
    <artifactId>flink-practice</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.0.0</flink.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.0</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <fieldVisibility>PRIVATE</fieldVisibility>

<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>

<outputDirectory>${project.basedir}/src/main/java</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.0</version>
        </dependency>
    </dependencies>


</project>


Any idea what I might be doing wrong? I was able to run flink jobs on text
data. So flink is working.

Thanks,
Tarandeep

Mime
View raw message