flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: streaming hdfs sub folders
Date Fri, 26 Feb 2016 13:52:19 GMT
Hi!

Have a look at the class-level comments in "InputFormat". They should
describe how input formats first generate splits (for parallelization) on
the master, and the workers open each split.

So you need something like this:

        AvroInputFormat<EndSongCleanedPq> avroInputFormat =
                new AvroInputFormat<EndSongCleanedPq>(
                        new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/" + s),
                        EndSongCleanedPq.class);
        avroInputFormat.setReuseAvroValue(false);

        // createInputSplits takes the minimum number of splits as a hint
        for (FileInputSplit split : avroInputFormat.createInputSplits(1)) {
            // each split must be opened before records can be read from it
            avroInputFormat.open(split);

            while (!avroInputFormat.reachedEnd()) {
                EndSongCleanedPq res = avroInputFormat.nextRecord(new EndSongCleanedPq());
                if (res != null) collector.collect(res);
            }
            avroInputFormat.close();
        }
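
For the list of paths that feeds env.fromElements(...), the client-side traversal could look roughly like the following. This is only a sketch under assumptions: it walks the local filesystem with java.nio.file as a stand-in for HDFS (against HDFS you would go through the Hadoop or Flink FileSystem classes instead), and the class name OrderedPathCollector, the method collect, and the file names are made up for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OrderedPathCollector {

    /**
     * Lists the regular files inside each hour subfolder of dayDir.
     * Hour folders are visited in name order ("00", "01", ...), and the
     * files inside each folder are sorted by name as well.
     */
    static List<String> collect(Path dayDir) throws IOException {
        List<Path> hourDirs;
        try (Stream<Path> entries = Files.list(dayDir)) {
            hourDirs = entries.filter(Files::isDirectory)
                    .sorted()
                    .collect(Collectors.toList());
        }

        List<String> paths = new ArrayList<>();
        for (Path hourDir : hourDirs) {
            try (Stream<Path> files = Files.list(hourDir)) {
                files.filter(Files::isRegularFile)
                        .sorted()
                        .forEach(p -> paths.add(p.toString()));
            }
        }
        return paths;
    }

    public static void main(String[] args) throws IOException {
        // Build a tiny throwaway tree so the sketch runs anywhere.
        Path day = Files.createTempDirectory("endsong");
        for (String hour : new String[] {"01", "00"}) {
            Path dir = Files.createDirectory(day.resolve(hour));
            Files.createFile(dir.resolve("part-b.avro"));
            Files.createFile(dir.resolve("part-a.avro"));
        }
        // Prints the files of hour 00 first, then those of hour 01.
        collect(day).forEach(System.out::println);
    }
}
```

The resulting list keeps the hour order, so a source built from it hands the paths to the flatMap in that order as long as the parallelism is 1.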


Hope that helps.

Stephan



On Tue, Feb 23, 2016 at 12:04 PM, Martin Neumann <mneumann@sics.se> wrote:

> I'm not very familiar with the inner workings of the InputFormats. Calling
> .open() got rid of the NullPointerException, but the stream still produces
> no output.
>
> As a temporary solution I wrote a batch job that just unions all the
> different datasets and puts them (sorted) into a single folder.
>
> cheers Martin
>
> On Fri, Feb 19, 2016 at 2:39 PM, Robert Metzger <rmetzger@apache.org>
> wrote:
>
>> Hi Martin,
>>
>> where is the null pointer exception thrown?
>> I think you didn't call the open() method of the AvroInputFormat. Maybe
>> that's the issue.
>>
>> On Thu, Feb 18, 2016 at 5:01 PM, Martin Neumann <mneumann@sics.se> wrote:
>>
>>> I tried to implement your idea, but I'm getting NullPointerExceptions
>>> from the AvroInputFormat. Any idea what I'm doing wrong?
>>> See the code below:
>>>
>>> public static void main(String[] args) throws Exception {
>>>
>>>     // set up the execution environment
>>>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>     env.setParallelism(1);
>>>
>>>     env.fromElements("00", "01", "02","03","22","23")
>>>             .flatMap(new FileExtractor())
>>>             .filter(new LocationFiter())
>>>             .flatMap(new PreProcessEndSongClean())
>>>             .writeAsCsv(outPath);
>>>
>>>
>>>     env.execute("something");
>>> }
>>>
>>> private static class FileExtractor implements FlatMapFunction<String,EndSongCleanedPq>{
>>>
>>>     @Override
>>>     public void flatMap(String s, Collector<EndSongCleanedPq> collector) throws Exception {
>>>         AvroInputFormat<EndSongCleanedPq> avroInputFormat = new AvroInputFormat<EndSongCleanedPq>(
>>>                 new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/" + s), EndSongCleanedPq.class);
>>>         avroInputFormat.setReuseAvroValue(false);
>>>         while (! avroInputFormat.reachedEnd()){
>>>                 EndSongCleanedPq res = avroInputFormat.nextRecord(new EndSongCleanedPq());
>>>                 if (res != null) collector.collect(res);
>>>         }
>>>     }
>>> }
>>>
>>>
>>> On Thu, Feb 18, 2016 at 4:06 PM, Martin Neumann <mneumann@sics.se>
>>> wrote:
>>>
>>>> I guess I need to set the parallelism for the FlatMap to 1 to make sure
>>>> I read one file at a time. The downside I see with this is that I will
>>>> not be able to read in parallel from HDFS (and the files are huge).
>>>>
>>>> I'll give it a try and see how much performance I lose.
>>>>
>>>> cheers Martin
>>>>
>>>> On Thu, Feb 18, 2016 at 2:32 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>>
>>>>> Martin,
>>>>>
>>>>> I think you can approximate this in an easy way like this:
>>>>>
>>>>>   - On the client, you traverse your directories to collect all files
>>>>> that you need, collect all file paths in a list.
>>>>>   - Then you have a source "env.fromElements(paths)".
>>>>>   - Then you flatMap and in the FlatMap, run the Avro input format
>>>>> (open it per path, then call it to get all elements)
>>>>>
>>>>> That gives you pretty much full control over the order in which the
>>>>> files are processed.
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Wed, Feb 17, 2016 at 9:42 PM, Martin Neumann <mneumann@sics.se>
>>>>> wrote:
>>>>>
>>>>>> I forgot to mention I'm using an AvroInputFormat to read the file
>>>>>> (that might be relevant to how the flag needs to be applied).
>>>>>> See the code snippet below:
>>>>>>
>>>>>> DataStream<EndSongCleanedPq> inStream =
>>>>>>         env.readFile(new AvroInputFormat<EndSongCleanedPq>(
>>>>>>                 new Path(filePath), EndSongCleanedPq.class), filePath);
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 17, 2016 at 7:33 PM, Martin Neumann <mneumann@sics.se>
>>>>>> wrote:
>>>>>>
>>>>>>> The program is a DataStream program; it usually gets its data from
>>>>>>> Kafka. It's an anomaly detection program that learns from the stream
>>>>>>> itself. The reason I want to read from files is to test different
>>>>>>> settings of the algorithm and compare them.
>>>>>>>
>>>>>>> I don't think I need to replay things in the exact order (which is
>>>>>>> not possible with parallel reads anyway), and I have written the
>>>>>>> program so it can deal with out-of-order events.
>>>>>>> I only need the subfolders to be processed roughly in order. It's
>>>>>>> fine to process some stuff from 01 before everything from 00 is
>>>>>>> finished, but if I get records from all 24 subfolders at the same
>>>>>>> time, things will break. If I set the flag, will it try to get data
>>>>>>> from all subdirectories in parallel, or will it go subdirectory by
>>>>>>> subdirectory?
>>>>>>>
>>>>>>> Also, can you point me to some documentation or something where I
>>>>>>> can see how to set the flag?
>>>>>>>
>>>>>>> cheers Martin
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Feb 17, 2016 at 11:49 AM, Stephan Ewen <sewen@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi!
>>>>>>>>
>>>>>>>> Going through nested folders is pretty simple: there is a flag on
>>>>>>>> the FileInputFormat that makes sure those are read.
>>>>>>>>
>>>>>>>> The tricky part is that all "00" files should be read before the
>>>>>>>> "01" files. If you still want parallel reads, that means you need
>>>>>>>> to sync at some point: wait for all parallel parts to finish with
>>>>>>>> the "00" work before anyone may start with the "01" work.
>>>>>>>>
>>>>>>>> Is your training program a DataStream or a DataSet program?
>>>>>>>>
>>>>>>>> Stephan
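
The "sync at some point" idea above can be illustrated outside of Flink with plain java.util.concurrent. This is a minimal sketch, not Flink API: the class HourBarrierSketch, the method run, and the file names are hypothetical, and "reading" a file is replaced by appending its name to a log. Files of one hour are handled in parallel, and the next hour starts only after every task of the current hour has finished.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HourBarrierSketch {

    /**
     * Processes the files hour by hour, in the iteration order of the map.
     * Returns the order in which files were handled.
     */
    static List<String> run(Map<String, List<String>> filesByHour, int threads)
            throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (Map.Entry<String, List<String>> hour : filesByHour.entrySet()) {
                List<Callable<Void>> tasks = new ArrayList<>();
                for (String file : hour.getValue()) {
                    tasks.add(() -> {
                        // Stand-in for opening and reading one file.
                        log.add(hour.getKey() + "/" + file);
                        return null;
                    });
                }
                // invokeAll blocks until every task of this hour has completed,
                // which acts as the barrier between "00" and "01".
                // Exceptions thrown by tasks are held in the returned Futures.
                pool.invokeAll(tasks);
            }
        } finally {
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        // LinkedHashMap keeps the hours in insertion order.
        Map<String, List<String>> filesByHour = new LinkedHashMap<>();
        filesByHour.put("00", Arrays.asList("a.avro", "b.avro"));
        filesByHour.put("01", Arrays.asList("c.avro", "d.avro"));
        run(filesByHour, 4).forEach(System.out::println);
    }
}
```

Within one hour the order of the log entries is not deterministic, but no "01" entry can appear before the last "00" entry.
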
>>>>>>>>
>>>>>>>> On Wed, Feb 17, 2016 at 1:16 AM, Martin Neumann <mneumann@sics.se>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I have a streaming machine learning job that usually runs with
>>>>>>>>> input from Kafka. To tweak the models I need to run it on some
>>>>>>>>> old data from HDFS.
>>>>>>>>>
>>>>>>>>> Unfortunately the data on HDFS is spread out over several
>>>>>>>>> subfolders. Basically I have one folder per date with one
>>>>>>>>> subfolder for each hour; within those are the actual input files
>>>>>>>>> I'm interested in.
>>>>>>>>>
>>>>>>>>> What I need is a source that goes through the subfolders in order
>>>>>>>>> and streams the files into the program. I'm using event
>>>>>>>>> timestamps, so all files in 00 need to be processed before 01.
>>>>>>>>>
>>>>>>>>> Does anyone have an idea how to do this?
>>>>>>>>>
>>>>>>>>> cheers Martin
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
