flink-user mailing list archives

From Lukas Kircher <lukas.kirc...@uni-konstanz.de>
Subject Re: Recursive directory traversal with TextInputFormat
Date Thu, 08 Dec 2016 07:43:07 GMT
Hi Stefan,

thanks for your answer.

> I think there is a field in FileInputFormat (which TextInputFormat is subclassing) that could serve your purpose if you override the default:

That was my first guess as well. I use the basic setup from org.apache.flink.api.java.io.TextInputFormatTest.java
and call setNestedFileEnumeration(true), but once the stream is processed only the content
of the .csv file in the top-most folder is printed. The example is just a few lines of self-contained
code, see below. Does anybody have an idea?

Cheers,
Lukas


import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;

public class ReadDirectorySSCCE {
    public static void main(String[] args) throws Exception {
        // create given dirs and add a .csv file to each one
        String[] dirs = new String[] {"tmp", "tmp/first/", "tmp/second/"};
        for (String dir: dirs) {
            // create input file
            File tmpDir = new File(dir);
            if (!tmpDir.exists()) {
                tmpDir.mkdirs();
            }
            File tempFile = File.createTempFile("file", ".csv", tmpDir);
            tempFile.deleteOnExit();
            // try-with-resources closes the writer even if write() throws
            try (BufferedWriter w = new BufferedWriter(new FileWriter(tempFile))) {
                w.write("content of " + dir + "/file.csv");
            }
        }
        File root = new File("tmp");

        TextInputFormat inputFormat = new TextInputFormat(new Path(root.toURI().toString()));
        inputFormat.setNestedFileEnumeration(true);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.createInput(inputFormat).print();
        env.execute();
    }
}
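
To check independently of Flink what a recursive enumeration of the directory should find, a plain-JDK walk can serve as a reference. This is only a sanity-check sketch and says nothing about how FileInputFormat enumerates files internally; the class name NestedCsvLister and the method listCsvFiles are made up for illustration, and "tmp" is assumed to be the directory created by the SSCCE above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink: lists nested .csv files with the JDK only.
public class NestedCsvLister {

    /** Returns every regular *.csv file under root, at any depth, sorted by path. */
    static List<Path> listCsvFiles(Path root) throws IOException {
        // Files.walk descends into subdirectories; the stream must be closed after use.
        try (Stream<Path> paths = Files.walk(root)) {
            return paths
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(".csv"))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumption: "tmp" is the root directory written by the SSCCE.
        Path root = Paths.get(args.length > 0 ? args[0] : "tmp");
        if (!Files.isDirectory(root)) {
            System.out.println("No such directory: " + root);
            return;
        }
        for (Path csv : listCsvFiles(root)) {
            System.out.println(csv + ": " + Files.readAllLines(csv));
        }
    }
}
```

If this lists one file per directory while the Flink job prints only the top-level file, the files on disk are fine and the difference lies in how the job reads them.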


> On 7 Dec 2016, at 17:44, Stefan Richter <s.richter@data-artisans.com> wrote:
> 
> Hi,
> 
> I think there is a field in FileInputFormat (which TextInputFormat is subclassing) that could serve your purpose if you override the default:
> 
> /**
>  * The flag to specify whether recursive traversal of the input directory
>  * structure is enabled.
>  */
> protected boolean enumerateNestedFiles = false;
> 
> As for compression, I think this class also provides an InflaterInputStreamFactory to read compressed data.
> 
> Best,
> Stefan
> 
>> On 7 Dec 2016, at 12:10, Lukas Kircher <lukas.kircher@uni-konstanz.de> wrote:
>> 
>> Hi all,
>> 
>> I am trying to read nested .csv files from a directory and want to switch from a custom SourceFunction I implemented to the TextInputFormat. I have two questions:
>> 
>> 1) Somehow only the file in the root directory is processed; nested files are skipped. What am I missing? See the attachment for an SSCCE. I get the same result with Flink 1.1.3 no matter if I run it via the IDE or submit the job to the standalone binary. The file input splits are all there, yet they don't seem to be processed.
>> 
>> 2) What is the easiest way to read compressed .csv files (.zip)?
>> 
>> Thanks for your help, cheers
>> Lukas
>> 
>> <ReadDirectorySSCCE.java>
> 

