flink-user mailing list archives

From Timo Walther <twal...@apache.org>
Subject Re: Fwd: Continuous File monitoring not reading nested files
Date Mon, 09 Jan 2017 13:24:28 GMT
Hi Lukas,

Have you tried setting the parameter "recursive.file.enumeration" to true?

// create a configuration object
Configuration parameters = new Configuration();
// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true);

If this also does not work, I think this could be a bug. You can open an 
issue for it and attach your sample code.
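To illustrate what the flag controls: with recursive enumeration off, only files directly under the input path are listed; with it on, subdirectories are descended into as well. A minimal plain-Java sketch of that difference (no Flink dependency; the directory layout and method names here are hypothetical, for illustration only):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;

public class EnumerationSketch {

    // Flat enumeration: only regular files directly under root.
    static List<File> enumerateFlat(File root) {
        List<File> out = new ArrayList<>();
        File[] children = root.listFiles();
        if (children == null) return out;
        for (File f : children) {
            if (f.isFile()) out.add(f);
        }
        return out;
    }

    // Recursive enumeration: descend into subdirectories too, which is
    // what recursive.file.enumeration / setNestedFileEnumeration enable.
    static List<File> enumerateNested(File root) {
        List<File> out = new ArrayList<>();
        File[] children = root.listFiles();
        if (children == null) return out;
        for (File f : children) {
            if (f.isDirectory()) out.addAll(enumerateNested(f));
            else out.add(f);
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        // hypothetical layout: <tmp>/a.csv and <tmp>/nested/b.csv
        File root = Files.createTempDirectory("tmp").toFile();
        File nested = new File(root, "nested");
        nested.mkdirs();
        new File(root, "a.csv").createNewFile();
        new File(nested, "b.csv").createNewFile();

        System.out.println("flat: " + enumerateFlat(root).size());     // prints "flat: 1"
        System.out.println("nested: " + enumerateNested(root).size()); // prints "nested: 2"
    }
}
```

The nested file is only visible to the recursive variant, which is the behavior the reports below describe as missing.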

Timo


On 09/01/17 at 13:47, Lukas Kircher wrote:
> Hi all,
>
> this is probably related to the problem that I reported in December. 
> In case it helps, you can find a self-contained example below. I 
> haven't looked deeply into the problem, but it seems like the correct 
> file splits are determined yet somehow not processed. If I read from 
> HDFS, nested files are skipped as well, which is a real problem for me 
> at the moment.
>
> Cheers,
> Lukas
>
> import org.apache.flink.api.java.io.TextInputFormat;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> import java.io.BufferedWriter;
> import java.io.File;
> import java.io.FileWriter;
>
> public class ReadDirectorySSCCE {
>     public static void main(String[] args) throws Exception {
>         // create the given dirs and add a .csv file to each one
>         String[] dirs = new String[] {"tmp", "tmp/first/", "tmp/second/"};
>         for (String dir : dirs) {
>             // create input file
>             File tmpDir = new File(dir);
>             if (!tmpDir.exists()) {
>                 tmpDir.mkdirs();
>             }
>             File tempFile = File.createTempFile("file", ".csv", tmpDir);
>             BufferedWriter w = new BufferedWriter(new FileWriter(tempFile));
>             w.write("content of " + dir + "/file.csv");
>             w.close();
>             tempFile.deleteOnExit();
>         }
>         File root = new File("tmp");
>         TextInputFormat inputFormat = new TextInputFormat(new Path(root.toURI().toString()));
>         inputFormat.setNestedFileEnumeration(true);
>         inputFormat.configure(new Configuration());
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.createInput(inputFormat).print();
>         env.execute();
>     }
> }
>>
>>> On 9 Jan 2017, at 12:56, Yassine MARZOUGUI 
>>> <y.marzougui@mindlytix.com <mailto:y.marzougui@mindlytix.com>> wrote:
>>>
>>> Hi,
>>>
>>> Any updates on this issue? Thank you.
>>>
>>> Best,
>>> Yassine
>>>
>>>
>>> On Dec 20, 2016 6:15 PM, "Aljoscha Krettek" <aljoscha@apache.org 
>>> <mailto:aljoscha@apache.org>> wrote:
>>>
>>>     +kostas, who probably has the most experience with this by now.
>>>     Do you have an idea what might be going on?
>>>
>>>     On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI
>>>     <y.marzougui@mindlytix.com <mailto:y.marzougui@mindlytix.com>>
>>>     wrote:
>>>
>>>         Looks like this is not specific to the continuous file
>>>         monitoring, I'm having the same issue (files in nested
>>>         directories are not read) when using:
>>>
>>>         env.readFile(fileInputFormat, "hdfs:///shared/mydir",
>>>         FileProcessingMode.PROCESS_ONCE, -1L)
>>>
>>>         2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI
>>>         <y.marzougui@mindlytix.com <mailto:y.marzougui@mindlytix.com>>:
>>>
>>>             Hi all,
>>>
>>>             I'm using the following code to continuously process
>>>             files from a directory "mydir".
>>>
>>>             final StreamExecutionEnvironment env =
>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>>             FileInputFormat fileInputFormat = new
>>>             TextInputFormat(new Path("hdfs:///shared/mydir"));
>>>             fileInputFormat.setNestedFileEnumeration(true);
>>>
>>>             env.readFile(fileInputFormat,
>>>                       "hdfs:///shared/mydir",
>>>             FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
>>>             .print();
>>>
>>>             env.execute();
>>>
>>>             If I add a directory under mydir, say "2016-12-16", and
>>>             then add a file "2016-12-16/file.txt", its contents
>>>             are not printed. If I add the same file directly under
>>>             "mydir", its contents are printed correctly. After
>>>             that, the logs show the following:
>>>
>>>             10:55:44,928 DEBUG
>>>             org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>>>              - Ignoring
>>>             hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod
>>>             time= 1481882041587 and global mod time= 1481882126122
>>>             10:55:44,928 DEBUG
>>>             org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>>>              - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt,
>>>             with mod time= 1481881788704 and global mod time=
>>>             1481882126122
>>>
>>>             Looks like the ContinuousFileMonitoringFunction
>>>             considered the directory 2016-12-16 to be a file it had
>>>             already read and therefore excluded it, but its contents
>>>             were never processed. Any idea why this happens?
>>>             Thank you.
>>>
>>>             Best,
>>>             Yassine
>>>
>>>
>>>
>>
>
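The "Ignoring ... with mod time ... and global mod time ..." lines in the log above suggest a watermark-style filter: the monitor remembers the largest modification time it has already processed and skips any path at or below it. A simplified plain-Java sketch of that logic (names and structure are hypothetical, not Flink's actual implementation; the timestamps are taken from the log above):

```java
public class ModTimeFilterSketch {

    static long globalModTime = Long.MIN_VALUE;

    // Returns true if the path should be processed. Anything with
    // modTime <= globalModTime is ignored, mirroring the log output above.
    static boolean accept(String path, long modTime) {
        if (modTime <= globalModTime) {
            System.out.println("Ignoring " + path + ", with mod time= " + modTime
                    + " and global mod time= " + globalModTime);
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        // first scan: both entries are new, so the watermark advances
        long t1 = 1481881788704L; // mydir/file.txt
        long t2 = 1481882126122L; // a later file
        accept("mydir/file.txt", t1);   // processed
        accept("mydir/other.txt", t2);  // processed
        globalModTime = Math.max(t1, t2);

        // second scan: a nested dir whose mod time predates the watermark
        // is skipped even though its contents were never read, which
        // matches the behavior reported above
        boolean taken = accept("mydir/2016-12-16", 1481882041587L);
        System.out.println("nested dir processed: " + taken);
    }
}
```

Under this model, a directory created before the watermark advanced would be filtered out wholesale, which would explain why its nested files never reach the pipeline.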

