beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bchamb...@apache.org
Subject [1/2] incubator-beam git commit: Integrate DisplayData into DataflowPipelineRunner
Date Wed, 23 Mar 2016 21:26:10 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master db160faba -> 619db54db


Integrate DisplayData into DataflowPipelineRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/46946ea9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/46946ea9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/46946ea9

Branch: refs/heads/master
Commit: 46946ea98a4d28f983e04cf0c38f740e421e22d8
Parents: db160fa
Author: Scott Wegner <swegner@google.com>
Authored: Tue Mar 22 10:43:56 2016 -0700
Committer: Scott Wegner <swegner@google.com>
Committed: Wed Mar 23 13:32:52 2016 -0700

----------------------------------------------------------------------
 .../sdk/runners/DataflowPipelineTranslator.java | 12 +++
 .../sdk/transforms/display/DisplayData.java     | 15 ++-
 .../cloud/dataflow/sdk/util/PropertyNames.java  |  1 +
 .../runners/DataflowPipelineTranslatorTest.java | 98 +++++++++++++++++++-
 .../sdk/transforms/display/DisplayDataTest.java |  2 +-
 5 files changed, 122 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46946ea9/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
index 0feae95..155c454 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
@@ -59,6 +59,7 @@ import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.ParDo;
 import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
 import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
 import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
@@ -79,6 +80,7 @@ import com.google.cloud.dataflow.sdk.values.TupleTag;
 import com.google.cloud.dataflow.sdk.values.TypedPValue;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -548,6 +550,7 @@ public class DataflowPipelineTranslator {
       currentStep.setKind(type);
       steps.add(currentStep);
       addInput(PropertyNames.USER_NAME, getFullName(transform));
+      addDisplayData(PropertyNames.DISPLAY_DATA, DisplayData.from(transform));
     }
 
     @Override
@@ -725,6 +728,15 @@ public class DataflowPipelineTranslator {
       outputInfoList.add(outputInfo);
     }
 
