beam-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Carlos Alonso <car...@mrcalonso.com>
Subject Table with field based partitioning must have a schema
Date Fri, 23 Mar 2018 22:01:24 GMT
Hi everyone!!

When trying to insert into BigQuery using dynamic destinations I get this
error: "Tabie with field based partitioning must have a schema" that
suggests that I'm not providing such a schema and I don't understand why as
I think I am. Here: https://pastebin.com/Q1jF024B you can find the full
stack trace and below you can see the code of the DynamicDestinations
implementation. Basically I'm dumping a stream of PubSub into BQ being that
stream of heterogeneous Json documents and routing each type to its
corresponding table.

The tuples contain the Json document and the schema itself for the
corresponding table (the tuple is composed in a previous transform before
from a side input as the schema is read from BQ using BigQueryClient
class). and the Destination KV[String, String] is supposed to hold the
table name as key and the schema as value.

The logs show many entries for "Returning destination for...", a few of
"Returning table..." ones and no "Returning schema for..." at all which may
indicate why BQ complains that no schema is provided, the question would
then be... Why is that method never invoked?

class JsonRouter(dataset: String)
  extends DynamicDestinations[(Json, String), KV[String, String]] {

  import JsonRouter._

  override def getDestination(element: ValueInSingleWindow[(Json,
String)]): KV[String, String] = {
    log.debug(s"Returning destination for ${element.getValue}")
    KV.of(jsonToTableName(element.getValue._1), element.getValue._2)
  }

  override def getSchema(element: KV[String, String]): TableSchema = {
    log.debug(s"Returning schema for ${element.getKey}")
    BigQueryUtil.parseSchema(element.getValue)
  }

  override def getTable(element: KV[String, String]): TableDestination = {
    log.debug(s"Returning table for ${element.getKey}")
    new TableDestination(s"$dataset.${element.getKey}", s"Table to
store ${element.getKey}",
      BQTypesRouter.TimePartitioning)
  }

  override def getDestinationCoder: Coder[KV[String, String]] =
    KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())
}


Thanks!!

Mime
View raw message