flink-user mailing list archives

From Martin Neumann <mneum...@sics.se>
Subject Re: streaming hdfs sub folders
Date Tue, 23 Feb 2016 11:04:51 GMT
I'm not very familiar with the inner workings of the InputFormats. Calling
.open() got rid of the NullPointerException, but the stream still produces no output.

As a temporary solution I wrote a batch job that just unions all the
different datasets and puts them (sorted) into a single folder.

cheers Martin

On Fri, Feb 19, 2016 at 2:39 PM, Robert Metzger <rmetzger@apache.org> wrote:

> Hi Martin,
>
> where is the null pointer exception thrown?
> I think you didn't call the open() method of the AvroInputFormat. Maybe
> that's the issue.
>
> On Thu, Feb 18, 2016 at 5:01 PM, Martin Neumann <mneumann@sics.se> wrote:
>
>> I tried to implement your idea, but I'm getting NullPointerExceptions
>> from the AvroInputFormat. Any idea what I'm doing wrong?
>> See the code below:
>>
>> public static void main(String[] args) throws Exception {
>>
>>     // set up the execution environment
>>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>     env.setParallelism(1);
>>
>>     env.fromElements("00", "01", "02","03","22","23")
>>             .flatMap(new FileExtractor())
>>             .filter(new LocationFiter())
>>             .flatMap(new PreProcessEndSongClean())
>>             .writeAsCsv(outPath);
>>
>>
>>     env.execute("something");
>> }
>>
>> private static class FileExtractor implements FlatMapFunction<String,EndSongCleanedPq>{
>>
>>     @Override
>>     public void flatMap(String s, Collector<EndSongCleanedPq> collector) throws Exception {
>>         AvroInputFormat<EndSongCleanedPq> avroInputFormat = new AvroInputFormat<EndSongCleanedPq>(
>>                 new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/" + s), EndSongCleanedPq.class);
>>         avroInputFormat.setReuseAvroValue(false);
>>         while (!avroInputFormat.reachedEnd()) {
>>             EndSongCleanedPq res = avroInputFormat.nextRecord(new EndSongCleanedPq());
>>             if (res != null) collector.collect(res);
>>         }
>>     }
>> }
>>
>>
>> On Thu, Feb 18, 2016 at 4:06 PM, Martin Neumann <mneumann@sics.se> wrote:
>>
>>> I guess I need to set the parallelism for the FlatMap to 1 to make sure
>>> I read one file at a time. The downside I see with this is that I will
>>> not be able to read in parallel from HDFS (and the files are huge).
>>>
>>> I'll give it a try and see how much performance I lose.
>>>
>>> cheers Martin
>>>
>>> On Thu, Feb 18, 2016 at 2:32 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>
>>>> Martin,
>>>>
>>>> I think you can approximate this in an easy way like this:
>>>>
>>>>   - On the client, you traverse your directories to find all the files
>>>> that you need and collect their paths in a list.
>>>>   - Then you have a source "env.fromElements(paths)".
>>>>   - Then you flatMap and in the FlatMap, run the Avro input format
>>>> (open it per path, then call it to get all elements)
>>>>
>>>> That gives you pretty much full control over the order in which the
>>>> files are processed.
>>>>
>>>> What do you think?
>>>>
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Feb 17, 2016 at 9:42 PM, Martin Neumann <mneumann@sics.se>
>>>> wrote:
>>>>
>>>>> I forgot to mention I'm using an AvroInputFormat to read the file
>>>>> (that might be relevant to how the flag needs to be applied).
>>>>> See the code snippet below:
>>>>>
>>>>> DataStream<EndSongCleanedPq> inStream =
>>>>>         env.readFile(new AvroInputFormat<EndSongCleanedPq>(new Path(filePath), EndSongCleanedPq.class), filePath);
>>>>>
>>>>>
>>>>> On Wed, Feb 17, 2016 at 7:33 PM, Martin Neumann <mneumann@sics.se>
>>>>> wrote:
>>>>>
>>>>>> The program is a DataStream program; it usually gets its data from
>>>>>> Kafka. It's an anomaly detection program that learns from the stream
>>>>>> itself. The reason I want to read from files is to test different settings
>>>>>> of the algorithm and compare them.
>>>>>>
>>>>>> I don't think I need to replay things in the exact order (which is not
>>>>>> possible with parallel reads anyway), and I have written the program so it
>>>>>> can deal with out-of-order events.
>>>>>> I only need the subfolders to be processed roughly in order. It's fine
>>>>>> to process some stuff from 01 before everything from 00 is finished, but if I
>>>>>> get records from all 24 subfolders at the same time, things will break.
>>>>>> If I set the flag, will it try to get data from all subdirectories in
>>>>>> parallel, or will it go subdirectory by subdirectory?
>>>>>>
>>>>>> Also, can you point me to some documentation or something where I can
>>>>>> see how to set the flag?
>>>>>>
>>>>>> cheers Martin
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 17, 2016 at 11:49 AM, Stephan Ewen <sewen@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi!
>>>>>>>
>>>>>>> Going through nested folders is pretty simple: there is a flag on
>>>>>>> the FileInputFormat that makes sure those are read.
>>>>>>>
>>>>>>> The tricky part is that all "00" files should be read before the
>>>>>>> "01" files. If you still want parallel reads, that means you need to sync
>>>>>>> at some point: wait for all parallel parts to finish with the "00" work
>>>>>>> before anyone may start with the "01" work.
>>>>>>>
>>>>>>> Is your training program a DataStream or a DataSet program?
>>>>>>>
>>>>>>> Stephan
>>>>>>>
>>>>>>> On Wed, Feb 17, 2016 at 1:16 AM, Martin Neumann <mneumann@sics.se>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I have a streaming machine learning job that usually runs with
>>>>>>>> input from Kafka. To tweak the models I need to run it on some old data from
>>>>>>>> HDFS.
>>>>>>>>
>>>>>>>> Unfortunately, the data on HDFS is spread out over several
>>>>>>>> subfolders. Basically, I have a folder per date with one subfolder for each hour;
>>>>>>>> within those are the actual input files I'm interested in.
>>>>>>>>
>>>>>>>> What I need is a source that goes through the subfolders
>>>>>>>> in order and streams the files into the program. I'm using event timestamps,
>>>>>>>> so all files in 00 need to be processed before 01.
>>>>>>>>
>>>>>>>> Does anyone have an idea how to do this?
>>>>>>>>
>>>>>>>> cheers Martin
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
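
A minimal sketch of the first step suggested in the thread: collecting the file paths on the client so that the hour subfolders ("00", "01", ...) are visited in order. It uses java.nio.file on a local directory purely for illustration. For HDFS the listing would have to go through the Hadoop or Flink file system API instead, which is not shown here. The class OrderedFileLister and its methods are hypothetical names and are not part of Flink.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper: lists the files below a day folder, one hour subfolder at a time. */
final class OrderedFileLister {

    /** Returns the direct children of dir (directories or regular files), sorted by path. */
    private static List<Path> sortedChildren(Path dir, boolean directories) throws IOException {
        try (Stream<Path> children = Files.list(dir)) {
            return children
                    .filter(p -> directories ? Files.isDirectory(p) : Files.isRegularFile(p))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    /**
     * Walks the hour subfolders of dayFolder in sorted order and appends the
     * files of each subfolder, so every file in "00" precedes every file in "01".
     */
    static List<Path> listFilesInFolderOrder(Path dayFolder) throws IOException {
        List<Path> ordered = new ArrayList<>();
        for (Path hourFolder : sortedChildren(dayFolder, true)) {
            ordered.addAll(sortedChildren(hourFolder, false));
        }
        return ordered;
    }
}
```

The resulting list could then be turned into the source of file paths described in the thread. How strictly the folder order is preserved downstream still depends on the parallelism of the operator that reads the files.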
