flink-dev mailing list archives

From Stefano Baghino <stefano.bagh...@radicalbit.io>
Subject Re: Read JSON file as input
Date Fri, 29 Apr 2016 07:33:08 GMT
Great, Punit, I'm glad I've been of some help.
If you run into similar issues, feel free to write to the user mailing list;
I believe you'll find help more easily there as you get started with Flink.
Happy hacking! :)

On Thu, Apr 28, 2016 at 9:10 PM, Punit Naik <naik.punit44@gmail.com> wrote:

> I am so sorry. Please ignore my previous reply. Actually my input was too
> big so it hung. So stupid of me. Thanks a lot! Your example worked!
>
> On Fri, Apr 29, 2016 at 12:35 AM, Punit Naik <naik.punit44@gmail.com>
> wrote:
>
> > I tried exactly what you told me. But when I execute this code, first of
> > all it gives me a warning in Eclipse saying "Type Any has no fields that
> > are visible from Scala Type analysis. Falling back to Java Type Analysis
> > (TypeExtractor).", and when I run it, the code just hangs and does not
> > print anything.
> >
> > On Thu, Apr 28, 2016 at 7:11 PM, Stefano Baghino <
> > stefano.baghino@radicalbit.io> wrote:
> >
> >> Hi Punit,
> >>
> >> what you want to do is something like this:
> >>
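> >>     import org.apache.flink.api.scala._
> >>     import scala.util.parsing.json.JSON
> >>
> >>     val env = ExecutionEnvironment.getExecutionEnvironment
> >>     // JSON.parseFull returns Option[Any]; flatMap unwraps Some and drops None
> >>     env.
> >>       readTextFile("path/to/test.json").
> >>       flatMap(line => JSON.parseFull(line)).
> >>       print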
> >>
> >> The JSON.parseFull function in the Scala standard library takes a string
> >> (a line coming from the text file you read) and outputs an Option[Any],
> >> meaning it will output an object that represents the possibility of a
> >> missing output (Option) wrapping Any, which has been (somewhat confusingly)
> >> chosen to represent the actual parsed value (if present). If you "just"
> >> mapped over the input, you would have ended up with a DataSet[Option[Any]],
> >> whereas your objective is to extract the inner value. flatMap does just
> >> that for you: it unwraps every Some and drops every None, i.e. every line
> >> that failed to parse.
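As an illustration of the map-versus-flatMap point, here is a minimal, self-contained sketch. It uses a plain Scala List as a stand-in for a DataSet and a hypothetical parseNumber helper as a stand-in for JSON.parseFull; like parseFull, the helper returns an Option, so the same unwrapping behaviour applies.

```scala
import scala.util.Try

object FlatMapOptionDemo {
  // Hypothetical stand-in for JSON.parseFull: Some(value) on success, None on failure.
  def parseNumber(line: String): Option[Any] = Try(line.trim.toDouble).toOption

  val lines: List[String] = List("1.5", "not a number", "42")

  // map keeps the Option wrapper, including the None for the unparsable line.
  val mapped: List[Option[Any]] = lines.map(line => parseNumber(line))

  // flatMap unwraps each Some and drops each None.
  val flatMapped: List[Any] = lines.flatMap(line => parseNumber(line))

  def main(args: Array[String]): Unit = {
    println(mapped)     // List(Some(1.5), None, Some(42.0))
    println(flatMapped) // List(1.5, 42.0)
  }
}
```

The same reasoning carries over to a Flink DataSet, where flatMap with an Option-returning function silently skips lines that do not parse.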
> >>
> >> If you execute the code I've shown (with the correct path in the right
> >> place), you'll see that it prints the same JSON as in the input, but in its
> >> Scala Map representation. As for how to access data parsed by the Scala
> >> standard library JSON parser, unfortunately I can't help you much, as I'm
> >> not very familiar with it, but I'm fairly sure it's well documented.
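A hedged sketch of how the parsed value can be inspected: for JSON objects, JSON.parseFull yields a Map[String, Any], arrays become List[Any], and numbers become Double with the default number parser. The helpers asObject and doubleField are hypothetical names, and the map is built by hand as a stand-in for parser output so that the example needs only the Scala standard library.

```scala
object ParsedJsonAccess {
  // Narrow a parsed JSON value to an object, if it is one. The cast is
  // unchecked: it assumes object keys are Strings, as JSON requires.
  def asObject(parsed: Any): Option[Map[String, Any]] = parsed match {
    case m: Map[_, _] => Some(m.asInstanceOf[Map[String, Any]])
    case _            => None
  }

  // Look up a numeric field; with the default number parser, numbers are Doubles.
  def doubleField(obj: Map[String, Any], key: String): Option[Double] =
    obj.get(key).collect { case d: Double => d }

  def main(args: Array[String]): Unit = {
    // Hand-built stand-in for the result of parsing {"id": 7, "tags": ["a", "b"]}
    val parsed: Any = Map("id" -> 7.0, "tags" -> List("a", "b"))

    val obj = asObject(parsed).getOrElse(Map.empty[String, Any])
    println(doubleField(obj, "id")) // Some(7.0)
    obj.get("tags") match {
      case Some(tags: List[_]) => println(tags.mkString(",")) // a,b
      case _                   => println("no tags")
    }
  }
}
```

Pattern matching keeps the Any-typed result safe to use: a field of an unexpected type yields None instead of a ClassCastException.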
> >>
> >> Hacking around with Flink is a lot of fun, but before you move further I'd
> >> like to point you to the excellent programming guide in the official
> >> documentation [1]. I'm sure you'll find it interesting and worthwhile
> >> reading.
> >>
> >> [1]: https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html
> >>
> >> On Thu, Apr 28, 2016 at 12:44 PM, Punit Naik <naik.punit44@gmail.com>
> >> wrote:
> >>
> >> > I have one more request, though. I have been struggling with JSON and
> >> > Flink for the past two days, since I started using it. I have a JSON file
> >> > which has one JSON object per line, and I want to read it and store the
> >> > objects as maps in another Flink DataSet. In my JSON the values might be
> >> > anything, e.g. int, double, map, array, etc. I have attached a small
> >> > two-line input file, and I would be grateful if you could implement the
> >> > logic I have explained above using Flink. It would be a great help.
> >> >
> >> > On Thu, Apr 28, 2016 at 4:04 PM, Punit Naik <naik.punit44@gmail.com>
> >> > wrote:
> >> >
> >> >> I managed to fix this error. I basically had to do
> >> >>
> >> >>     val j = data.map { x => x.replaceAll("\"", "\\\"") }
> >> >>
> >> >> instead of
> >> >>
> >> >>     val j = data.map { x => "\"\"\"" + x + "\"\"\"" }
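A note on why this fix works, as a small standard-library check: in String.replaceAll the replacement string treats a backslash as an escape character, so the Scala literal "\\\"" (the two characters \") inserts a plain ". The call therefore returns each line unchanged, which suggests that what actually fixed the parsing was dropping the triple-quote wrapping. The object name ReplaceAllCheck is hypothetical.

```scala
object ReplaceAllCheck {
  val line: String = """{"name": "flink", "stars": 1}"""

  // The replacement literal "\\\"" is the two characters \" . In replaceAll the
  // replacement string treats a backslash as an escape, so each " becomes a plain ".
  val replaced: String = line.replaceAll("\"", "\\\"")

  // For contrast only: String.replace is literal, so it really inserts backslashes.
  val escaped: String = line.replace("\"", "\\\"")

  def main(args: Array[String]): Unit = {
    println(replaced == line) // true
    println(escaped)          // {\"name\": \"flink\", \"stars\": 1}
  }
}
```

In other words, data.map { x => x } (or simply parsing data directly) should behave the same as the replaceAll version.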
> >> >>
> >> >> On Wed, Apr 27, 2016 at 4:05 PM, Punit Naik <naik.punit44@gmail.com>
> >> >> wrote:
> >> >>
> >> >>> I have my Apache Flink program:
> >> >>>
> >> >>> import org.apache.flink.api.scala._
> >> >>> import scala.util.parsing.json._
> >> >>>
> >> >>> object numHits extends App {
> >> >>>     val env = ExecutionEnvironment.getExecutionEnvironment
> >> >>>     val data = env.readTextFile("file:///path/to/json/file")
> >> >>>     val j = data.map { x => ("\"\"\"" + x + "\"\"\"") }
> >> >>>     /*1*/ println( ((j.first(1).collect())(0)).getClass() )
> >> >>>
> >> >>>     /*2*/ println( ((j.first(1).collect())(0)) )
> >> >>>
> >> >>>     /*3*/ println( JSON.parseFull((j.first(1).collect())(0)) )
> >> >>> }
> >> >>>
> >> >>> I want to parse the input JSON file into a normal Scala Map, and for that
> >> >>> I am using the default scala.util.parsing.json._ library.
> >> >>>
> >> >>> The output of the first println statement is class java.lang.String,
> >> >>> which is what the JSON parsing function requires.
> >> >>>
> >> >>> The output of the second println statement is the actual JSON string,
> >> >>> prepended and appended with "\"\"\"", which is also required by the JSON
> >> >>> parser.
> >> >>>
> >> >>> Now, if I copy the output of the second println statement from the
> >> >>> console and pass it to the JSON.parseFull() function, it parses properly.
> >> >>>
> >> >>> Therefore the third println statement should properly parse the same
> >> >>> string passed to it, but it does not: it outputs None, which means
> >> >>> parsing failed.
> >> >>>
> >> >>> Why does this happen, and how can I make it work?
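A likely explanation, sketched with the Scala standard library only: in Scala source code, triple quotes merely delimit a raw string literal and are not part of the string's value. The map in the program above, however, concatenates three literal quote characters onto each side of every line, so JSON.parseFull receives text such as """{"id": 1}""", which is not a valid JSON object, and returns None. When the printed output is pasted back into source code, the triple quotes become literal delimiters again, which is why the pasted version parses. The names below are hypothetical.

```scala
object TripleQuoteCheck {
  // In source code, """...""" only delimits a raw string literal:
  // the value of record is {"id": 1}, with no quote characters around it.
  val record: String = """{"id": 1}"""

  // This is what the map in the program builds for every line:
  // three literal " characters are added on each side of the data.
  val wrapped: String = "\"\"\"" + record + "\"\"\""

  def main(args: Array[String]): Unit = {
    println(record)  // {"id": 1}
    println(wrapped) // """{"id": 1}"""
    // The parser now sees quote characters before the opening brace,
    // so the text is no longer a single JSON object.
    println(wrapped.startsWith("{")) // false
  }
}
```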
> >> >>>
> >> >>> On Wed, Apr 27, 2016 at 12:41 PM, Punit Naik <naik.punit44@gmail.com>
> >> >>> wrote:
> >> >>>
> >> >>>> I just tried it, and it still cannot parse it. It still takes the input
> >> >>>> as a DataSet object rather than a string.
> >> >>>>
> >> >>>> On Wed, Apr 27, 2016 at 12:36 PM, Punit Naik <naik.punit44@gmail.com>
> >> >>>> wrote:
> >> >>>>
> >> >>>>> Okay, thanks a lot, Fabian!
> >> >>>>>
> >> >>>>> On Wed, Apr 27, 2016 at 12:34 PM, Fabian Hueske <fhueske@gmail.com>
> >> >>>>> wrote:
> >> >>>>>
> >> >>>>>> You should do the parsing in a Map operator. Map applies the
> >> >>>>>> MapFunction to each element in the DataSet.
> >> >>>>>> So you can either implement another MapFunction or extend the one you
> >> >>>>>> have to call the JSON parser.
> >> >>>>>>
> >> >>>>>> 2016-04-27 6:40 GMT+02:00 Punit Naik <naik.punit44@gmail.com>:
> >> >>>>>>
> >> >>>>>> > Hi
> >> >>>>>> >
> >> >>>>>> > So I managed to do the map part. I stuck with the
> >> >>>>>> > "import scala.util.parsing.json._" library for parsing.
> >> >>>>>> >
> >> >>>>>> > First I read my JSON:
> >> >>>>>> >
> >> >>>>>> > val data=env.readTextFile("file:///home/punit/vik-in")
> >> >>>>>> >
> >> >>>>>> > Then I transformed it so that it can be parsed to a map:
> >> >>>>>> >
> >> >>>>>> > val j=data.map { x => ("\"\"\"").+(x).+("\"\"\"") }
> >> >>>>>> >
> >> >>>>>> > I checked it by printing the first value of j, and it looks correct.
> >> >>>>>> >
> >> >>>>>> > But when I tried to parse j like this:
> >> >>>>>> >
> >> >>>>>> > JSON.parseFull(j.first(1))
> >> >>>>>> >
> >> >>>>>> > it did not parse, because the object j.first(1) is still a DataSet
> >> >>>>>> > object and not a String object.
> >> >>>>>> >
> >> >>>>>> > So how can I get the underlying Java object from the DataSet object?
> >> >>>>>> >
> >> >>>>>> > On Tue, Apr 26, 2016 at 3:32 PM, Fabian Hueske <fhueske@gmail.com>
> >> >>>>>> > wrote:
> >> >>>>>> >
> >> >>>>>> > > Hi,
> >> >>>>>> > >
> >> >>>>>> > > you need to implement the MapFunction interface [1].
> >> >>>>>> > > Inside the MapFunction you can use any JSON parser library, such as
> >> >>>>>> > > Jackson, to parse the String.
> >> >>>>>> > > The exact logic depends on your use case.
> >> >>>>>> > >
> >> >>>>>> > > However, you should be careful not to initialize a new parser in each
> >> >>>>>> > > map() call, because that would be quite expensive.
> >> >>>>>> > > I recommend extending the RichMapFunction and instantiating a parser
> >> >>>>>> > > in the open() method.
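A minimal sketch of that advice in Scala, assuming the Flink 1.x Scala DataSet API (where RichMapFunction.open(Configuration) is the lifecycle hook; later Flink releases deprecate both the Scala DataSet API and this open signature) and Jackson's com.fasterxml.jackson.databind.ObjectMapper on the classpath. JsonLineParser and JsonLinesJob are hypothetical names, and returning a java.util.Map per line is just one possible choice of record type.

```scala
import java.util.{Map => JMap}

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

// Hypothetical parser function: turns one JSON object per input line into a
// java.util.Map. The ObjectMapper is built once per parallel instance in
// open(), not once per record in map().
class JsonLineParser extends RichMapFunction[String, JMap[String, AnyRef]] {
  // @transient: the function object is serialized and shipped to the workers;
  // the mapper is only created there, when open() runs.
  @transient private var mapper: ObjectMapper = _

  override def open(parameters: Configuration): Unit = {
    mapper = new ObjectMapper()
  }

  override def map(line: String): JMap[String, AnyRef] =
    mapper.readValue(line, classOf[JMap[String, AnyRef]])
}

// Hypothetical job wiring: args(0) is the path of a newline-delimited JSON file.
object JsonLinesJob {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val parsed: DataSet[JMap[String, AnyRef]] =
      env.readTextFile(args(0)).map(new JsonLineParser)
    parsed.print()
  }
}
```

Because open() runs once per parallel task before any records arrive, each task builds a single ObjectMapper and reuses it for every line, which avoids the per-record setup cost mentioned above.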
> >> >>>>>> > >
> >> >>>>>> > > Best, Fabian
> >> >>>>>> > >
> >> >>>>>> > > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/dataset_transformations.html#map
> >> >>>>>> > > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/common/index.html#specifying-transformation-functions
> >> >>>>>> > >
> >> >>>>>> > > 2016-04-26 10:44 GMT+02:00 Punit Naik <naik.punit44@gmail.com>:
> >> >>>>>> > >
> >> >>>>>> > > > Hi Fabian
> >> >>>>>> > > >
> >> >>>>>> > > > Thanks for the reply. Yes, my JSON is separated by new lines. It would
> >> >>>>>> > > > have been great if you had explained the function that goes inside
> >> >>>>>> > > > the map. I tried to use the 'scala.util.parsing.json._' library but
> >> >>>>>> > > > had no luck.
> >> >>>>>> > > >
> >> >>>>>> > > > On Tue, Apr 26, 2016 at 1:11 PM, Fabian Hueske <fhueske@gmail.com>
> >> >>>>>> > > > wrote:
> >> >>>>>> > > >
> >> >>>>>> > > > > Hi Punit,
> >> >>>>>> > > > >
> >> >>>>>> > > > > JSON can be hard to parse in parallel due to its nested structure.
> >> >>>>>> > > > > Whether and how it can be done depends on the schema and the
> >> >>>>>> > > > > (textual) representation of the JSON. The problem is that a parallel
> >> >>>>>> > > > > input format needs to be able to identify record boundaries without
> >> >>>>>> > > > > context information. This can be very easy if your JSON data is a
> >> >>>>>> > > > > list of JSON objects which are separated by a new line character.
> >> >>>>>> > > > > However, this is hard to generalize. That's why Flink does not offer
> >> >>>>>> > > > > tooling for it (yet).
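To make the record-boundary argument concrete, a small sketch with made-up sample strings: when each object sits on its own line, every line is a complete record, whereas splitting a pretty-printed object by lines yields fragments that no parser would accept on their own. looksLikeObject is a deliberately naive, hypothetical check (it ignores braces inside string values), not a JSON validator.

```scala
object RecordBoundaries {
  // Made-up sample: newline-delimited JSON, one complete object per line.
  val ndjson: String = "{\"id\": 1, \"v\": 2.5}\n{\"id\": 2, \"v\": 3.0}"

  // The first record again, pretty-printed across several lines.
  val pretty: String = "{\n  \"id\": 1,\n  \"v\": 2.5\n}"

  // Naive, hypothetical check: does the trimmed line open and close an object?
  // It ignores braces inside string values, so it is not a JSON validator.
  def looksLikeObject(line: String): Boolean = {
    val t = line.trim
    t.startsWith("{") && t.endsWith("}")
  }

  def main(args: Array[String]): Unit = {
    println(ndjson.split("\n").map(line => looksLikeObject(line)).mkString(",")) // true,true
    println(pretty.split("\n").map(line => looksLikeObject(line)).mkString(",")) // false,false,false,false
  }
}
```

A line-based reader can therefore split the first input anywhere between lines and still hand whole records to each parallel task, which is not possible for the second without tracking nesting across lines.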
> >> >>>>>> > > > >
> >> >>>>>> > > > > If your JSON objects are separated by new line characters, the
> >> >>>>>> > > > > easiest way is to read the file as a text file, where each line
> >> >>>>>> > > > > results in a String, and parse each object using a standard JSON
> >> >>>>>> > > > > parser. This would look like:
> >> >>>>>> > > > >
> >> >>>>>> > > > > // YourObject and YourMapFunctionWhichParsesJSON stand for your own
> >> >>>>>> > > > > // record type and MapFunction<String, YourObject> implementation.
> >> >>>>>> > > > > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> >> >>>>>> > > > >
> >> >>>>>> > > > > DataSet<String> text = env.readTextFile("/path/to/jsonfile");
> >> >>>>>> > > > > DataSet<YourObject> json = text.map(new YourMapFunctionWhichParsesJSON());
> >> >>>>>> > > > >
> >> >>>>>> > > > > Best, Fabian
> >> >>>>>> > > > >
> >> >>>>>> > > > > 2016-04-26 8:06 GMT+02:00 Punit Naik <naik.punit44@gmail.com>:
> >> >>>>>> > > > >
> >> >>>>>> > > > > > Hi
> >> >>>>>> > > > > >
> >> >>>>>> > > > > > I am new to Flink. I was experimenting with the DataSet API and
> >> >>>>>> > > > > > found out that there is no explicit method for loading a JSON file
> >> >>>>>> > > > > > as input. Can anyone please suggest a workaround?
> >> >>>>>> > > > > >
> >> >>>>>> > > > > > --
> >> >>>>>> > > > > > Thank You
> >> >>>>>> > > > > >
> >> >>>>>> > > > > > Regards
> >> >>>>>> > > > > >
> >> >>>>>> > > > > > Punit Naik
> >> >>>>>> > > > > >
> >> >>>>>> > > > >
> >> >>>>>> > > >
> >> >>>>>> > >
> >> >>>>>> >
> >> >>>>>>
> >> >>>>>
> >> >>>>
> >> >>>
> >> >>
> >> >
> >>
> >>
> >>
> >> --
> >> BR,
> >> Stefano Baghino
> >>
> >> Software Engineer @ Radicalbit
> >>
> >
>



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit
