I didn't find code in `AvroUtils.toBeamRowStrict` that converts long to Joda time. `AvroUtils.toBeamRowStrict` retrieves objects from GenericRecord, and tries to cast objects based on their types (and cast(object) to long for "timestamp-millis"). see [1].

So in order to use `AvroUtils.toBeamRowStrict`, the generated GenericRecord should have long for "timestamp-millis". 
The schema you pasted looks right. Not sure why generated class is Joda time (is it controlled by some flags?). But at least you could write a small function to do schema conversion for your need. 

[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L672


On Mon, Apr 15, 2019 at 7:11 PM Vishwas Bm <bmvishwas@gmail.com> wrote:
Hi Rui,

I agree that by converting it to long, there will be no error.
But the KafkaIO is giving a GenericRecord with attribute of type JodaTime. Now I convert it to long. Then in the  AvroUtils.toBeamRowStrict again converts it to JodaTime.

I used the avro tools 1.8.2 jar, for the below schema and I see that the generated class has a JodaTime attribute.

            "name": "timeOfRelease",
                    "type": "long",
                    "logicalType": "timestamp-millis",
                    "connect.version": 1,
                    "connect.name": "org.apache.kafka.connect.data.Timestamp"

Attribute type in generated class:
private org.joda.time.DateTime timeOfRelease;

So not sure why this type casting is required.

Thanks & Regards,

On Tue, Apr 16, 2019 at 12:56 AM Rui Wang <ruwang@google.com> wrote:
Read from the code and seems like as the logical type "timestamp-millis" means, it's expecting millis in Long as values under this logical type.

So if you can convert joda-time to millis before calling "AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)", your exception will gone.


On Mon, Apr 15, 2019 at 10:28 AM Lukasz Cwik <lcwik@google.com> wrote:

On Sun, Apr 14, 2019 at 10:29 PM Vishwas Bm <bmvishwas@gmail.com> wrote:

Below is my pipeline:

KafkaSource (KafkaIO.read) ------> Pardo ---------------> BeamSql ---------------> KafkaSink(KafkaIO.write)

The avro schema of the topic has a field of logical type timestamp-millis.  KafkaIO.read transform is creating a KafkaRecord<String,GenericRecord>, where this field is being converted to joda-time. 

In my Pardo transform, I am trying to use the AvroUtils class methods to convert the generic record to Beam Row and getting below class cast exception for the joda-time attribute.

             AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)

Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot be cast to java.lang.Long
    at org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664)
    at org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217)

Thanks & Regards,