flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: How to make Flink read all files in HDFS folder and do transformations on the data
Date Sat, 07 May 2016 13:58:28 GMT
Sorry Palle,
I misunderstood and thought you were trying to read a single JSON object per
file. The solution suggested by Fabian is definitely the right one
for your specific use case!

On 7 May 2016 12:52, "Fabian Hueske" <fhueske@gmail.com> wrote:

> Hi Palle,
> you can recursively read all files in a folder as explained in the
> "Recursive Traversal of the Input Path Directory" section of the Data
> Source documentation [1].
> The easiest way to read line-wise JSON objects is to use
> ExecutionEnvironment.readTextFile(), which reads text files line by line as
> strings, followed by a mapper that uses a JSON parser (e.g., Jackson) to
> parse the JSON strings. You should use a RichMapFunction and create the
> parser in the open() method to avoid instantiating a new parser for each
> incoming line. After parsing, the RichMapFunction can emit POJOs.
> Cheers, Fabian
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#data-sources
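Fabian's two suggestions can be combined roughly as follows. This is a minimal sketch against the Flink 1.0 DataSet API, assuming jackson-databind is on the classpath. The `recursive.file.enumeration` parameter is the one described in the linked Data Sources section; the `Event` POJO, its fields and the `hdfs:///path/to/folder` path are hypothetical placeholders for Palle's real data.

```java
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class ReadJsonLines {

    // Hypothetical POJO: public class, public no-arg constructor, public fields.
    public static class Event {
        public String user;
        public long count;

        public Event() {}
    }

    // Parses one JSON line into an Event; the parser is created once per task in open().
    public static class JsonToEvent extends RichMapFunction<String, Event> {
        private transient ObjectMapper mapper;

        @Override
        public void open(Configuration parameters) {
            mapper = new ObjectMapper();
            // Ignore JSON fields that the POJO does not declare.
            mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        }

        @Override
        public Event map(String line) throws Exception {
            return mapper.readValue(line, Event.class);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Enable recursive traversal of nested directories under the input path.
        Configuration params = new Configuration();
        params.setBoolean("recursive.file.enumeration", true);

        // Placeholder path; every line of every file becomes one String record.
        DataSet<String> lines = env.readTextFile("hdfs:///path/to/folder")
                .withParameters(params);

        DataSet<Event> events = lines.map(new JsonToEvent());
        events.print();
    }
}
```

Blank lines would make `readValue` throw, so real data may need a filter step in front of the map.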
> 2016-05-07 12:25 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>> I had the same issue :)
>> I resolved it by reading all file paths into a collection and then using this
>> code:
>> env.fromCollection(filePaths).rebalance().map(file2pojo)
>> Then you have your dataset of POJOs!
>> The rebalance() is necessary to exploit parallelism; otherwise the
>> pipeline will be executed with parallelism 1.
>> Best,
>> Flavio
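Flavio's whole-file approach fits the case he first assumed, where each file holds exactly one JSON document (not Palle's line-wise files). A possible sketch follows, using Flink's own `org.apache.flink.core.fs` classes to list the folder on the client. `file2pojo` was not shown in the thread, so `FileToPojo`, the `Doc` POJO and the path are hypothetical stand-ins, and the listing is deliberately non-recursive.

```java
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class ReadWholeJsonFiles {

    // Hypothetical POJO for a file that contains a single JSON object.
    public static class Doc {
        public String id;

        public Doc() {}
    }

    // Hypothetical stand-in for "file2pojo": parses one whole file into one Doc.
    public static class FileToPojo extends RichMapFunction<String, Doc> {
        private transient ObjectMapper mapper;

        @Override
        public void open(Configuration parameters) {
            mapper = new ObjectMapper();
        }

        @Override
        public Doc map(String file) throws Exception {
            Path path = new Path(file);
            FileSystem fs = path.getFileSystem();
            try (InputStream in = fs.open(path)) {
                return mapper.readValue(in, Doc.class);
            }
        }
    }

    // Lists the regular files directly under dir (no recursion into subfolders).
    public static List<String> listFiles(String dir) throws Exception {
        Path root = new Path(dir);
        FileSystem fs = root.getFileSystem();
        List<String> files = new ArrayList<>();
        for (FileStatus status : fs.listStatus(root)) {
            if (!status.isDir()) {
                files.add(status.getPath().toString());
            }
        }
        return files;
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        List<String> filePaths = listFiles("hdfs:///path/to/folder");

        // fromCollection is a non-parallel source; rebalance() spreads the
        // paths round-robin so the parsing map runs in parallel.
        DataSet<Doc> docs = env.fromCollection(filePaths)
                .rebalance()
                .map(new FileToPojo());
        docs.print();
    }
}
```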
>> On 7 May 2016 12:13, "Palle" <palle@sport.dk> wrote:
>> Hi there.
>> I've got an HDFS folder containing a lot of files. Each file contains a
>> lot of JSON objects, one per line. I will have several TB in the HDFS
>> folder.
>> My plan is to make Flink read all files and all JSON objects and then do
>> some analysis on the data, very similar to the
>> flatMap/groupBy/reduceGroup transformations that are done in the WordCount
>> example.
>> But I am a bit stuck, because I cannot seem to find out how to make Flink
>> read all files in an HDFS dir and then perform the transformations on the
>> data. I have googled quite a bit and also looked in the Flink API and the
>> mailing list history.
>> Can anyone point me to an example where Flink is used to read all files
>> in an HDFS folder and then do transformations on the data?
>> And a second question: is there an elegant way to make Flink handle the
>> JSON objects? Can they be converted to POJOs by something similar to the
>> pojoType() method?
>> /Palle
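Once the lines are parsed into POJOs, the WordCount-style step Palle describes can group directly on POJO field names. Note that `pojoType()` belongs to the CSV reader (`readCsvFile(...).pojoType(...)`), so for JSON the parsing map takes its place. The sketch below assumes a hypothetical `Event` POJO with `user` and `count` fields and sums `count` per `user`; `fromElements` only stands in for the parsed HDFS data.

```java
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class CountPerUser {

    // Hypothetical POJO standing in for whatever the JSON lines parse into.
    public static class Event {
        public String user;
        public long count;

        public Event() {}

        public Event(String user, long count) {
            this.user = user;
            this.count = count;
        }
    }

    // Receives all events of one user and emits (user, sum of counts).
    public static class SumPerUser
            implements GroupReduceFunction<Event, Tuple2<String, Long>> {
        @Override
        public void reduce(Iterable<Event> events, Collector<Tuple2<String, Long>> out) {
            String user = null;
            long sum = 0L;
            for (Event e : events) {
                user = e.user;
                sum += e.count;
            }
            out.collect(new Tuple2<>(user, sum));
        }
    }

    public static DataSet<Tuple2<String, Long>> countPerUser(DataSet<Event> events) {
        // groupBy accepts POJO field names as key expressions.
        return events.groupBy("user").reduceGroup(new SumPerUser());
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Event> events = env.fromElements(
                new Event("alice", 2), new Event("bob", 1), new Event("alice", 3));
        countPerUser(events).print();
    }
}
```

For a plain sum, a `reduce()` or `sum()` aggregation would also work; `reduceGroup` is used here only because it mirrors the transformation named in the question.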
