flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Robert Metzger <rmetz...@apache.org>
Subject Re: streaming hdfs sub folders
Date Fri, 19 Feb 2016 13:39:36 GMT
Hi Martin,

where is the null pointer exception thrown?
I think you didn't call the open() method of the AvroInputFormat. Maybe
that's the issue.

On Thu, Feb 18, 2016 at 5:01 PM, Martin Neumann <mneumann@sics.se> wrote:

> I tried to implement your idea but I'm getting NullPointer exceptions from
> the AvroInputFormat any Idea what I'm doing wrong?
> See the code below:
>
> public static void main(String[] args) throws Exception {
>
>     // set up the execution environment
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>
>     env.fromElements("00", "01", "02","03","22","23")
>             .flatMap(new FileExtractor())
>             .filter(new LocationFiter())
>             .flatMap(new PreProcessEndSongClean())
>             .writeAsCsv(outPath);
>
>
>     env.execute("something");
> }
>
> private static class FileExtractor implements FlatMapFunction<String,EndSongCleanedPq>{
>
>     @Override
>     public void flatMap(String s, Collector<EndSongCleanedPq> collector) throws
Exception {
>         AvroInputFormat<EndSongCleanedPq> avroInputFormat = new AvroInputFormat<EndSongCleanedPq>(new
Path("hdfs:///anonym/cleaned/endsong/2016-01-01/"+s), EndSongCleanedPq.class);
>         avroInputFormat.setReuseAvroValue(false);
>         while (! avroInputFormat.reachedEnd()){
>                 EndSongCleanedPq res = avroInputFormat.nextRecord(new EndSongCleanedPq());
>                 if (res != null) collector.collect(res);
>         }
>     }
> }
>
>
> On Thu, Feb 18, 2016 at 4:06 PM, Martin Neumann <mneumann@sics.se> wrote:
>
>> I guess I need to set the parallelism for the FlatMap to 1 to make sure I
>> read one file at a time. The downside I see with this is that I will be not
>> able to read in parallel from HDFS (and the files are Huge).
>>
>> I give it a try and see how much performance I loose.
>>
>> cheers Martin
>>
>> On Thu, Feb 18, 2016 at 2:32 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>>> Martin,
>>>
>>> I think you can approximate this in an easy way like this:
>>>
>>>   - On the client, you traverse your directories to collect all files
>>> that you need, collect all file paths in a list.
>>>   - Then you have a source "env.fromElements(paths)".
>>>   - Then you flatMap and in the FlatMap, run the Avro input format (open
>>> it per path, then call it to get all elements)
>>>
>>> That gives you pretty much full control about in which order the files
>>> should be processed.
>>>
>>> What do you think?
>>>
>>> Stephan
>>>
>>>
>>> On Wed, Feb 17, 2016 at 9:42 PM, Martin Neumann <mneumann@sics.se>
>>> wrote:
>>>
>>>> I forgot to mention I'm using an AvroInputFormat to read the file (that
>>>> might be relevant how the flag needs to be applied)
>>>> See the code Snipped below:
>>>>
>>>> DataStream<EndSongCleanedPq> inStream =
>>>>         env.readFile(new AvroInputFormat<EndSongCleanedPq>(new Path(filePath),
EndSongCleanedPq.class), filePath);
>>>>
>>>>
>>>> On Wed, Feb 17, 2016 at 7:33 PM, Martin Neumann <mneumann@sics.se>
>>>> wrote:
>>>>
>>>>> The program is a DataStream program, it usually it gets the data from
>>>>> kafka. It's an anomaly detection program that learns from the stream
>>>>> itself. The reason I want to read from files is to test different settings
>>>>> of the algorithm and compare them.
>>>>>
>>>>> I think I don't need to reply things in the exact order (wich is not
>>>>> possible with parallel reads anyway) and I have written the program so
it
>>>>> can deal with out of order events.
>>>>> I only need the subfolders to be processed roughly in order. Its fine
>>>>> to process some stuff from 01 before everything from 00 is finished,
if I
>>>>> get records from all 24 subfolders at the same time things will break
>>>>> though. If I set the flag will it try to get data from all sub dir's
in
>>>>> parallel or will it go sub dir by sub dir?
>>>>>
>>>>> Also can you point me to some documentation or something where I can
>>>>> see how to set the Flag?
>>>>>
>>>>> cheers Martin
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Feb 17, 2016 at 11:49 AM, Stephan Ewen <sewen@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> Going through nested folders is pretty simple, there is a flag on
the
>>>>>> FileInputFormat that makes sure those are read.
>>>>>>
>>>>>> Tricky is the part that all "00" files should be read before the
"01"
>>>>>> files. If you still want parallel reads, that means you need to sync
at
>>>>>> some point, wait for all parallel parts to finish with the "00" work
before
>>>>>> anyone may start with the "01" work.
>>>>>>
>>>>>> Is your training program a DataStream or a DataSet program?`
>>>>>>
>>>>>> Stephan
>>>>>>
>>>>>> On Wed, Feb 17, 2016 at 1:16 AM, Martin Neumann <mneumann@sics.se>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have a streaming machine learning job that usually runs with
input
>>>>>>> from kafka. To tweak the models I need to run on some old data
from HDFS.
>>>>>>>
>>>>>>> Unfortunately the data on HDFS is spread out over several
>>>>>>> subfolders. Basically I have a datum with one subfolder for each
hour
>>>>>>> within those are the actual input files I'm interested in.
>>>>>>>
>>>>>>> Basically what I need is a source that goes through the subfolder
in
>>>>>>> order and streams the files into the program. I'm using event
timestamps so
>>>>>>> all files in 00 need to be processed before 01.
>>>>>>>
>>>>>>> Has anyone an idea on how to do this?
>>>>>>>
>>>>>>> cheers Martin
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Mime
View raw message