flink-user mailing list archives

From Niklas Semmler <nik...@inet.tu-berlin.de>
Subject Re: A custom FileInputFormat
Date Wed, 16 Nov 2016 19:13:29 GMT
Hello Fabian,

thanks for the response and my apologies for the late reply.

I tried extending the InputFormat, but it felt too hackish, so I simply
loaded the files before running the job and fed them to Flink via
env.fromCollection(...).
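In case it helps others, here is roughly what that workaround looks like (paths as in the original job; a sketch, not the exact code I ran):

```scala
import java.io.File
import org.apache.flink.api.scala._

object ListFilesJob {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Collect the file paths on the client before the job is submitted.
    // This avoids the custom InputFormat entirely, but only works when
    // the client sees the same file system as the cluster.
    val paths = new File("/tmp/mydir")
      .listFiles.filter(_.isDirectory)
      .flatMap(_.listFiles)
      .map(_.toString)
      .toSeq

    env.fromCollection(paths).writeAsText("/tmp/results")
    env.execute("Job")
  }
}
```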

Cheers,
Niklas

On 01.11.2016 21:59, Fabian Hueske wrote:
> Hi Niklas,
>
> I don't know exactly what is going wrong there, but I have a few pointers
> for you:
>
> 1) in cluster setups, Flink redirects println() to ./log/*.out files, i.e.,
> you have to search for the task manager that ran the DirReader and check
> its ./log/*.out file
> 2) you are using Java's File class. That will only work if you are
> accessing the local file system of the machine the DirReader runs on. If
> you want to read the files from an HDFS you have to use the corresponding
> HDFS client.
> 3) I would not extend the FileInputFormat for your purpose. The
> FileInputFormat is meant to *read* files, not just look up file names. I'd
> rather implement an InputFormat from scratch. Since you are only running a
> single instance, you can return a single dummy InputSplit.
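For reference, a from-scratch format along those lines might look roughly like this (a sketch; the class name is invented, and it builds on Flink's GenericInputFormat, which supplies the dummy splits itself):

```scala
import org.apache.flink.api.common.io.GenericInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.core.io.GenericInputSplit

// Emits the path of every file found one level below `dir`.
// GenericInputFormat creates dummy GenericInputSplits on its own, so a
// single instance (parallelism 1) sees the whole directory.
class FileNameFormat(dir: String) extends GenericInputFormat[String] {
  private var files: Iterator[String] = Iterator.empty

  override def open(split: GenericInputSplit): Unit = {
    super.open(split)
    val path = new Path(dir)
    // Flink's FileSystem abstraction covers both local paths and HDFS,
    // which also addresses pointer 2.
    val fs = path.getFileSystem
    files = fs.listStatus(path).iterator
      .filter(_.isDir)
      .flatMap(sub => fs.listStatus(sub.getPath))
      .map(_.getPath.toString)
  }

  override def reachedEnd(): Boolean = !files.hasNext

  override def nextRecord(reuse: String): String = files.next()
}
```

Used as `env.createInput(new FileNameFormat("/tmp/mydir")).setParallelism(1)` (with `import org.apache.flink.api.scala._` in scope for the implicit type information).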
>
> Let me know, if you have further questions.
> Best, Fabian
>
> 2016-10-28 18:38 GMT+02:00 Niklas Semmler <niklas@inet.tu-berlin.de>:
>
>> Hello Flink community,
>>
>> I am running into an issue with a custom FileInputFormat class and would
>> appreciate your help.
>>
>> My goal is to read all files from a directory as paths:
>>
>> val env : ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
>>
>> var source : DataSet[String] = env.readFile(new DirReader,
>> "/tmp/mydir").setParallelism(1)
>>
>> source.writeAsText("/tmp/results", WriteMode.OVERWRITE)
>>
>> env.execute("Job")
>>
>> It works when I execute the program from within my IDE or run it
>> directly as a fat jar. When I run it through the Flink CLI, the file
>> "/tmp/results" is created but not filled with entries.
>>
>> There seems to be something wrong with my custom DirReader (see below).
>> The output of the println statements is not visible when running the code
>> from the Flink CLI.
>>
>> No exception is reported in the logs (see below). I am at a loss as to
>> what to try. Even worse, when I copy the fat jar to a remote system, the
>> problem also appears when I execute the fat jar directly.
>>
>> Local System
>> Flink: 1.0.2
>> Java: 1.8.0_102
>> Scala: 2.11.8
>>
>> Remote System
>> Flink: 1.1.3
>> Java: 1.8.0_92
>> Scala: 2.11.6
>>
>> Help or ideas to try out are welcome!
>>
>> Best,
>> Niklas
>>
>>
>>
>> ----------------------------------------
>> import java.io.File
>>
>> import org.apache.flink.api.common.io.FileInputFormat
>>
>> class DirReader extends FileInputFormat[String] {
>>   var running : Boolean = false
>>   var fileList : Array[String] = null
>>
>>   override def openInputFormat() = {
>>       println("Path: " + this.filePath.toString)
>>       val directory = new File(this.filePath.toString)
>>       if (directory != null && directory.isDirectory) {
>>         fileList = directory.listFiles.filter(_.isDirectory)
>>           .map(_.listFiles).flatten.map(_.toString)
>>         running = if (fileList.length > 1) true else false
>>       }
>>       println("fileList " + fileList.length + " running " + running)
>>   }
>>
>>   override def nextRecord(reuse: String): String = {
>>     val head = fileList.head
>>     println("File: " + head)
>>     fileList = fileList.tail
>>     running = if (fileList.length == 0) false else true
>>     head
>>   }
>>
>>   override def reachedEnd(): Boolean = ! running
>> }
>> ----------------------------------------
>>
>> The output from the CLI:
>>
>> 10/28/2016 18:27:56     Job execution switched to status RUNNING.
>> 10/28/2016 18:27:56     DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.readFile(ExecutionEnvironment.scala:385) (de.tuberlin.inet.plag.DirReader))(1/1) switched to SCHEDULED
>> 10/28/2016 18:27:56     DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.readFile(ExecutionEnvironment.scala:385) (de.tuberlin.inet.plag.DirReader))(1/1) switched to DEPLOYING
>> 10/28/2016 18:27:56     DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.readFile(ExecutionEnvironment.scala:385) (de.tuberlin.inet.plag.DirReader))(1/1) switched to RUNNING
>> 10/28/2016 18:27:56     DataSink (TextOutputFormat (/tmp/results) - UTF-8)(1/1) switched to SCHEDULED
>> 10/28/2016 18:27:56     DataSink (TextOutputFormat (/tmp/results) - UTF-8)(1/1) switched to DEPLOYING
>> 10/28/2016 18:27:56     DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.readFile(ExecutionEnvironment.scala:385) (de.tuberlin.inet.plag.DirReader))(1/1) switched to FINISHED
>> 10/28/2016 18:27:56     DataSink (TextOutputFormat (/tmp/results) - UTF-8)(1/1) switched to RUNNING
>> 10/28/2016 18:27:56     DataSink (TextOutputFormat (/tmp/results) - UTF-8)(1/1) switched to FINISHED
>> 10/28/2016 18:27:56     Job execution switched to status FINISHED.
>>
>>
>

-- 
Niklas Semmler
PhD Student / Research Assistant
TU Berlin, INET, Room MAR 4.027
Marchstr 23, 10587 Berlin
Tel.: +49 (0)30 314 75739
http://inet.tu-berlin.de/~nsemmler/
