On Tue, Jun 4, 2019 at 9:20 AM Brian Hulette <bhulette@google.com> wrote:

On Mon, Jun 3, 2019 at 10:04 PM Reuven Lax <relax@google.com> wrote:

On Mon, Jun 3, 2019 at 12:27 PM Brian Hulette <bhulette@google.com> wrote:
> It has to go into the proto somewhere (since that's the only way the SDK can get it), but I'm not sure they should be considered integral parts of the type.
Are you just advocating for an approach where any SDK-specific information is stored outside of the Schema message itself so that Schema really does just represent the type? That seems reasonable to me, and alleviates my concerns about how this applies to columnar encodings a bit as well.

Yes, that's exactly what I'm advocating.

We could lift all of the LogicalTypeConversion messages out of the Schema and the LogicalType like this:

message SchemaCoder {
  Schema schema = 1;
  LogicalTypeConversion root_conversion = 2;
  map<string, LogicalTypeConversion> attribute_conversions = 3; // only necessary for user type aliases, portable logical types by definition have nothing SDK-specific

I'm not sure what the map is for? I think we have status quo wihtout it.

My intention was that the SDK-specific information (to/from functions) for any nested fields that are themselves user type aliases would be stored in this map. That was the motivation for my next question, if we don't allow user types to be nested within other user types we may not need it. 

Oh, is this meant to contain the ids of all the logical types in this schema? If so I don't think SchemaCoder is the right place for this. Any "registry" of logical types should be global to the pipeline, not scoped to a single PCollection IMO.
I may be missing your meaning - but I think we currently only have status quo without this map in the Java SDK because Schema.LogicalType is just an interface that must be implemented. It's appropriate for just portable logical types, not user-type aliases. Note I've adopted Kenn's terminology where portable logical type is a type that can be identified by just a URN and maybe some parameters, while a user type alias needs some SDK specific information, like a class and to/from UDFs.

I think a critical question (that has implications for the above proposal) is how/if the two different concepts Kenn mentioned are allowed to nest. For example, you could argue it's redundant to have a user type alias that has a Row representation with a field that is itself a user type alias, because instead you could just have a single top-level type alias with to/from functions that pack and unpack the entire hierarchy. On the other hand, I think it does make sense for a user type alias or a truly portable logical type to have a field that is itself a truly portable logical type (e.g. a user type alias or portable type with a DateTime).

I've been assuming that user-type aliases could be nested, but should we disallow that? Or should we go the other way and require that logical types define at most one "level"?

No I think it's useful to allow things to be nested (though of course the nesting must terminate).


On Mon, Jun 3, 2019 at 11:08 AM Kenneth Knowles <kenn@apache.org> wrote:

On Mon, Jun 3, 2019 at 10:53 AM Reuven Lax <relax@google.com> wrote:
So I feel a bit leery about making the to/from functions a fundamental part of the portability representation. In my mind, that is very tied to a specific SDK/language. A SDK (say the Java SDK) wants to allow users to use a wide variety of native types with schemas, and under the covers uses the to/from functions to implement that. However from the portable Beam perspective, the schema itself should be the real "type" of the PCollection; the to/from methods are simply a way that a particular SDK makes schemas easier to use. It has to go into the proto somewhere (since that's the only way the SDK can get it), but I'm not sure they should be considered integral parts of the type.

On the doc in a couple places this distinction was made:

* For truly portable logical types, no instructions for the SDK are needed. Instead, they require:
   - URN: a standardized identifier any SDK can recognize
   - A spec: what is the universe of values in this type?
   - A representation: how is it represented in built-in types? This is how SDKs who do not know/care about the URN will process it
   - (optional): SDKs choose preferred SDK-specific types to embed the values in. SDKs have to know about the URN and choose for themselves.

*For user-level type aliases, written as convenience by the user in their pipeline, what Java schemas have today:
   - to/from UDFs: the code is SDK-specific
   - some representation of the intended type (like java class): also SDK specific
   - a representation
   - any "id" is just like other ids in the pipeline, just avoiding duplicating the proto
   - Luke points out that nesting these can give multiple SDKs a hint

In my mind the remaining complexity is whether or not we need to be able to move between the two. Composite PTransforms, for example, do have fluidity between being strictly user-defined versus portable URN+payload. But it requires lots of engineering, namely the current work on expansion service.

On Mon, Jun 3, 2019 at 10:23 AM Brian Hulette <bhulette@google.com> wrote:
Ah I see, I didn't realize that. Then I suppose we'll need to/from functions somewhere in the logical type conversion to preserve the current behavior.

I'm still a little hesitant to make these functions an explicit part of LogicalTypeConversion for another reason. Down the road, schemas could give us an avenue to use a batched columnar format (presumably arrow, but of course others are possible). By making to/from an explicit part of logical types we add some element-wise logic to a schema representation that's otherwise ambivalent to element-wise vs. batched encodings.

I suppose you could make an argument that to/from are only for custom types. There will also be some set of well-known types identified only by URN and some parameters, which could easily be translated to a columnar format. We could just not support custom types fully if we add a columnar encoding, or maybe add optional toBatch/fromBatch functions when/if we get there.

What about something like this that makes the two different types of logical types explicit?

// Describes a logical type and how to convert between it and its representation (e.g. Row).
message LogicalTypeConversion {
  oneof conversion {
    message Standard standard = 1;
    message Custom custom = 2;

  message Standard {
    String urn = 1;
    repeated string args = 2; // could also be a map

  message Custom {
    FunctionSpec(?) toRepresentation = 1; 
    FunctionSpec(?) fromRepresentation = 2;
    bytes type = 3; // e.g. serialized class for Java

And LogicalType and Schema become:

message LogicalType {
  FieldType representation = 1;
  LogicalTypeConversion conversion = 2;

message Schema {
  repeated Field fields = 1;
  LogicalTypeConversion conversion = 2; // implied that representation is Row


On Sat, Jun 1, 2019 at 10:44 AM Reuven Lax <relax@google.com> wrote:
Keep in mind that right now the SchemaRegistry is only assumed to exist at graph-construction time, not at execution time; all information in the schema registry is embedded in the SchemaCoder, which is the only thing we keep around when the pipeline is actually running. We could look into changing this, but it would potentially be a very big change, and I do think we should start getting users actively using schemas soon.

On Fri, May 31, 2019 at 3:40 PM Brian Hulette <bhulette@google.com> wrote:
> Can you propose what the protos would look like in this case? Right now LogicalType does not contain the to/from conversion functions in the proto. Do you think we'll need to add these in?

Maybe. Right now the proposed LogicalType message is pretty simple/generic:
message LogicalType {
  FieldType representation = 1;
  string logical_urn = 2;
  bytes logical_payload = 3;

If we keep just logical_urn and logical_payload, the logical_payload could itself be a protobuf with attributes of 1) a serialized class and 2/3) to/from functions. Or, alternatively, we could have a generalization of the SchemaRegistry for logical types. Implementations for standard types and user-defined types would be registered by URN, and the SDK could look them up given just a URN. I put a brief section about this alternative in the doc last week [1]. What I suggested there included removing the logical_payload field, which is probably overkill. The critical piece is just relying on a registry in the SDK to look up types and to/from functions rather than storing them in the portable schema itself.

I kind of like keeping the LogicalType message generic for now, since it gives us a way to try out these various approaches, but maybe that's just a cop out.  

On Fri, May 31, 2019 at 12:36 PM Reuven Lax <relax@google.com> wrote:

On Tue, May 28, 2019 at 10:11 AM Brian Hulette <bhulette@google.com> wrote:

On Sun, May 26, 2019 at 1:25 PM Reuven Lax <relax@google.com> wrote:

On Fri, May 24, 2019 at 11:42 AM Brian Hulette <bhulette@google.com> wrote:
tl;dr: SchemaCoder represents a logical type with a base type of Row and we should think about that.

I'm a little concerned that the current proposals for a portable representation don't actually fully represent Schemas. It seems to me that the current java-only Schemas are made up three concepts that are intertwined:
(a) The Java SDK specific code for schema inference, type coercion, and "schema-aware" transforms.
(b) A RowCoder[1] that encodes Rows[2] which have a particular Schema[3].
(c) A SchemaCoder[4] that has a RowCoder for a particular schema, and functions for converting Rows with that schema to/from a Java type T. Those functions and the RowCoder are then composed to provider a Coder for the type T.

RowCoder is currently just an internal implementation detail, it can be eliminated. SchemaCoder is the only thing that determines a schema today.
Why not keep it around? I think it would make sense to have a RowCoder implementation in every SDK, as well as something like SchemaCoder that defines a conversion from that SDK's "Row" to the language type.

The point is that from a programmer's perspective, there is nothing much special about Row. Any type can have a schema, and the only special thing about Row is that it's always guaranteed to exist. From that standpoint, Row is nearly an implementation detail. Today RowCoder is never set on _any_ PCollection, it's literally just used as a helper library, so there's no real need for it to exist as a "Coder." 

We're not concerned with (a) at this time since that's specific to the SDK, not the interface between them. My understanding is we just want to define a portable representation for (b) and/or (c).

What has been discussed so far is really just a portable representation for (b), the RowCoder, since the discussion is only around how to represent the schema itself and not the to/from functions.

Correct. The to/from functions are actually related to a). One of the big goals of schemas was that users should not be forced to operate on rows to get schemas. A user can create PCollection<MyRandomType> and as long as the SDK can infer a schema from MyRandomType, the user never needs to even see a Row object. The to/fromRow functions are what make this work today.

One of the points I'd like to make is that this type coercion is a useful concept on it's own, separate from schemas. It's especially useful for a type that has a schema and is encoded by RowCoder since that can represent many more types, but the type coercion doesn't have to be tied to just schemas and RowCoder. We could also do type coercion for types that are effectively wrappers around an integer or a string. It could just be a general way to map language types to base types (i.e. types that we have a coder for). Then it just becomes a general framework for extending coders to represent more language types.

Let's not tie those conversations. Maybe a similar concept will hold true for general coders (or we might decide to get rid of coders in favor of schemas, in which case that becomes moot), but I don't think we should prematurely generalize.

One of the outstanding questions for that schema representation is how to represent logical types, which may or may not have some language type in each SDK (the canonical example being a timsetamp type with seconds and nanos and java.time.Instant). I think this question is critically important, because (c), the SchemaCoder, is actually *defining a logical type* with a language type T in the Java SDK. This becomes clear when you compare SchemaCoder[4] to the Schema.LogicalType interface[5] - both essentially have three attributes: a base type, and two functions for converting to/from that base type. The only difference is for SchemaCoder that base type must be a Row so it can be represented by a Schema alone, while LogicalType can have any base type that can be represented by FieldType, including a Row.

This is not true actually. SchemaCoder can have any base type, that's why (in Java) it's SchemaCoder<T>. This is why PCollection<T> can have a schema, even if T is not Row.

I'm not sure I effectively communicated what I meant - When I said SchemaCoder's "base type" I wasn't referring to T, I was referring to the base FieldType, whose coder we use for this type. I meant "base type" to be analogous to LogicalType's `getBaseType`, or what Kenn is suggesting we call "representation" in the portable beam schemas doc. To define some terms from my original message:
base type = an instance of FieldType, crucially this is something that we have a coder for (be it VarIntCoder, Utf8Coder, RowCoder, ...)
language type (or "T", "type T", "logical type") = Some Java class (or something analogous in the other SDKs) that we may or may not have a coder for. It's possible to define functions for converting instances of the language type to/from the base type.

I was just trying to make the case that SchemaCoder is really a special case of LogicalType, where `getBaseType` always returns a Row with the stored Schema.

Yeah, I think  I got that point. 

Can you propose what the protos would look like in this case? Right now LogicalType does not contain the to/from conversion functions in the proto. Do you think we'll need to add these in?

To make the point with code: SchemaCoder<T> can be made to implement Schema.LogicalType<T,Row> with trivial implementations of getBaseType, toBaseType, and toInputType (I'm not trying to say we should or shouldn't do this, just using it illustrate my point):

class SchemaCoder extends CustomCoder<T> implements Schema.LogicalType<T, Row> {

  FieldType getBaseType() {
    return FieldType.row(getSchema());

  public Row toBaseType() {
    return this.toRowFunction.apply(input);

  public T toInputType(Row base) {
    return this.fromRowFunction.apply(base);

I think it may make sense to fully embrace this duality, by letting SchemaCoder have a baseType other than just Row and renaming it to LogicalTypeCoder/LanguageTypeCoder. The current Java SDK schema-aware transforms (a) would operate only on LogicalTypeCoders with a Row base type. Perhaps some of the current schema logic could  alsobe applied more generally to any logical type  - for example, to provide type coercion for logical types with a base type other than Row, like int64 and a timestamp class backed by millis, or fixed size bytes and a UUID class. And having a portable representation that represents those (non Row backed) logical types with some URN would also allow us to pass them to other languages without unnecessarily wrapping them in a Row in order to use SchemaCoder.

I think the actual overlap here is between the to/from functions in SchemaCoder (which is what allows SchemaCoder<T> where T != Row) and the equivalent functionality in LogicalType. However making all of schemas simply just a logical type feels a bit awkward and circular to me. Maybe we should refactor that part out into a LogicalTypeConversion proto, and reference that from both LogicalType and from SchemaCoder?

LogicalType is already potentially circular though. A schema can have a field with a logical type, and that logical type can have a base type of Row with a field with a logical type (and on and on...). To me it seems elegant, not awkward, to recognize that SchemaCoder is just a special case of this concept.

Something like the LogicalTypeConversion proto would definitely be an improvement, but I would still prefer just using a top-level logical type :)

I've added a section to the doc [6] to propose this alternative in the context of the portable representation but I wanted to bring it up here as well to solicit feedback.

On Fri, May 10, 2019 at 9:16 AM Brian Hulette <bhulette@google.com> wrote:
Ah thanks! I added some language there.

From: Kenneth Knowles <kenn@apache.org>
Date: Thu, May 9, 2019 at 5:31 PM
To: dev

From: Brian Hulette <bhulette@google.com>
Date: Thu, May 9, 2019 at 2:02 PM
To: <dev@beam.apache.org>

We briefly discussed using arrow schemas in place of beam schemas entirely in an arrow thread [1]. The biggest reason not to this was that we wanted to have a type for large iterables in beam schemas. But given that large iterables aren't currently implemented, beam schemas look very similar to arrow schemas.
I think it makes sense to take inspiration from arrow schemas where possible, and maybe even copy them outright. Arrow already has a portable (flatbuffers) schema representation [2], and implementations for it in many languages that we may be able to re-use as we bring schemas to more SDKs (the project has Python and Go implementations). There are a couple of concepts in Arrow schemas that are specific for the format and wouldn't make sense for us, (fields can indicate whether or not they are dictionary encoded, and the schema has an endianness field), but if you drop those concepts the arrow spec looks pretty similar to the beam proto spec.

FWIW I left a blank section in the doc for filling out what the differences are and why, and conversely what the interop opportunities may be. Such sections are some of my favorite sections of design docs.


From: Robert Bradshaw <robertwb@google.com>
Date: Thu, May 9, 2019 at 1:38 PM
To: dev

From: Reuven Lax <relax@google.com>
Date: Thu, May 9, 2019 at 7:29 PM
To: dev

> Also in the future we might be able to do optimizations at the runner level if at the portability layer we understood schemes instead of just raw coders. This could be things like only parsing a subset of a row (if we know only a few fields are accessed) or using a columnar data structure like Arrow to encode batches of rows across portability. This doesn't affect data semantics of course, but having a richer, more-expressive type system opens up other opportunities.

But we could do all of that with a RowCoder we understood to designate
the type(s), right?

> On Thu, May 9, 2019 at 10:16 AM Robert Bradshaw <robertwb@google.com> wrote:
>> On the flip side, Schemas are equivalent to the space of Coders with
>> the addition of a RowCoder and the ability to materialize to something
>> other than bytes, right? (Perhaps I'm missing something big here...)
>> This may make a backwards-compatible transition easier. (SDK-side, the
>> ability to reason about and operate on such types is of course much
>> richer than anything Coders offer right now.)
>> From: Reuven Lax <relax@google.com>
>> Date: Thu, May 9, 2019 at 4:52 PM
>> To: dev
>> > FYI I can imagine a world in which we have no coders. We could define the entire model on top of schemas. Today's "Coder" is completely equivalent to a single-field schema with a logical-type field (actually the latter is slightly more expressive as you aren't forced to serialize into bytes).
>> >
>> > Due to compatibility constraints and the effort that would be  involved in such a change, I think the practical decision should be for schemas and coders to coexist for the time being. However when we start planning Beam 3.0, deprecating coders is something I would like to suggest.
>> >
>> > On Thu, May 9, 2019 at 7:48 AM Robert Bradshaw <robertwb@google.com> wrote:
>> >>
>> >> From: Kenneth Knowles <kenn@apache.org>
>> >> Date: Thu, May 9, 2019 at 10:05 AM
>> >> To: dev
>> >>
>> >> > This is a huge development. Top posting because I can be more compact.
>> >> >
>> >> > I really think after the initial idea converges this needs a design doc with goals and alternatives. It is an extraordinarily consequential model change. So in the spirit of doing the work / bias towards action, I created a quick draft at https://s.apache.org/beam-schemas and added everyone on this thread as editors. I am still in the process of writing this to match the thread.
>> >>
>> >> Thanks! Added some comments there.
>> >>
>> >> > *Multiple timestamp resolutions*: you can use logcial types to represent nanos the same way Java and proto do.
>> >>
>> >> As per the other discussion, I'm unsure the value in supporting
>> >> multiple timestamp resolutions is high enough to outweigh the cost.
>> >>
>> >> > *Why multiple int types?* The domain of values for these types are different. For a language with one "int" or "number" type, that's another domain of values.
>> >>
>> >> What is the value in having different domains? If your data has a
>> >> natural domain, chances are it doesn't line up exactly with one of
>> >> these. I guess it's for languages whose types have specific domains?
>> >> (There's also compactness in representation, encoded and in-memory,
>> >> though I'm not sure that's high.)
>> >>
>> >> > *Columnar/Arrow*: making sure we unlock the ability to take this path is Paramount. So tying it directly to a row-oriented coder seems counterproductive.
>> >>
>> >> I don't think Coders are necessarily row-oriented. They are, however,
>> >> bytes-oriented. (Perhaps they need not be.) There seems to be a lot of
>> >> overlap between what Coders express in terms of element typing
>> >> information and what Schemas express, and I'd rather have one concept
>> >> if possible. Or have a clear division of responsibilities.
>> >>
>> >> > *Multimap*: what does it add over an array-valued map or large-iterable-valued map? (honest question, not rhetorical)
>> >>
>> >> Multimap has a different notion of what it means to contain a value,
>> >> can handle (unordered) unions of non-disjoint keys, etc. Maybe this
>> >> isn't worth a new primitive type.
>> >>
>> >> > *URN/enum for type names*: I see the case for both. The core types are fundamental enough they should never really change - after all, proto, thrift, avro, arrow, have addressed this (not to mention most programming languages). Maybe additions once every few years. I prefer the smallest intersection of these schema languages. A oneof is more clear, while URN emphasizes the similarity of built-in and logical types.
>> >>
>> >> Hmm... Do we have any examples of the multi-level primitive/logical
>> >> type in any of these other systems? I have a bias towards all types
>> >> being on the same footing unless there is compelling reason to divide
>> >> things into primitive/use-defined ones.
>> >>
>> >> Here it seems like the most essential value of the primitive type set
>> >> is to describe the underlying representation, for encoding elements in
>> >> a variety of ways (notably columnar, but also interfacing with other
>> >> external systems like IOs). Perhaps, rather than the previous
>> >> suggestion of making everything a logical of bytes, this could be made
>> >> clear by still making everything a logical type, but renaming
>> >> "TypeName" to Representation. There would be URNs (typically with
>> >> empty payloads) for the various primitive types (whose mapping to
>> >> their representations would be the identity).
>> >>
>> >> - Robert