flink-user mailing list archives

From Niklas Semmler <nik...@inet.tu-berlin.de>
Subject A custom FileInputFormat
Date Fri, 28 Oct 2016 16:38:34 GMT
Hello Flink community,

I am running into an issue with a custom FileInputFormat class and would 
appreciate your help.

My goal is to read all files from a directory as paths:

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

val source: DataSet[String] = env.readFile(new DirReader, "/tmp/mydir").setParallelism(1)

source.writeAsText("/tmp/results", WriteMode.OVERWRITE)

env.execute("Job")

It works when I execute the program from within my IDE or run it directly 
as a fat jar. When I submit it through the Flink CLI, the file 
"/tmp/results" is created, but not filled with entries.

There seems to be something wrong with my custom DirReader (see below): 
the output of its println statements does not appear when the code runs 
through the Flink CLI, yet no exception shows up in the logs (see below). 
I am at a loss as to what to try. Worse, after I copy the fat jar to a 
remote system, the problem also appears when I execute the fat jar 
directly there.

Local System
Flink: 1.0.2
Java: 1.8.0_102
Scala: 2.11.8

Remote System
Flink: 1.1.3
Java: 1.8.0_92
Scala: 2.11.6

Help or ideas to try out are welcome!

Best,
Niklas



----------------------------------------
import java.io.File

import org.apache.flink.api.common.io.FileInputFormat

class DirReader extends FileInputFormat[String] {
  var running: Boolean = false
  var fileList: Array[String] = null

  override def openInputFormat() = {
    println("Path: " + this.filePath.toString)
    val directory = new File(this.filePath.toString)
    if (directory.isDirectory) {
      // Collect the files one level below each subdirectory as path strings.
      fileList = directory.listFiles
        .filter(_.isDirectory)
        .flatMap(_.listFiles)
        .map(_.toString)
      running = fileList.length > 1
    }
    println("fileList " + fileList.length + " running " + running)
  }

  override def nextRecord(reuse: String): String = {
    val head = fileList.head
    println("File: " + head)
    fileList = fileList.tail
    running = fileList.nonEmpty
    head
  }

  override def reachedEnd(): Boolean = !running
}
----------------------------------------
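For reference, the listing-and-draining logic of the format can be exercised on its own, outside Flink (a standalone sketch; the temporary directory tree built in main is purely illustrative and stands in for /tmp/mydir):

```scala
import java.io.File
import java.nio.file.Files

object DirListing {
  // Mirror of openInputFormat's listing: the files one level
  // below each subdirectory of `root`, as path strings.
  def listNested(root: File): Array[String] =
    root.listFiles.filter(_.isDirectory).flatMap(_.listFiles).map(_.toString)

  def main(args: Array[String]): Unit = {
    // Throwaway tree: <tmp>/sub/{a.txt, b.txt}
    val root = Files.createTempDirectory("mydir").toFile
    val sub = new File(root, "sub")
    sub.mkdir()
    new File(sub, "a.txt").createNewFile()
    new File(sub, "b.txt").createNewFile()

    // Drain the list the way nextRecord/reachedEnd do.
    var fileList = listNested(root)
    while (fileList.nonEmpty) {
      println("File: " + fileList.head)
      fileList = fileList.tail
    }
  }
}
```

Run locally, this prints the two file paths, so the listing itself behaves as expected.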

The output from the CLI:

10/28/2016 18:27:56	Job execution switched to status RUNNING.
10/28/2016 18:27:56	DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.readFile(ExecutionEnvironment.scala:385) (de.tuberlin.inet.plag.DirReader))(1/1) switched to SCHEDULED
10/28/2016 18:27:56	DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.readFile(ExecutionEnvironment.scala:385) (de.tuberlin.inet.plag.DirReader))(1/1) switched to DEPLOYING
10/28/2016 18:27:56	DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.readFile(ExecutionEnvironment.scala:385) (de.tuberlin.inet.plag.DirReader))(1/1) switched to RUNNING
10/28/2016 18:27:56	DataSink (TextOutputFormat (/tmp/results) - UTF-8)(1/1) switched to SCHEDULED
10/28/2016 18:27:56	DataSink (TextOutputFormat (/tmp/results) - UTF-8)(1/1) switched to DEPLOYING
10/28/2016 18:27:56	DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.readFile(ExecutionEnvironment.scala:385) (de.tuberlin.inet.plag.DirReader))(1/1) switched to FINISHED
10/28/2016 18:27:56	DataSink (TextOutputFormat (/tmp/results) - UTF-8)(1/1) switched to RUNNING
10/28/2016 18:27:56	DataSink (TextOutputFormat (/tmp/results) - UTF-8)(1/1) switched to FINISHED
10/28/2016 18:27:56	Job execution switched to status FINISHED.


-- 
Niklas Semmler
PhD Student / Research Assistant
TU Berlin, INET, Room MAR 4.027
Marchstr 23, 10587 Berlin
Tel.: +49 (0)30 314 75739
http://inet.tu-berlin.de/~nsemmler/
