Hi Stefan,

Thanks for your answer.

> I think there is a field in FileInputFormat (which TextInputFormat is subclassing) that could serve your purpose if you override the default:

That was my first guess as well. I use the basic setup from org.apache.flink.api.java.io.TextInputFormatTest.java and call setNestedFileEnumeration(true), but when the stream is processed, only the content of the .csv file in the top-level folder is printed; the files in the nested folders are missing. The example is just a few lines of self-contained code, see below. Does anybody have an idea?

Cheers,
Lukas


import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;

public class ReadDirectorySSCCE {
    public static void main(String[] args) throws Exception {
        // Create the directories and add one .csv file to each of them.
        String[] dirs = new String[] {"tmp", "tmp/first", "tmp/second"};
        for (String dir : dirs) {
            File tmpDir = new File(dir);
            if (!tmpDir.exists()) {
                tmpDir.mkdirs();
            }
            // createTempFile generates a unique name of the form file<random>.csv
            File tempFile = File.createTempFile("file", ".csv", tmpDir);
            tempFile.deleteOnExit();
            // try-with-resources closes the writer even if write() throws
            try (BufferedWriter w = new BufferedWriter(new FileWriter(tempFile))) {
                w.write("content of " + dir + "/file.csv");
            }
        }
        File root = new File("tmp");

        // Point the input format at the root folder and ask it to descend into nested folders.
        TextInputFormat inputFormat = new TextInputFormat(new Path(root.toURI().toString()));
        inputFormat.setNestedFileEnumeration(true);

        // Read the input as a stream and print every line.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.createInput(inputFormat).print();
        env.execute();
    }
}
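
As a cross-check that does not involve Flink at all, the following plain-JDK sketch lists the .csv files that a recursive enumeration of the root folder should pick up. The class name NestedCsvCheck and the method listCsvFiles are made up for illustration; this only shows the expected set of files and says nothing about how Flink enumerates them internally.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink: walks a directory tree with the JDK only.
public class NestedCsvCheck {

    // Returns every regular .csv file below root, including those in nested directories.
    static List<Path> listCsvFiles(Path root) throws IOException {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(".csv"))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the "tmp" folder used by the SSCCE; pass another path as first argument.
        Path root = Paths.get(args.length > 0 ? args[0] : "tmp");
        if (!Files.isDirectory(root)) {
            System.out.println("Not a directory: " + root);
            return;
        }
        for (Path csv : listCsvFiles(root)) {
            System.out.println(csv + ": " + Files.readAllLines(csv));
        }
    }
}
```

Running it against the tmp folder while the SSCCE is still alive should list three files; the Flink job would be expected to print the content of the same three.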


On 7 Dec 2016, at 17:44, Stefan Richter <s.richter@data-artisans.com> wrote:

Hi,

I think there is a field in FileInputFormat (which TextInputFormat is subclassing) that could serve your purpose if you override the default:

/**
 * The flag to specify whether recursive traversal of the input directory
 * structure is enabled.
 */
protected boolean enumerateNestedFiles = false;

As for compression, I think this class also provides an InflaterInputStreamFactory to read compressed data.
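
Independent of what Flink offers here, a .zip archive can also be read with the JDK's java.util.zip classes. The sketch below is only that: a plain-Java illustration with made-up names (ZipCsvLines, readCsvLines), not a Flink API, and it does not show how such a reader would be wired into a FileInputFormat. It assumes the .csv entries are UTF-8 text.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

// Hypothetical helper, not part of Flink: reads .csv entries from a .zip with the JDK only.
public class ZipCsvLines {

    // Returns the lines of all .csv entries in the archive, in archive order.
    static List<String> readCsvLines(Path zipFile) throws IOException {
        List<String> lines = new ArrayList<>();
        try (InputStream in = Files.newInputStream(zipFile);
             ZipInputStream zip = new ZipInputStream(in, StandardCharsets.UTF_8)) {
            ZipEntry entry;
            while ((entry = zip.getNextEntry()) != null) {
                if (entry.isDirectory() || !entry.getName().endsWith(".csv")) {
                    continue; // skip folders and non-CSV entries
                }
                // A fresh reader per entry; it is deliberately not closed, because
                // closing it would close the whole archive stream.
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(zip, StandardCharsets.UTF_8));
                String line;
                while ((line = reader.readLine()) != null) {
                    lines.add(line);
                }
            }
        }
        return lines;
    }
}
```

The ZipInputStream reports end-of-stream at the end of each entry, so the per-entry reader never reads into the next entry.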

Best,
Stefan

On 7 Dec 2016, at 12:10, Lukas Kircher <lukas.kircher@uni-konstanz.de> wrote:

Hi all,

I am trying to read nested .csv files from a directory and want to switch from a custom SourceFunction I implemented to the TextInputFormat. I have two questions:

1) Only the file in the root directory is processed; nested files are skipped. What am I missing? See the attachment for an SSCCE. I get the same result with Flink 1.1.3 whether I run it from the IDE or submit the job to the standalone binary. The file input splits are all there, yet they don't seem to be processed.

2) What is the easiest way to read compressed .csv files (.zip)?

Thanks for your help, cheers
Lukas

<ReadDirectorySSCCE.java>