Subject: Re: How best to deal with wide, structured tuples?
From: Aljoscha Krettek
Date: Mon, 9 Nov 2015 13:10:12 +0100
To: user@flink.apache.org

Hi,
yes please, open an issue for that. I think the method would have to be added to TableEnvironment.

Aljoscha

> On 09 Nov 2015, at 12:19, Johann Kovacs wrote:
>
> Hi,
> thanks for having a look at this, Aljoscha.
>
> Not being able to read a DataSet[Row] from CSV is definitely the biggest issue for me right now.
> Everything else I could work around with Scala magic. I can create an issue for this if you'd like.
>
> Regarding the other points:
> 1. Oh absolutely, that's really the most important reason I implemented those classes in the first place,
> although I also liked being able to deserialize the schema from JSON/YAML before converting it to a RowTypeInfo.
> 2. I have no strong opinion either way regarding the serializability of TypeInformation. But having the field name <-> index mapping available
> in the UDF would be very nice to have, so that I can access fields by name instead of by index.
> If you decide to make TypeInformation non-serializable, maybe having a dedicated Schema class for this purpose isn't such a bad idea after all.
> 3. Fair enough, I guess I was too excited with Scala implicit magic when I wrote the prototype :-)
> I wouldn't want to include the Schema with each Row either. I guess non-implicit utility functions for that would work just as well, or the user can
> work around this by themselves if the field name <-> index mapping is available in the UDF (see point 2).
>
> Thanks,
> Johann
>
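
For illustration, a minimal, self-contained Scala sketch of the field-name-to-index lookup described in point 2 above. The NamedRow class, its methods, and the example schema are hypothetical and not part of flink-table; the idea is just that a UDF can write row("amount") instead of remembering a positional index:

  // Hypothetical wrapper (not a flink-table class): field values plus a name -> index map.
  case class NamedRow(fields: Array[Any], nameToIndex: Map[String, Int]) {
    def apply(name: String): Any = fields(nameToIndex(name))
    def update(name: String, value: Any): Unit = fields(nameToIndex(name)) = value
  }

  object NamedRowExample {
    def main(args: Array[String]): Unit = {
      val schema = Map("id" -> 0, "name" -> 1, "amount" -> 2)
      val row = NamedRow(Array[Any](42L, "alice", 12.5), schema)
      println(row("name"))   // prints: alice
      row("amount") = 13.0   // desugars to row.update("amount", 13.0)
      println(row("amount")) // prints: 13.0
    }
  }
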
> On 5 November 2015 at 13:59, Aljoscha Krettek wrote:
> Hi,
> these are some interesting ideas.
>
> I have some thoughts, though, about the current implementation.
> 1. With Schema and Field you are basically re-implementing RowTypeInfo, so it should not be required. Maybe just an easier way to create a RowTypeInfo.
> 2. Right now, in Flink the TypeInformation is technically not meant to be Serializable and be shipped to runtime operations. (Although practically it is in places; there is an ongoing discussion about this.)
> 3. Having the Schema as an implicit parameter does not work in the general case because the compiler does not know which Schema to take if there are several Schemas around. Maybe the Row would have to be extended to contain the Schema, but this could have performance implications.
>
> We should definitely add support for creating a DataSet[Row] directly from a CSV input, since otherwise you have to go through tuples, which does not work
> with dynamic schemas or if you have more than a certain number of fields.
>
> Cheers,
> Aljoscha
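
For illustration, a rough workaround sketch until such a CSV source for DataSet[Row] exists: read the file as plain text and parse each line into a generic holder, with one parser per column chosen at runtime. The file path, the per-column parsers, and the assumption of a simple comma-separated file without quoting are all invented for the example:

  import org.apache.flink.api.scala._

  // Workaround sketch: parse CSV lines into Array[Any] so the number and
  // types of columns do not need to be known at compile time.
  object DynamicCsvSketch {
    def main(args: Array[String]): Unit = {
      val env = ExecutionEnvironment.getExecutionEnvironment

      // Runtime-provided "schema": one parser per column (invented example).
      val parsers: Array[String => Any] =
        Array(s => s.toLong, s => s, s => s.toDouble)

      val rows: DataSet[Array[Any]] = env
        .readTextFile("/path/to/wide.csv")
        .map { line =>
          val cols = line.split(",", -1)
          Array.tabulate[Any](parsers.length)(i => parsers(i)(cols(i)))
        }

      // Plain DataSet operations still work; fields are accessed by index.
      rows.filter(r => r(2).asInstanceOf[Double] > 10.0).print()
    }
  }

The generic Array[Any] records fall back to Kryo serialization, so this trades efficiency for schema flexibility, as discussed further down the thread.
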
> > On 02 Nov 2015, at 17:41, Johann Kovacs wrote:
> >
> > Hi,
> > thanks once again for those pointers. I did a bit of experimenting the past couple of days and came to the following conclusions:
> > 1. Unfortunately, I don't think I can get away with option 1 (generating POJOs at runtime). At least not without generating lots of boilerplate code, because I'd like to be able to access fields, also inside UDFs, by index or name. Also, instantiating generated classes in UDFs seems like a pain.
> > 2. The Table API looks nice, but doesn't seem to solve the problem of getting the data into a DataSet in the first place. And I'll likely have to revert to the DataSet API to do low-level operations, such as a simple map(), as far as I can tell.
> >
> > However, it seems like the Table API already provides exactly Fabian's option 3 with its Row, RowTypeInfo, and RowSerializer implementations, if I'm not mistaken, am I?
> > I played around a bit, trying to use a DataSet[Row], and it seems to work great. I had to add a hack to make the ScalaCsvInputFormat read Rows instead of Tuples, as well as add a few convenience methods to the Row class and a Schema API to quickly generate RowTypeInfos. I pushed my experiments here: https://github.com/jkovacs/flink/commit/2cee8502968f8815cd3a5f9d2994d3e5355e96df
> > A toy example of how I'd like to use the APIs is included in the TableExperiments.scala file.
> >
> > Is this a maintainable solution, or is the Row API supposed to be internal only? If it's supposed to be used outside of the Table API, do you think we can add the Row and Schema convenience layer I started to implement to the core flink-table? It would make it much easier to work with the regular DataSet API.
> >
> > Thanks,
> > Johann
> >
> > On 30 October 2015 at 03:34, Stephan Ewen wrote:
> > Hi Johann!
> >
> > You can try and use the Table API; it has logical tuples that you program with, rather than tuple classes.
> >
> > Have a look here: https://ci.apache.org/projects/flink/flink-docs-master/libs/table.html
> >
> > Stephan
> >
> >
> > On Thu, Oct 29, 2015 at 6:53 AM, Fabian Hueske wrote:
> > Hi Johann,
> >
> > I see three options for your use case.
> >
> > 1) Generate POJO code at planning time, i.e., when the program is composed. This does not work when the program is already running. The benefit is that you can use key expressions, have typed fields, and type-specific serializers and comparators.
> >
> > 2) Use Record, an Object[], or a List (or some own holder with a few convenience methods) to store the data untyped and work with KeySelectors to extract grouping and join keys. The drawbacks of this approach are the generic serializers (Kryo), which won't be as efficient as the native ones. If you know the key types and don't use generic types for keys, sorting and joining should still be fast.
> >
> > 3) A hybrid approach of both, which works without code generation. Use a generic holder, e.g., Object[], for your data records but implement your own type information, serializers, and comparators. After each operation, you can define the type information of the result using the returns() method, e.g.,
> > myData.map(new MapFunction).returns(myCustomTypeInfo). This approach requires a good understanding of Flink's type system, but if done correctly, you can also use expressions or positions to define keys and benefit from efficient serialization and binary comparisons. However, similar to the first approach, you need to know the schema of the data in advance (before the program is executed).
> >
> > In my opinion the first approach is the better one, but as you said, it is more effort to implement and might not work depending on what information is available at which point in time.
> >
> > Let me know if you have any questions.
> >
> > Cheers, Fabian
> >
> > 2015-10-28 20:01 GMT+01:00 Johann Kovacs:
> > Hi all,
> >
> > I currently find myself evaluating a use case where I have to deal
> > with wide (i.e. about 50-60 columns, definitely more than the 25
> > supported by the Tuple types), structured data from CSV files, with a
> > potentially dynamically (during runtime) generated (or automatically
> > inferred from the CSV file) schema.
> > SparkSQL works very well for this case, because I can generate or
> > infer the schema dynamically at runtime, access fields in UDFs via
> > index or name (via the Row API), generate new schemata for UDF results
> > on the fly, and use those schemata to read and write from/to CSV.
> > Obviously Spark and SparkSQL have other quirks, and I'd like to find a
> > good solution to do this with Flink.
> >
> > The main limitation seems to be that I can't have DataSets of
> > arbitrary-length, arbitrary-type (i.e. unknown at compile time)
> > tuples. The Record API/type looks like it was meant to provide
> > something like that, but it appears to be deprecated and is not
> > well supported by the DataSet APIs (e.g. I can't do a join on Records
> > by field index, nor does the CsvReader API support Records), and it
> > has no concept of field names, either.
> >
> > I thought about generating Java classes for my schemata at runtime (e.g.
> > via Javassist), but that seems like a hack, and I'd probably have to
> > do this for each intermediate schema as well (e.g. when a map
> > operation alters the schema). I haven't tried this avenue yet, so I'm
> > not certain it would actually work, and even less certain that this is
> > a nice and maintainable solution.
> >
> > Can anyone suggest a nice way to deal with this kind of use case? I
> > can prepare an example if that would make it more clear.
> >
> > Thanks,
> > Johann
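
For illustration, a minimal Scala sketch of the second option Fabian describes above: keep the records untyped as Array[Any] and use a key extractor function for grouping. The records and column indices below are invented for the example, and the generic records will go through Kryo rather than Flink's native serializers:

  import org.apache.flink.api.scala._

  // Sketch of the "untyped holder + KeySelector" approach: group Array[Any]
  // records by one column and aggregate another, with all indices coming
  // from a (here hard-coded) runtime schema.
  object GenericKeyedSketch {
    def main(args: Array[String]): Unit = {
      val env = ExecutionEnvironment.getExecutionEnvironment

      val records: DataSet[Array[Any]] = env.fromElements(
        Array[Any]("de", 1L, 10.0),
        Array[Any]("de", 2L, 5.5),
        Array[Any]("us", 3L, 7.25)
      )

      val countryIdx = 0
      val amountIdx  = 2

      // Key extractor instead of a field position, since Array[Any] has no
      // named or typed fields Flink could use directly.
      val sums: DataSet[(String, Double)] = records
        .groupBy(r => r(countryIdx).asInstanceOf[String])
        .reduceGroup { rows =>
          val buf = rows.toSeq
          (buf.head(countryIdx).asInstanceOf[String],
           buf.map(_(amountIdx).asInstanceOf[Double]).sum)
        }

      sums.print()
    }
  }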