beam-commits mailing list archives

From "Anton Kedin (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-5335) [SQL] Output schema is set incorrectly
Date Thu, 06 Sep 2018 17:47:00 GMT

    [ https://issues.apache.org/jira/browse/BEAM-5335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16606168#comment-16606168 ]

Anton Kedin commented on BEAM-5335:
-----------------------------------

And we need a test that chains multiple queries.
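
For illustration, a chained-query test in the Scio style of the report below could look roughly like this; the intermediate query, its column names, and intermediateSchema are placeholders, not taken from the issue:

{code}
sc.avroFile[Foo](args("input"))
   .map(fooToRow)
   .setCoder(inSchema.getRowCoder)
   // first query
   .applyTransform(SqlTransform.query("SELECT name, score FROM PCOLLECTION"))
   // today the intermediate coder must be set by hand; once the output schema
   // is propagated correctly this line should become unnecessary
   .setCoder(intermediateSchema.getRowCoder)
   // second query consumes the output of the first
   .applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
   .saveAsTextFile(args("output"))
{code}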

> [SQL] Output schema is set incorrectly
> --------------------------------------
>
>                 Key: BEAM-5335
>                 URL: https://issues.apache.org/jira/browse/BEAM-5335
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Priority: Major
>
> *From: https://stackoverflow.com/questions/52181795/how-do-i-get-an-output-schema-for-an-apache-beam-sql-query:*
> I've been playing with the Beam SQL DSL and I'm unable to use the output from a query without manually providing code that's aware of the output schema. Can I infer the output schema rather than hardcoding it?
> Neither the walkthrough nor the examples actually use the output from a query. I'm using Scio rather than the plain Java API to keep the code relatively readable and concise; I don't think that makes a difference for this question.
> Here's an example of what I mean.
> Given an input schema inSchema and some data source that is mapped onto a Row (in this example, Avro-based, but again, I don't think that matters) as follows:
> {code}
> sc.avroFile[Foo](args("input"))
>    .map(fooToRow)
>    .setCoder(inSchema.getRowCoder)
>    .applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
>    .saveAsTextFile(args("output"))
> {code}
> Running this pipeline results in a KryoException as follows:
> {code}
> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> fieldIndices (org.apache.beam.sdk.schemas.Schema)
> schema (org.apache.beam.sdk.values.RowWithStorage)
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> {code}
> However, inserting a RowCoder matching the SQL output, in this case a single INT64 count column:
> {code}
>    ...snip...
>    .applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
>    .setCoder(Schema.builder().addInt64Field("count").build().getRowCoder)
>    .saveAsTextFile(args("output"))
> {code}
> Now the pipeline runs just fine.
> Having to manually tell the pipeline how to encode the SQL output seems unnecessary, given that we specify the input schema/coder(s) and a query. It seems to me that we should be able to infer the output schema from that, but I can't see how, other than maybe using Calcite directly?
> Before raising a ticket on the Beam Jira, I thought I'd check I wasn't missing something obvious!
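
For completeness: once the output schema is set correctly by SqlTransform, the result should be consumable without a hand-written coder. A rough sketch in the same Scio style as the snippets above (reading the single COUNT column by field index is an assumption about the result shape):

{code}
   ...snip...
   .applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
   // no setCoder needed if SqlTransform sets the output schema/coder itself;
   // the single COUNT column is read by field index 0 here
   .map(row => row.getInt64(0).toString)
   .saveAsTextFile(args("output"))
{code}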



