beam-user mailing list archives

From Eugene Kirpichov <kirpic...@google.com>
Subject Re: Handling errors in I/O transformation
Date Mon, 20 Nov 2017 18:19:40 GMT
Note that this approach works only for streaming inserts; when a batch load
fails, it is not possible to isolate which individual writes failed.
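
For readers who want the general shape of the dead-letter pattern outside of Beam, here is a minimal plain-Java sketch. It is not how BigQueryIO is implemented. `DeadLetterSketch`, `writeIndividually`, and the `sink` callback are hypothetical names made up for illustration. The only idea it borrows from the thread is that each row is attempted on its own, so a failure can be attributed to exactly one row and set aside.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration only; none of these names come from Beam.
class DeadLetterSketch {

    /** Outcome of a write attempt: rows that were accepted and rows that were rejected. */
    static final class Result<T> {
        final List<T> written = new ArrayList<>();
        final List<T> failed = new ArrayList<>();
    }

    /**
     * Attempts each row individually. A row whose write throws is routed to
     * the "failed" list instead of aborting the whole run.
     */
    static <T> Result<T> writeIndividually(List<T> rows, Consumer<T> sink) {
        Result<T> result = new Result<>();
        for (T row : rows) {
            try {
                sink.accept(row);
                result.written.add(row);
            } catch (RuntimeException e) {
                // The failure is tied to this one row, so it can be dead-lettered.
                result.failed.add(row);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Stand-in sink (an assumption): rejects any row that starts with "bad".
        Consumer<String> sink = row -> {
            if (row.startsWith("bad")) {
                throw new IllegalArgumentException("rejected: " + row);
            }
        };
        Result<String> r = writeIndividually(List.of("ok-1", "bad", "ok-2"), sink);
        System.out.println("written=" + r.written + " failed=" + r.failed);
    }
}
```

If the same rows were handed to the sink as one all-or-nothing batch, a failure would only say that the batch failed, which is the limitation described above for batch loads.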

On Mon, Nov 20, 2017 at 10:01 AM Lukasz Cwik <lcwik@google.com> wrote:

> BigQueryIO is written to support emitting failed records to a "dead
> letter queue". Not all IO transforms support this, but it is very useful
> for the ones that do.
>
> WriteResult writeResult = p.apply(PubsubIO.readMessagesWithAttributes()
>         .fromSubscription("<some subscription>"))
>         .apply(MapElements.via(new MapMessageToBigQueryRow()))
>         .apply(BigQueryIO.writeTableRows().to(tableReference)
>                 .withSchema(schema)
>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>                 .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
>         );
> // Rows that could not be inserted; send these to the dead letter queue.
> PCollection<TableRow> failedWrites = writeResult.getFailedInserts();
>
> On Mon, Nov 20, 2017 at 7:07 AM, Carsten Krebs | GameDuell <
> carsten.krebs@gameduell.de> wrote:
>
>> Hi,
>>
>> I’m new to Apache Beam and am currently trying to figure out the right way
>> to deal with errors in an I/O transformation. Generally, I would like to
>> filter out tuples that could not be written for whatever reason and
>> write them to some sort of “dead letter queue”.
>>
>> What is the right way to tag tuples that led to an Exception while
>> writing?
>>
>> Currently, my very simple pipeline looks like this:
>>
>> p.apply(PubsubIO.readMessagesWithAttributes()
>>         .fromSubscription("<some subscription>"))
>>         .apply(MapElements.via(new MapMessageToBigQueryRow()))
>>         .apply(BigQueryIO.writeTableRows().to(tableReference)
>>                 .withSchema(schema)
>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>                 .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
>>         );
>>
>>
>> Do I have to wrap the BigQueryIO.Write instance in some kind of custom
>> transformation, catching Exceptions and tagging the input values somehow?
>> Are there better, more appropriate ways to do this?
>>
>> Thanks in advance,
>>
>> Carsten
