flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: How best to deal with wide, structured tuples?
Date Mon, 09 Nov 2015 12:10:12 GMT
Hi,
yes please, open an issue for that. I think the method would have to be added to TableEnvironment.

Aljoscha
> On 09 Nov 2015, at 12:19, Johann Kovacs <me@jkovacs.de> wrote:
> 
> Hi,
> thanks for having a look at this, Aljoscha.
> 
> Not being able to read a DataSet[Row] from CSV is definitely the biggest issue for me right now.
> Everything else I could work around with Scala magic. I can create an issue for this if you'd like.
> 
> Regarding the other points:
> 1. Oh absolutely, that's really the most important reason I implemented those classes in the first place, although I also liked being able to deserialize the schema from json/yaml, before converting it to a RowTypeInfo.
> 2. I have no strong opinion either way regarding serializability of TypeInformations. But having the field name <-> index mapping available in the UDF would be very very nice to have, so that I can access fields by name instead of by index. If you decide to make TypeInformation non-serializable, maybe having a dedicated Schema class for this purpose isn't such a bad idea after all.
> 3. Fair enough, I guess I was too excited with Scala implicit magic when I wrote the prototype :-) I wouldn't want to include the Schema with each Row either. I guess non-implicit utility functions for that would work just as well, or the user can work around this by themselves if the field name <-> index mapping is available in the UDF (see point 2)
> 
> Thanks,
> Johann
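
The field name <-> index mapping requested in point 2 can be sketched in plain Java, without any Flink dependency. `FieldIndex` and its methods are hypothetical names chosen for illustration and are not part of Flink. The class is `Serializable` on the assumption that it would be shipped to a UDF alongside untyped `Object[]` rows.

```java
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not a Flink class): maps field names to row positions. */
final class FieldIndex implements Serializable {
    private static final long serialVersionUID = 1L;

    // Insertion-ordered so the map also documents the column order.
    private final Map<String, Integer> positions = new LinkedHashMap<>();

    FieldIndex(String... fieldNames) {
        for (int i = 0; i < fieldNames.length; i++) {
            // put() returns the previous value, so non-null means a duplicate name.
            if (positions.put(fieldNames[i], i) != null) {
                throw new IllegalArgumentException("Duplicate field name: " + fieldNames[i]);
            }
        }
    }

    /** Returns the position of the named field, or throws if it is unknown. */
    int indexOf(String fieldName) {
        Integer index = positions.get(fieldName);
        if (index == null) {
            throw new IllegalArgumentException("Unknown field: " + fieldName);
        }
        return index;
    }

    /** Reads a field from an untyped row by name instead of by position. */
    Object get(Object[] row, String fieldName) {
        return row[indexOf(fieldName)];
    }
}
```

Keeping the mapping in a separate object, rather than in each row, avoids the per-record overhead raised in point 3.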
> 
> 
> On 5 November 2015 at 13:59, Aljoscha Krettek <aljoscha@apache.org> wrote:
> Hi,
> these are some interesting ideas.
> 
> I have some thoughts, though, about the current implementation.
>  1. With Schema and Field you are basically re-implementing RowTypeInfo, so it should not be required. Maybe just an easier way to create a RowTypeInfo.
>  2. Right now, in Flink the TypeInformation is technically not meant to be Serializable and be shipped to runtime operations. (Although practically it is in places, there is an ongoing discussion about this.)
>  3. Having the Schema as an implicit parameter does not work in the general case because the compiler does not know which Schema to take if there are several Schemas around. Maybe the Row would have to be extended to contain the Schema. But this could have performance implications.
> 
> We should definitely add support for creating a DataSet[Row] directly from a CSV input, since otherwise you have to go through tuples, which does not work with dynamic schemas or if you have more than a certain number of fields.
> 
> Cheers,
> Aljoscha
> > On 02 Nov 2015, at 17:41, Johann Kovacs <me@jkovacs.de> wrote:
> >
> > Hi,
> > thanks once again for those pointers. I did a bit of experimenting the past couple of days and came to the following conclusions:
> > 1. Unfortunately, I don't think I can get away with option 1 (generating POJOs at runtime). At least not without generating lots of boilerplate code, because I'd like to be able to access fields, also inside UDFs, by index or name. Also instantiating generated classes in UDFs seems like a pain.
> > 2. Table API looks nice, but doesn't seem to solve the problem of getting the data into a DataSet in the first place. And I'll likely have to revert to the DataSet API to do low level operations, such as a simple map(), as far as I can tell.
> >
> > However, it seems like the Table API already provides exactly Fabian's option 3 with its Row, RowTypeInfo, and RowSerializer implementations, if I'm not mistaken.
> > I played around a bit, trying to use a DataSet[Row] and it seems to work great. I had to add a hack to make the ScalaCsvInputFormat read Rows instead of Tuples, as well as add a few convenience methods to the Row class and a Schema API to quickly generate RowTypeInfos. I pushed my experiments here: https://github.com/jkovacs/flink/commit/2cee8502968f8815cd3a5f9d2994d3e5355e96df
> > A toy example of how I'd like to use the APIs is included in the TableExperiments.scala file.
> >
> > Is this a maintainable solution, or is the Row API supposed to be internal only? If it's supposed to be used outside of the Table API, do you think we can add the Row and Schema convenience layer like I started to implement to the core flink-table? Would make it much easier to work with the regular DataSet API.
> >
> > Thanks,
> > Johann
> >
> > On 30 October 2015 at 03:34, Stephan Ewen <sewen@apache.org> wrote:
> > Hi Johann!
> >
> > You can try and use the Table API, it has logical tuples that you program with, rather than tuple classes.
> >
> > Have a look here: https://ci.apache.org/projects/flink/flink-docs-master/libs/table.html
> >
> > Stephan
> >
> >
> > On Thu, Oct 29, 2015 at 6:53 AM, Fabian Hueske <fhueske@gmail.com> wrote:
> > Hi Johann,
> >
> > I see three options for your use case.
> >
> > 1) Generate POJO code at planning time, i.e., when the program is composed. This does not work when the program is already running. The benefit is that you can use key expressions, have typed fields, and type specific serializers and comparators.
> >
> > 2) Use Record, an Object[], or List<Object> (or some own holder with a few convenience methods) to store the data untyped and work with KeySelectors to extract grouping and join keys. The drawback of this approach is the generic serializers (Kryo), which won't be as efficient as the native ones. If you know the key types and don't use generic types for keys, sorting and joining should still be fast.
> >
> > 3) A hybrid approach of both, which works without code generation. Use a generic holder, e.g., Object[], for your data records but implement your own type information, serializers, and comparators. After each operation, you can define the type information of the result using the returns() method, e.g.:
> > myData.map(new MapFunction<Object[], Object[]>() { ... }).returns(myCustomTypeInfo);
> > This approach requires a good understanding of Flink's type system, but if done correctly, you can also use expressions or positions to define keys and benefit from efficient serialization and binary comparisons. However, similar to the first approach, you need to know the schema of the data in advance (before the program is executed).
> >
> > In my opinion the first approach is the best, but as you said it is more effort to implement and might not work depending on what information is available at which point in time.
> >
> > Let me know if you have any questions.
> >
> > Cheers, Fabian
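
Option 2 above (untyped records plus key extraction) can be illustrated locally in plain Java. This is only a sketch of the idea, not Flink code: `java.util.function.Function` stands in for Flink's KeySelector, the grouping runs on an in-memory list rather than a DataSet, and `UntypedRows`, `stringKeyAt`, and `groupByKey` are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical local analogue of grouping untyped rows by an extracted key. */
final class UntypedRows {
    private UntypedRows() {}

    /**
     * Builds a key extractor for one position. The key is cast to a concrete
     * type (String) because, as noted above, typed keys keep sorting and
     * joining efficient even when the record itself is untyped.
     */
    static Function<Object[], String> stringKeyAt(int position) {
        return row -> (String) row[position];
    }

    /** Groups rows by the extracted key using the standard stream collector. */
    static Map<String, List<Object[]>> groupByKey(
            List<Object[]> rows, Function<Object[], String> key) {
        return rows.stream().collect(Collectors.groupingBy(key));
    }
}
```

The cast in `stringKeyAt` fails at runtime if the column does not hold a String, which is the price of leaving the record untyped.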
> >
> > 2015-10-28 20:01 GMT+01:00 Johann Kovacs <me@jkovacs.de>:
> > Hi all,
> >
> > I currently find myself evaluating a use case, where I have to deal
> > with wide (i.e. about 50-60 columns, definitely more than the 25
> > supported by the Tuple types), structured data from CSV files, with a
> > potentially dynamically (during runtime) generated (or automatically
> > inferred from the CSV file) schema.
> > SparkSQL works very well for this case, because I can generate or
> > infer the schema dynamically at runtime, access fields in UDFs via
> > index or name (via the Row API), generate new schemata for UDF results
> > on the fly, and use those schemata to read and write from/to CSV.
> > Obviously Spark and SparkSQL have other quirks and I'd like to find a
> > good solution to do this with Flink.
> >
> > The main limitation seems to be that I can't seem to have DataSets of
> > arbitrary-length, arbitrary-type (i.e. unknown during compile time),
> > tuples. The Record API/type looks like it was meant to provide
> > something like that, but it appears to be headed for deprecation and is not
> > well supported by the DataSet APIs (e.g. I can't do a join on Records
> > by field index, nor does the CsvReader API support Records), and it
> > has no concept of field names, either.
> >
> > I thought about generating Java classes of my schemata at runtime (e.g.
> > via Javassist), but that seems like a hack, and I'd probably have to
> > do this for each intermediate schema as well (e.g. when a map
> > operation alters the schema). I haven't tried this avenue yet, so I'm
> > not certain it would actually work, and even less certain that this is
> > a nice and maintainable solution.
> >
> > Can anyone suggest a nice way to deal with this kind of use case? I
> > can prepare an example if that would make it more clear.
> >
> > Thanks,
> > Johann
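
To make the use case concrete, here is a minimal plain-Java sketch of deriving field names from a CSV header at runtime and parsing data lines into untyped rows. It is an illustration only, not Flink's CsvReader: `CsvSketch` and its methods are hypothetical, the comma split is naive (no quoting or escaping), and the type is guessed per value rather than per column.

```java
/** Hypothetical sketch: runtime field names and untyped rows from CSV text. */
final class CsvSketch {
    private CsvSketch() {}

    /** Field names taken from the header line; limit -1 keeps trailing empty cells. */
    static String[] fieldNames(String headerLine) {
        return headerLine.split(",", -1);
    }

    /** Parses one data line into an untyped row, guessing a type for each value. */
    static Object[] parseRow(String line) {
        String[] cells = line.split(",", -1);
        Object[] row = new Object[cells.length];
        for (int i = 0; i < cells.length; i++) {
            row[i] = guess(cells[i].trim());
        }
        return row;
    }

    /** Tries Long, then Double, and falls back to the raw String. */
    private static Object guess(String cell) {
        try {
            return Long.valueOf(cell);
        } catch (NumberFormatException ignored) {
            // not an integer, try a floating-point value next
        }
        try {
            return Double.valueOf(cell);
        } catch (NumberFormatException ignored) {
            // not a number either, keep it as text
        }
        return cell;
    }
}
```

A real schema would need one consistent type per column, for example by scanning a sample of rows before fixing the types.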
> >
> >
> >
> 

