beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Christopher Reilly (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-2116) PubsubJsonClient doesn't write user created attributeMap
Date Fri, 28 Apr 2017 16:04:04 GMT

    [ https://issues.apache.org/jira/browse/BEAM-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15989078#comment-15989078
] 

Christopher Reilly commented on BEAM-2116:
------------------------------------------

Ok, thanks! I'm coming from using the Google Dataflow sdk, so I'm having to change things
around to test this out.

> PubsubJsonClient doesn't write user created attributeMap
> --------------------------------------------------------
>
>                 Key: BEAM-2116
>                 URL: https://issues.apache.org/jira/browse/BEAM-2116
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-gcp
>    Affects Versions: 0.6.0
>         Environment: Java Google Dataflow
>            Reporter: Christopher Reilly
>            Assignee: Daniel Halperin
>            Priority: Minor
>             Fix For: First stable release
>
>
> PubsubJsonClient, which seems to be the hard coded client for PubsubIO.write() doesn't
seem to be respecting the attributes set by the user for the PubsubMessage. 
> In the PubsubJsonClient.publish() method, the passed in OutgoingMessage that contains
the user set attribute map never actually has it's attributes map read. Instead, a new PubsubMessage
is instantiated and the empty attributesMap from that is used. This is fixed in the PubsubGrpcClient,
but that client type is never used by default in any way. 
> public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
>       throws IOException {
>     List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size());
>     for (OutgoingMessage outgoingMessage : outgoingMessages) {
>       PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes);
>       Map<String, String> attributes = pubsubMessage.getAttributes();
>       if ((timestampLabel != null || idLabel != null) && attributes == null)
{
>         attributes = new TreeMap<>();
>       }
>       if (attributes != null) {
>         pubsubMessage.setAttributes(attributes);
>       }
> Please let me know if I am going down the wrong path here.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message