+    private void addDisplayData(String name, DisplayData displayData) {
+      List<Map<String, Object>> serializedItems = Lists.newArrayList();
+      for (DisplayData.Item item : displayData.items()) {
+        serializedItems.add(MAPPER.convertValue(item, Map.class));
+      }
+
+      addList(getProperties(), name, serializedItems);
+    }
+
     @Override
     public OutputReference asOutputReference(PValue value) {
       AppliedPTransform<?, ?, ?> transform =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46946ea9/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java
index 05fa7c7..dadc730 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java
@@ -26,6 +26,9 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonInclude;
+
 import org.apache.avro.reflect.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -214,10 +217,12 @@ public class DisplayData {
       this.label = label;
     }
 
+    @JsonGetter("namespace")
     public String getNamespace() {
       return ns;
     }
 
+    @JsonGetter("key")
     public String getKey() {
       return key;
     }
@@ -226,6 +231,7 @@ public class DisplayData {
      * Retrieve the {@link DisplayData.Type} of display metadata. All metadata conforms to
a
      * predefined set of allowed types.
      */
+    @JsonGetter("type")
     public Type getType() {
       return type;
     }
@@ -233,6 +239,7 @@ public class DisplayData {
     /**
      * Retrieve the value of the metadata item.
      */
+    @JsonGetter("value")
     public String getValue() {
       return value;
     }
@@ -244,6 +251,8 @@ public class DisplayData {
      * <p>Some display data types will not provide a short value, in which case the
return value
      * will be null.
      */
+    @JsonGetter("shortValue")
+    @JsonInclude(JsonInclude.Include.NON_NULL)
     @Nullable
     public String getShortValue() {
       return shortValue;
@@ -255,6 +264,8 @@ public class DisplayData {
      *
      * <p>If no label was specified, this will return {@code null}.
      */
+    @JsonGetter("label")
+    @JsonInclude(JsonInclude.Include.NON_NULL)
     @Nullable
     public String getLabel() {
       return label;
@@ -266,8 +277,10 @@ public class DisplayData {
      *
      * <p>If no URL was specified, this will return {@code null}.
      */
+    @JsonGetter("linkUrl")
+    @JsonInclude(JsonInclude.Include.NON_NULL)
     @Nullable
-    public String getUrl() {
+    public String getLinkUrl() {
       return url;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46946ea9/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
index ec65189..81572ea 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
@@ -103,4 +103,5 @@ public class PropertyNames {
   public static final String VALIDATE_SINK = "validate_sink";
   public static final String VALIDATE_SOURCE = "validate_source";
   public static final String VALUE = "value";
+  public static final String DISPLAY_DATA = "display_data";
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46946ea9/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
index 497552f..65f8dde 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
@@ -20,9 +20,11 @@ import static com.google.cloud.dataflow.sdk.util.Structs.addObject;
 import static com.google.cloud.dataflow.sdk.util.Structs.getDictionary;
 import static com.google.cloud.dataflow.sdk.util.Structs.getString;
 import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
@@ -53,6 +55,7 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.ParDo;
 import com.google.cloud.dataflow.sdk.transforms.Sum;
 import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
 import com.google.cloud.dataflow.sdk.util.GcsUtil;
 import com.google.cloud.dataflow.sdk.util.OutputReference;
 import com.google.cloud.dataflow.sdk.util.PropertyNames;
@@ -81,6 +84,8 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -91,9 +96,9 @@ import java.util.Map;
  * Tests for DataflowPipelineTranslator.
  */
 @RunWith(JUnit4.class)
-public class DataflowPipelineTranslatorTest {
+public class DataflowPipelineTranslatorTest implements Serializable {
 
-  @Rule public ExpectedException thrown = ExpectedException.none();
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
 
   // A Custom Mockito matcher for an initial Job that checks that all
   // expected fields are set.
@@ -496,7 +501,7 @@ public class DataflowPipelineTranslatorTest {
     return step;
   }
 
-  private static class NoOpFn extends DoFn<String, String>{
+  private static class NoOpFn extends DoFn<String, String> {
     @Override public void processElement(ProcessContext c) throws Exception {
       c.output(c.element());
     }
@@ -796,4 +801,89 @@ public class DataflowPipelineTranslatorTest {
     Step collectionToSingletonStep = steps.get(2);
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
   }
+
+  @Test
+  public void testStepDisplayData() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+    DataflowPipeline pipeline = DataflowPipeline.create(options);
+
+    DoFn<Integer, Integer> fn1 = new DoFn<Integer, Integer>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        c.output(c.element());
+      }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder
+                .add("foo", "bar")
+                .add("foo2", DataflowPipelineTranslatorTest.class)
+                .withLabel("Test Class")
+                .withLinkUrl("http://www.google.com");
+      }
+    };
+
+    DoFn<Integer, Integer> fn2 = new DoFn<Integer, Integer>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        c.output(c.element());
+      }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder.add("foo3", "barge");
+      }
+    };
+
+    pipeline
+      .apply(Create.of(1, 2, 3))
+      .apply(ParDo.of(fn1))
+      .apply(ParDo.of(fn2));
+
+    Job job = translator.translate(
+        pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+
+    List<Step> steps = job.getSteps();
+    assertEquals(3, steps.size());
+
+    Map<String, Object> parDo1Properties = steps.get(1).getProperties();
+    Map<String, Object> parDo2Properties = steps.get(2).getProperties();
+    assertThat(parDo1Properties, hasKey("display_data"));
+
+    Collection<Map<String, String>> fn1displayData =
+            (Collection<Map<String, String>>) parDo1Properties.get("display_data");
+    Collection<Map<String, String>> fn2displayData =
+            (Collection<Map<String, String>>) parDo2Properties.get("display_data");
+
+    ImmutableList expectedFn1DisplayData = ImmutableList.of(
+            ImmutableMap.<String, String>builder()
+              .put("namespace", fn1.getClass().getName())
+              .put("key", "foo")
+              .put("type", "STRING")
+              .put("value", "bar")
+              .build(),
+            ImmutableMap.<String, String>builder()
+              .put("namespace", fn1.getClass().getName())
+              .put("key", "foo2")
+              .put("type", "JAVA_CLASS")
+              .put("value", DataflowPipelineTranslatorTest.class.getName())
+              .put("shortValue", DataflowPipelineTranslatorTest.class.getSimpleName())
+              .put("label", "Test Class")
+              .put("linkUrl", "http://www.google.com")
+              .build()
+    );
+
+    ImmutableList expectedFn2DisplayData = ImmutableList.of(
+            ImmutableMap.<String, String>builder()
+                    .put("namespace", fn2.getClass().getName())
+                    .put("key", "foo3")
+                    .put("type", "STRING")
+                    .put("value", "barge")
+                    .build()
+    );
+
+    assertEquals(expectedFn1DisplayData, fn1displayData);
+    assertEquals(expectedFn2DisplayData, fn2displayData);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46946ea9/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java
index 13dd618..dfc8c38 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java
@@ -606,7 +606,7 @@ public class DisplayDataTest {
         urlMatcher, "display item with url", "URL") {
       @Override
       protected String featureValueOf(DisplayData.Item actual) {
-        return actual.getUrl();
+        return actual.getLinkUrl();
       }
     };
   }


Mime
View raw message