beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-2257) KafkaIO write without key requires a producer fn
Date Tue, 10 Oct 2017 18:08:00 GMT

    [ https://issues.apache.org/jira/browse/BEAM-2257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199111#comment-16199111 ]

ASF GitHub Bot commented on BEAM-2257:
--------------------------------------

GitHub user nerdynick opened a pull request:

    https://github.com/apache/beam/pull/3969

    [BEAM-2257] Fixes bug with Kafka Producer needing a key serializer set in the case of value only PCollection

    …auto add the VoidSerializer for the key.serializer config for kafka producer
    
    Follow this checklist to help us incorporate your contribution quickly and easily:
    
     - [ ] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the change (usually before you start working on it).  Trivial changes like typos do not require a JIRA issue.  Your pull request should address just this issue, without pulling in other changes.
     - [ ] Each commit in the pull request should have a meaningful subject line and body.
     - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue.
     - [ ] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
     - [ ] Run `mvn clean verify` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
     - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
    
    ---


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/nerdynick/beam BEAM-2257

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/beam/pull/3969.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3969
    
----
commit 8410a4065b6e01eb53e11e804f5df2768d874868
Author: nerdynick <nerdynick@gmail.com>
Date:   2017-10-10T18:04:12Z

    Added VoidSerializer for KafkaIO. Modified KafkaIO.Write.values() to auto add the VoidSerializer for the key.serializer config for kafka producer

----


> KafkaIO write without key requires a producer fn
> ------------------------------------------------
>
>                 Key: BEAM-2257
>                 URL: https://issues.apache.org/jira/browse/BEAM-2257
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>            Reporter: Jean-Baptiste Onofré
>            Assignee: Jean-Baptiste Onofré
>
> The {{KafkaIO}} javadoc says that it's possible to write {{String}} values directly to the topic without a key:
> {code}
>    PCollection<String> strings = ...;
>    strings.apply(KafkaIO.<Void, String>write()
>        .withBootstrapServers("broker_1:9092,broker_2:9092")
>        .withTopic("results")
>        .withValueSerializer(new StringSerializer()) // just need serializer for value
>        .values()
>      );
> {code}
> This is not fully correct:
> 1. {{withValueSerializer()}} requires a serializer class, not an instance. So it should be {{withValueSerializer(StringSerializer.class)}}.
> 2. As the key serializer is not provided, a Kafka producer fn is required; otherwise, the user will get:
> {code}
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
> 	at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> Caused by: java.lang.NullPointerException
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:300)
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
> 	at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> {code}
> A possible workaround is to create a {{VoidSerializer}} and pass it via {{withKeySerializer()}}, or to provide the producer fn.
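
The workaround described in the issue could look roughly like the following. This is a minimal sketch under stated assumptions, not the code from the pull request: the nested Serializer interface is a stand-in that copies the method shape of Kafka's org.apache.kafka.common.serialization.Serializer so the example compiles without kafka-clients, and KeylessWriteSketch, VoidSerializer and withDefaultKeySerializer are hypothetical names. Only the property names "key.serializer", "value.serializer" and "bootstrap.servers" are Kafka's own.

```java
import java.util.HashMap;
import java.util.Map;

final class KeylessWriteSketch {

    /**
     * Stand-in with the same method shape as Kafka's Serializer interface.
     * Assumption: declared locally only so the sketch compiles on its own.
     */
    interface Serializer<T> {
        void configure(Map<String, ?> configs, boolean isKey);
        byte[] serialize(String topic, T data);
        void close();
    }

    /** Hypothetical serializer for records that carry no key. */
    static final class VoidSerializer implements Serializer<Void> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // Nothing to configure.
        }

        @Override
        public byte[] serialize(String topic, Void data) {
            // A Void key can only be null, so there are no bytes to emit.
            return null;
        }

        @Override
        public void close() {
            // No resources to release.
        }
    }

    /**
     * Returns a copy of the producer config in which "key.serializer" is
     * filled in when the caller did not set one. An explicit setting wins.
     */
    static Map<String, Object> withDefaultKeySerializer(Map<String, Object> producerConfig) {
        Map<String, Object> copy = new HashMap<>(producerConfig);
        copy.putIfAbsent("key.serializer", VoidSerializer.class);
        return copy;
    }

    public static void main(String[] args) {
        // Value-only configuration, as in the javadoc example from the issue.
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", "broker_1:9092,broker_2:9092");
        config.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        Map<String, Object> effective = withDefaultKeySerializer(config);
        System.out.println("key.serializer = " + effective.get("key.serializer"));
    }
}
```

In a real pipeline the class would implement Kafka's own Serializer<Void> and, in line with point 1 of the issue, would presumably be passed as a class rather than an instance, for example withKeySerializer(VoidSerializer.class).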



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
