beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-2257) KafkaIO write without key requires a producer fn
Date Tue, 24 Oct 2017 20:08:01 GMT

    [ https://issues.apache.org/jira/browse/BEAM-2257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16217589#comment-16217589 ]

ASF GitHub Bot commented on BEAM-2257:
--------------------------------------

GitHub user rangadi opened a pull request:

    https://github.com/apache/beam/pull/4034

    [BEAM-2257] Ensure Kafka sink serializers are set.

    Ensure that Kafka serializers are set.
    Please merge this after #3969 is merged.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rangadi/beam add_validations

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/beam/pull/4034.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4034
    
----
commit 2a430abf0980d888c4146a864f5112599cfab5b1
Author: Raghu Angadi <rangadi@google.com>
Date:   2017-10-20T22:29:20Z

    Ensure Kafka sink serializers are set.

----


> KafkaIO write without key requires a producer fn
> ------------------------------------------------
>
>                 Key: BEAM-2257
>                 URL: https://issues.apache.org/jira/browse/BEAM-2257
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>            Reporter: Jean-Baptiste Onofré
>            Assignee: Jean-Baptiste Onofré
>
> The {{KafkaIO}} javadoc says that it is possible to write {{String}} values directly to the topic without a key:
> {code}
>    PCollection<String> strings = ...;
>    strings.apply(KafkaIO.<Void, String>write()
>        .withBootstrapServers("broker_1:9092,broker_2:9092")
>        .withTopic("results")
>        .withValueSerializer(new StringSerializer()) // just need serializer for value
>        .values()
>      );
> {code}
> This is not fully correct:
> 1. {{withValueSerializer()}} requires a serializer class, not an instance, so it should be {{withValueSerializer(StringSerializer.class)}}.
> 2. Because no key serializer is provided, a Kafka producer fn is required; otherwise the user will get:
> {code}
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
> 	at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> Caused by: java.lang.NullPointerException
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:300)
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
> 	at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> {code}
> A possible workaround is to create a {{VoidSerializer}} and pass it via {{withKeySerializer()}}, or to provide the producer fn.
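
For illustration only, the {{VoidSerializer}} workaround mentioned above might look roughly like the sketch below. It is not taken from the pull request. The local Serializer interface is a stand-in that mirrors the shape of Kafka's org.apache.kafka.common.serialization.Serializer so the sketch compiles without the Kafka client; in a real pipeline the class would implement the Kafka interface instead. The class body is an assumption about what such a serializer could contain.

```java
import java.util.Map;

// Local stand-in mirroring the shape of Kafka's
// org.apache.kafka.common.serialization.Serializer (assumption: used here
// only so the sketch compiles without the Kafka client on the classpath).
interface Serializer<T> {
  void configure(Map<String, ?> configs, boolean isKey);

  byte[] serialize(String topic, T data);

  void close();
}

// Hypothetical key serializer for records that carry no key.
// It has no constructor arguments because, per item 1 above, the
// serializer is supplied as a class rather than as an instance.
class VoidSerializer implements Serializer<Void> {
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // Nothing to configure.
  }

  @Override
  public byte[] serialize(String topic, Void data) {
    // A Void value can only be null, so the serialized key is null as well.
    return null;
  }

  @Override
  public void close() {
    // No resources to release.
  }
}
```

With a class like this implementing the real Kafka interface, the snippet from the issue would presumably add {{withKeySerializer(VoidSerializer.class)}} next to {{withValueSerializer(StringSerializer.class)}}.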



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
