flink-user mailing list archives

From Lukas Kircher <lukas.kirc...@uni-konstanz.de>
Subject Fwd: Continuous File monitoring not reading nested files
Date Mon, 09 Jan 2017 12:47:25 GMT
Hi all,

this is probably related to the problem that I reported in December. In case it helps, you
can find a self-contained example below. I haven't looked deeply into the problem, but it seems
that the correct file splits are determined but somehow never processed. If I read from HDFS,
nested files are skipped as well, which is a real problem for me at the moment.

Cheers,
Lukas

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;

public class ReadDirectorySSCCE {
    public static void main(String[] args) throws Exception {
        // create given dirs and add a .csv file to each one
        String[] dirs = new String[] {"tmp", "tmp/first/", "tmp/second/"};
        for (String dir: dirs) {
            // create input file
            File tmpDir = new File(dir);
            if (!tmpDir.exists()) {
                tmpDir.mkdirs();
            }
            File tempFile = File.createTempFile("file", ".csv", tmpDir);
            // try-with-resources closes the writer even if write() throws
            try (BufferedWriter w = new BufferedWriter(new FileWriter(tempFile))) {
                w.write("content of " + dir + "/file.csv");
            }
            tempFile.deleteOnExit();
        }
        File root = new File("tmp");

        TextInputFormat inputFormat = new TextInputFormat(new Path(root.toURI().toString()));
        inputFormat.setNestedFileEnumeration(true);

        inputFormat.configure(new Configuration());

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.createInput(inputFormat).print();
        env.execute();
    }
}
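To double-check which files nested enumeration should pick up in the example above, the same layout can be listed with plain java.nio, independently of Flink. This is a minimal sketch: the class name ListNestedFiles and the helper createSampleTree are hypothetical, and the sketch rebuilds the root/first/second layout in a fresh temporary directory because the reproducer's files are removed on exit via deleteOnExit().

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ListNestedFiles {

    // Recursively collects regular files under root whose names end with suffix,
    // returned as sorted paths relative to root (using '/' as the separator).
    static List<String> listNestedFiles(Path root, String suffix) throws IOException {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(suffix))
                    .map(p -> root.relativize(p).toString().replace(File.separatorChar, '/'))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    // Hypothetical helper: builds the same layout as the reproducer (root, root/first,
    // root/second, one .csv file in each) inside a new temporary directory.
    // The files are left in place; this sketch does not clean up.
    static Path createSampleTree() throws IOException {
        Path root = Files.createTempDirectory("nested-files");
        for (String dir : new String[] {"", "first", "second"}) {
            Path d = Files.createDirectories(root.resolve(dir));
            Path f = Files.createTempFile(d, "file", ".csv");
            Files.write(f, ("content of " + dir).getBytes(StandardCharsets.UTF_8));
        }
        return root;
    }

    public static void main(String[] args) throws IOException {
        Path root = createSampleTree();
        // Expect three files: one at the top level and one in each subdirectory.
        listNestedFiles(root, ".csv").forEach(System.out::println);
    }
}
```

If the Flink job with setNestedFileEnumeration(true) prints content for fewer files than this listing returns, the nested files are being skipped.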
> 
>> On 9 Jan 2017, at 12:56, Yassine MARZOUGUI <y.marzougui@mindlytix.com> wrote:
>> 
>> Hi,
>> 
>> Any updates on this issue? Thank you.
>> 
>> Best,
>> Yassine
>> 
>> 
>> On Dec 20, 2016 6:15 PM, "Aljoscha Krettek" <aljoscha@apache.org> wrote:
>> +Kostas, who probably has the most experience with this by now. Do you have an idea
>> what might be going on?
>> 
>> On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI <y.marzougui@mindlytix.com> wrote:
>> It looks like this is not specific to continuous file monitoring; I'm having the
>> same issue (files in nested directories are not read) when using:
>> 
>> env.readFile(fileInputFormat, "hdfs:///shared/mydir", FileProcessingMode.PROCESS_ONCE, -1L)
>> 
>> 2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI <y.marzougui@mindlytix.com>:
>> Hi all,
>> 
>> I'm using the following code to continuously process files from a directory "mydir".
>> 
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> 
>> FileInputFormat fileInputFormat = new TextInputFormat(new Path("hdfs:///shared/mydir"));
>> fileInputFormat.setNestedFileEnumeration(true);
>> 
>> env.readFile(fileInputFormat,
>>                 "hdfs:///shared/mydir",
>>                 FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
>>                 .print();
>> 
>> env.execute();
>> 
>> If I add a directory under mydir, say "2016-12-16", and then add a file "2016-12-16/file.txt",
>> its contents are not printed. If I add the same file directly under "mydir", its contents
>> are printed correctly. After that, the logs show the following:
>> 
>> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction - Ignoring hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time= 1481882041587 and global mod time= 1481882126122
>> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time= 1481881788704 and global mod time= 1481882126122
>> 
>> It looks like the ContinuousFileMonitoringFunction treated 2016-12-16 as a file it had already
>> read and therefore excludes it, but its contents were never processed. Any idea why this happens?
>> Thank you.
>> 
>> Best,
>> Yassine
>> 
>> 
> 
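The DEBUG lines above suggest that the monitoring function compares each entry directly under the monitored path, including the subdirectory 2016-12-16, against a single global modification time. As a rough illustration of that reading, here is a simplified, hypothetical model (not the actual code of ContinuousFileMonitoringFunction) contrasting a flat listing, which treats a subdirectory as one entry, with a nested listing, which descends into subdirectories and filters only regular files. The names Entry, flatListing and nestedListing, and all timestamps, are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ModTimeFilterModel {

    // Hypothetical in-memory stand-in for a file-system entry (not a Flink or Hadoop class).
    static final class Entry {
        final String path;
        final long modTime;
        final boolean isDir;
        final List<Entry> children;

        private Entry(String path, long modTime, boolean isDir, List<Entry> children) {
            this.path = path;
            this.modTime = modTime;
            this.isDir = isDir;
            this.children = children;
        }

        static Entry file(String path, long modTime) {
            return new Entry(path, modTime, false, Collections.emptyList());
        }

        static Entry dir(String path, long modTime, Entry... children) {
            return new Entry(path, modTime, true, Arrays.asList(children));
        }
    }

    // Flat variant: each direct child is checked against the global modification
    // time as a single entry, so a subdirectory is returned as if it were a file.
    static List<String> flatListing(Entry root, long globalModTime) {
        List<String> eligible = new ArrayList<>();
        for (Entry child : root.children) {
            if (child.modTime > globalModTime) {
                eligible.add(child.path);
            }
        }
        return eligible;
    }

    // Nested variant: always descends into subdirectories and applies the
    // modification-time check only to regular files.
    static List<String> nestedListing(Entry root, long globalModTime) {
        List<String> eligible = new ArrayList<>();
        collect(root, globalModTime, eligible);
        return eligible;
    }

    private static void collect(Entry dir, long globalModTime, List<String> out) {
        for (Entry child : dir.children) {
            if (child.isDir) {
                collect(child, globalModTime, out);
            } else if (child.modTime > globalModTime) {
                out.add(child.path);
            }
        }
    }

    public static void main(String[] args) {
        // Hypothetical timestamps: the new subdirectory and its file appear at t=200,
        // after everything seen so far (global modification time 100).
        Entry mydir = Entry.dir("mydir", 200,
                Entry.file("mydir/old.txt", 50),
                Entry.dir("mydir/2016-12-16", 200,
                        Entry.file("mydir/2016-12-16/file.txt", 200)));
        long globalModTime = 100;

        System.out.println("flat:   " + flatListing(mydir, globalModTime));
        System.out.println("nested: " + nestedListing(mydir, globalModTime));
    }
}
```

Running main prints `flat:   [mydir/2016-12-16]` and `nested: [mydir/2016-12-16/file.txt]`. In the flat variant the directory path is handed on as a "file", and once the global modification time moves past it, later scans would skip it, which would match the "Ignoring ... 2016-12-16" line. The nested variant descends regardless of a directory's own modification time because, on POSIX file systems (and, as far as I know, on HDFS), adding a file updates only its immediate parent's modification time, not that of the directories above it.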

