Subject: Re: How to make Flink read all files in HDFS folder and do transformations on the data
From: Flavio Pompermaier <pompermaier@okkam.it>
To: user@flink.apache.org
Date: Sat, 7 May 2016 15:58:28 +0200

Sorry Palle,
I wrongly understood that you were trying to read a single JSON object per file... the solution suggested by Fabian is definitely the right one for your specific use case!

Best,
Flavio

On 7 May 2016 12:52, "Fabian Hueske" <fhueske@gmail.com> wrote:
> Hi Palle,
>
> you can recursively read all files in a folder as explained in the
> "Recursive Traversal of the Input Path Directory" section of the Data
> Source documentation [1].
>
> The easiest way to read line-wise JSON objects is to use
> ExecutionEnvironment.readTextFile(), which reads text files line by line
> as strings, followed by a mapper that uses a JSON parser (e.g., Jackson)
> to parse the JSON strings. You should use a RichMapFunction and create
> the parser in the open() method to avoid instantiating a new parser for
> each incoming line. After parsing, the RichMapFunction can emit POJOs.
>
> Cheers, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#data-sources
>
> 2016-05-07 12:25 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>
>> I had the same issue :)
>> I resolved it by reading all file paths into a collection and then
>> using this code:
>>
>> env.fromCollection(filePaths).rebalance().map(file2pojo)
>>
>> You can have your dataset of POJOs!
>>
>> The rebalance() is necessary to exploit parallelism, otherwise the
>> pipeline will be executed with parallelism 1.
>>
>> Best,
>> Flavio
>>
>> On 7 May 2016 12:13, "Palle" <palle@sport.dk> wrote:
>>
>> Hi there.
>>
>> I've got an HDFS folder containing a lot of files. All files contain a
>> lot of JSON objects, one per line. I will have several TB in the HDFS
>> folder.
>>
>> My plan is to make Flink read all files and all JSON objects and then
>> do some analysis on the data, actually very similar to the
>> flatMap/groupBy/reduceGroup transformations that are done in the
>> WordCount example.
>>
>> But I am a bit stuck, because I cannot seem to find out how to make
>> Flink read all files in an HDFS dir and then perform the transformations
>> on the data. I have googled quite a bit and also looked in the Flink API
>> and mailing list history.
>>
>> Can anyone point me to an example where Flink is used to read all files
>> in an HDFS folder and then do transformations on the data?
>>
>> - and a second question: Is there an elegant way to make Flink handle
>> the JSON objects? Can they be converted to POJOs by something similar to
>> the pojoType() method?
>>
>> /Palle
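Fabian's recipe above (recursive traversal of the input directory, readTextFile(), and a RichMapFunction that builds its Jackson parser once in open()) could be sketched roughly like this. This is a minimal sketch, not code from the thread: the `Event` POJO, its fields, and the HDFS path are made-up placeholders, and it assumes the Flink DataSet API and Jackson are on the classpath:

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

import com.fasterxml.jackson.databind.ObjectMapper;

public class ReadJsonLines {

    // Hypothetical POJO for one JSON object per line; adjust to your schema.
    public static class Event {
        public String id;
        public long timestamp;
    }

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // "Recursive Traversal of the Input Path Directory" from the Data
        // Source docs [1]: also enumerate files in nested subdirectories.
        Configuration parameters = new Configuration();
        parameters.setBoolean("recursive.file.enumeration", true);

        DataSet<String> lines = env
                .readTextFile("hdfs:///data/events")   // placeholder path
                .withParameters(parameters);

        // One Jackson ObjectMapper per parallel task, created once in open(),
        // instead of one per incoming line.
        DataSet<Event> events = lines.map(new RichMapFunction<String, Event>() {
            private transient ObjectMapper mapper;

            @Override
            public void open(Configuration config) {
                mapper = new ObjectMapper();
            }

            @Override
            public Event map(String line) throws Exception {
                return mapper.readValue(line, Event.class);
            }
        });

        // From here, flatMap/groupBy/reduceGroup as in the WordCount example.
        events.print();
    }
}
```

From here the `events` dataset behaves like any other POJO dataset, so the WordCount-style transformations Palle mentions apply directly.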
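Flavio's fromCollection(filePaths).rebalance().map(file2pojo) trick could look something like the following sketch. The details are assumptions, not code from the thread: the paths are listed with java.nio from a placeholder local directory (on a real cluster you would list HDFS paths with the HDFS client and read them via Flink's FileSystem API from workers), `MyPojo` is a hypothetical target type, and the "file2pojo" step is written out as an anonymous MapFunction:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ReadFilesAsPojos {

    // Hypothetical POJO target; here it just carries the raw file content.
    public static class MyPojo {
        public String content;
    }

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 1) Collect all file paths up front into a plain collection.
        List<String> filePaths;
        try (Stream<Path> stream = Files.list(Paths.get("/data/json-files"))) { // placeholder dir
            filePaths = stream.map(Path::toString).collect(Collectors.toList());
        }

        // 2) fromCollection + rebalance, so the per-file work is spread across
        //    all task slots instead of running with parallelism 1.
        DataSet<MyPojo> pojos = env
                .fromCollection(filePaths)
                .rebalance()
                .map(new MapFunction<String, MyPojo>() {   // the "file2pojo" step
                    @Override
                    public MyPojo map(String path) throws Exception {
                        MyPojo pojo = new MyPojo();
                        // Placeholder "parsing": real code would hand these
                        // bytes to a JSON parser such as Jackson.
                        pojo.content = new String(
                                Files.readAllBytes(Paths.get(path)), "UTF-8");
                        return pojo;
                    }
                });

        pojos.print();
    }
}
```

As Flavio notes, this pattern fits the one-JSON-object-per-file case; for line-wise JSON objects, Fabian's readTextFile() approach is the better fit.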