Beam Java's support for schemas is just about done: we infer schemas from a variety of types, we have a variety of utility transforms (join, aggregate, etc.) for schemas, and schemas are integrated with the ParDo machinery. The big remaining task I'm working on is writing documentation and examples for all of this so that users are aware. If you're interested, these slides
from the London Beam meetup show a bit more how schemas can be used and how they simplify the API.
I want to start integrating schemas into portability so that they can be used from other languages such as Python (in particular this will also allow BeamSQL to be invoked from other languages). In order to do this, the Beam portability protos must have a way of representing schemas. Since this has not been discussed before, I'm starting this discussion now on the list.
As a reminder: a schema represents the type of a PCollection as a collection of fields. Each field has a name, an id (position), and a field type. A field type can be either a primitive type (int, long, string, byte array, etc.), a nested row (itself with a schema), an array, or a map.
We also support logical types. A logical type is a way for the user to embed their own types in schema fields. A logical type is always backed by a schema type, and contains a function for mapping the user's logical type to the field type. You can think of this as a generalization of a coder: while a coder always maps the user type to a byte array, a logical type can map to an int, or a string, or any other schema field type (in fact any coder can always be used as a logical type for mapping to byte-array field types). Logical types are used extensively by Beam SQL to represent SQL types that have no correspondence in Beam's field types (e.g. SQL has 4 different date/time types). Logical types for Beam schemas have a lot of similarities to AVRO logical types.
An initial proto representation for schemas is here
. Before we go further with this, I would like community consensus on what this representation should be. I can start by suggesting a few possible changes to this representation (and hopefully others will suggest others):
- Kenn Knowles has suggested removing DATETIME as a primitive type, and instead making it a logical type backed by INT64 as this keeps our primitive types closer to "classical" PL primitive types. This also allows us to create multiple versions of this type - e.g. TIMESTAMP(millis), TIMESTAMP(micros), TIMESTAMP(nanos).
- If we do the above, we can also consider removing DECIMAL and making that a logical type as well.
- The id field is currently used for some performance optimizations only. If we formalized the idea of schema types having ids, then we might be able to use this to allow self-recursive schemas (self-recursive types are not currently allowed).
- Beam Schemas currently have an ARRAY type. However Beam supports "large iterables" (iterables that don't fit in memory that the runner can page in), and this doesn't match well to arrays. I think we need to add an ITERABLE type as well to support things like GroupByKey results.
It would also be interesting to explore allowing well-known metadata tags on fields that Beam interprets. e.g. key and value, to allow Beam to interpret any two-field schema as a KV, or window and timestamp to allow automatically filling those out. However this would be an extension to the current schema concept and deserves a separate discussion thread IMO.
I ask that we please limit this discussion to the proto representation of schemas. If people want to discuss (or rediscuss) other things around Beam schemas, I'll be happy to create separate threads for those discussions.