beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] beam git commit: Fix PubSubIO write attribute issue
Date Fri, 31 Mar 2017 03:16:11 GMT
Repository: beam
Updated Branches:
  refs/heads/master b1c287bd5 -> 1e2ad65f8


Fix PubSubIO write attribute issue


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ad9df5b5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ad9df5b5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ad9df5b5

Branch: refs/heads/master
Commit: ad9df5b5591ce9d153039ac91e8862af6ea42b45
Parents: b1c287b
Author: Chen Bin <bchen@talend.com>
Authored: Thu Mar 9 11:09:04 2017 +0800
Committer: Dan Halperin <dhalperi@google.com>
Committed: Thu Mar 30 20:15:58 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/util/PubsubJsonClient.java     |  2 +-
 .../org/apache/beam/sdk/util/PubsubJsonClientTest.java | 13 ++++++++++---
 2 files changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ad9df5b5/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
index 6bc104f..ef8abfd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
@@ -135,7 +135,7 @@ public class PubsubJsonClient extends PubsubClient {
     for (OutgoingMessage outgoingMessage : outgoingMessages) {
       PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes);
 
-      Map<String, String> attributes = pubsubMessage.getAttributes();
+      Map<String, String> attributes = outgoingMessage.attributes;
       if ((timestampLabel != null || idLabel != null) && attributes == null) {
         attributes = new TreeMap<>();
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/ad9df5b5/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
index 17e1870..019190b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
@@ -30,7 +30,10 @@ import com.google.api.services.pubsub.model.ReceivedMessage;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
 import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
 import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
 import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
@@ -114,8 +117,10 @@ public class PubsubJsonClientTest {
     PubsubMessage expectedPubsubMessage = new PubsubMessage()
         .encodeData(DATA.getBytes())
         .setAttributes(
-            ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
-                            ID_LABEL, RECORD_ID));
+            ImmutableMap.<String, String> builder()
+                    .put(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME))
+                    .put(ID_LABEL, RECORD_ID)
+                    .put("k", "v").build());
     PublishRequest expectedRequest = new PublishRequest()
         .setMessages(ImmutableList.of(expectedPubsubMessage));
     PublishResponse expectedResponse = new PublishResponse()
@@ -125,8 +130,10 @@ public class PubsubJsonClientTest {
                                 .publish(expectedTopic, expectedRequest)
                                 .execute()))
            .thenReturn(expectedResponse);
+    Map<String, String> attrs = new HashMap<>();
+    attrs.put("k", "v");
     OutgoingMessage actualMessage = new OutgoingMessage(
-            DATA.getBytes(), null, MESSAGE_TIME, RECORD_ID);
+            DATA.getBytes(), attrs, MESSAGE_TIME, RECORD_ID);
     int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
     assertEquals(1, n);
   }


Mime
View raw message