beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bchamb...@apache.org
Subject [1/2] incubator-beam git commit: Add display data to ParDo transforms
Date Fri, 15 Apr 2016 20:43:13 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 9ff426964 -> 0bb4f9c1e


Add display data to ParDo transforms


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/77a77c9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/77a77c9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/77a77c9d

Branch: refs/heads/master
Commit: 77a77c9d159d7f9b1e6f645e54f0a4de86180bfe
Parents: 9ff4269
Author: Scott Wegner <swegner@google.com>
Authored: Tue Apr 12 10:08:37 2016 -0700
Committer: bchambers <bchambers@google.com>
Committed: Fri Apr 15 12:12:44 2016 -0700

----------------------------------------------------------------------
 .../runners/DataflowPipelineTranslatorTest.java | 94 +++++++++++---------
 .../runners/inprocess/ForwardingPTransform.java |  6 ++
 .../beam/sdk/transforms/DoFnReflector.java      |  6 ++
 .../beam/sdk/transforms/DoFnWithContext.java    | 14 ++-
 .../org/apache/beam/sdk/transforms/Filter.java  | 27 ++++++
 .../apache/beam/sdk/transforms/GroupByKey.java  |  9 ++
 .../transforms/IntraBundleParallelization.java  |  9 ++
 .../org/apache/beam/sdk/transforms/ParDo.java   | 62 ++++++++++---
 .../apache/beam/sdk/transforms/Partition.java   | 13 +++
 .../inprocess/ForwardingPTransformTest.java     | 10 +++
 .../sdk/transforms/DoFnWithContextTest.java     | 11 +++
 .../apache/beam/sdk/transforms/FilterTest.java  | 20 +++++
 .../beam/sdk/transforms/GroupByKeyTest.java     | 15 ++++
 .../IntraBundleParallelizationTest.java         | 26 ++++++
 .../apache/beam/sdk/transforms/ParDoTest.java   | 74 ++++++++++++---
 .../beam/sdk/transforms/PartitionTest.java      | 13 +++
 16 files changed, 341 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
index 7a3caa6..0d58601 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
@@ -72,9 +72,9 @@ import com.google.api.services.dataflow.model.Step;
 import com.google.api.services.dataflow.model.WorkerPool;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
-import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -890,10 +890,12 @@ public class DataflowPipelineTranslatorTest implements Serializable
{
       }
     };
 
+    ParDo.Bound<Integer, Integer> parDo1 = ParDo.of(fn1);
+    ParDo.Bound<Integer, Integer> parDo2 = ParDo.of(fn2);
     pipeline
       .apply(Create.of(1, 2, 3))
-      .apply(ParDo.of(fn1))
-      .apply(ParDo.of(fn2));
+      .apply(parDo1)
+      .apply(parDo2);
 
     Job job =
         translator
@@ -910,43 +912,53 @@ public class DataflowPipelineTranslatorTest implements Serializable
{
     Map<String, Object> parDo2Properties = steps.get(2).getProperties();
     assertThat(parDo1Properties, hasKey("display_data"));
 
-
-    @SuppressWarnings("unchecked")
-    Collection<Map<String, Object>> fn1displayData =
-            (Collection<Map<String, Object>>) parDo1Properties.get("display_data");
-    @SuppressWarnings("unchecked")
-    Collection<Map<String, Object>> fn2displayData =
-            (Collection<Map<String, Object>>) parDo2Properties.get("display_data");
-
-    @SuppressWarnings("unchecked")
-    Matcher<Iterable<? extends Map<String, Object>>> fn1expectedData =
-        Matchers.<Map<String, Object>>containsInAnyOrder(
-            ImmutableMap.<String, Object>builder()
-                .put("namespace", fn1.getClass().getName())
-                .put("key", "foo")
-                .put("type", "STRING")
-                .put("value", "bar")
-                .build(),
-            ImmutableMap.<String, Object>builder()
-                .put("namespace", fn1.getClass().getName())
-                .put("key", "foo2")
-                .put("type", "JAVA_CLASS")
-                .put("value", DataflowPipelineTranslatorTest.class.getName())
-                .put("shortValue", DataflowPipelineTranslatorTest.class.getSimpleName())
-                .put("label", "Test Class")
-                .put("linkUrl", "http://www.google.com")
-                .build());
-
-    @SuppressWarnings("unchecked")
-    Matcher<Iterable<? extends Map<String, Object>>> fn2expectedData =
-        Matchers.<Map<String, Object>>contains(
-            ImmutableMap.<String, Object>builder()
-                .put("namespace", fn2.getClass().getName())
-                .put("key", "foo3")
-                .put("type", "INTEGER")
-                .put("value", 1234L)
-                .build());
-    assertThat(fn1displayData, fn1expectedData);
-    assertThat(fn2displayData, fn2expectedData);
+    Collection<Map<String, String>> fn1displayData =
+            (Collection<Map<String, String>>) parDo1Properties.get("display_data");
+    Collection<Map<String, String>> fn2displayData =
+            (Collection<Map<String, String>>) parDo2Properties.get("display_data");
+
+    ImmutableSet<ImmutableMap<String, Object>> expectedFn1DisplayData = ImmutableSet.of(
+        ImmutableMap.<String, Object>builder()
+            .put("key", "foo")
+            .put("type", "STRING")
+            .put("value", "bar")
+            .put("namespace", fn1.getClass().getName())
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("key", "fn")
+            .put("type", "JAVA_CLASS")
+            .put("value", fn1.getClass().getName())
+            .put("shortValue", fn1.getClass().getSimpleName())
+            .put("namespace", parDo1.getClass().getName())
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("key", "foo2")
+            .put("type", "JAVA_CLASS")
+            .put("value", DataflowPipelineTranslatorTest.class.getName())
+            .put("shortValue", DataflowPipelineTranslatorTest.class.getSimpleName())
+            .put("namespace", fn1.getClass().getName())
+            .put("label", "Test Class")
+            .put("linkUrl", "http://www.google.com")
+            .build()
+    );
+
+    ImmutableSet<ImmutableMap<String, Object>> expectedFn2DisplayData = ImmutableSet.of(
+        ImmutableMap.<String, Object>builder()
+            .put("key", "fn")
+            .put("type", "JAVA_CLASS")
+            .put("value", fn2.getClass().getName())
+            .put("shortValue", fn2.getClass().getSimpleName())
+            .put("namespace", parDo2.getClass().getName())
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("key", "foo3")
+            .put("type", "INTEGER")
+            .put("value", 1234L)
+            .put("namespace", fn2.getClass().getName())
+            .build()
+    );
+
+    assertEquals(expectedFn1DisplayData, ImmutableSet.copyOf(fn1displayData));
+    assertEquals(expectedFn2DisplayData, ImmutableSet.copyOf(fn2displayData));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransform.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransform.java
index 7833d42..85aa1c4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransform.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.runners.inprocess;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.TypedPValue;
@@ -53,4 +54,9 @@ public abstract class ForwardingPTransform<InputT extends PInput, OutputT
extend
       TypedPValue<T> output) throws CannotProvideCoderException {
     return delegate().getDefaultOutputCoder(input, output);
   }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    delegate().populateDisplayData(builder);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
index 08c4391..bbc0220 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
@@ -22,6 +22,7 @@ import org.apache.beam.sdk.transforms.DoFnWithContext.ExtraContextFactory;
 import org.apache.beam.sdk.transforms.DoFnWithContext.FinishBundle;
 import org.apache.beam.sdk.transforms.DoFnWithContext.ProcessElement;
 import org.apache.beam.sdk.transforms.DoFnWithContext.StartBundle;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.UserCodeException;
@@ -653,6 +654,11 @@ public abstract class DoFnReflector {
       return fn.getOutputTypeDescriptor();
     }
 
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      fn.populateDisplayData(builder);
+    }
+
     private void readObject(java.io.ObjectInputStream in)
         throws IOException, ClassNotFoundException {
       in.defaultReadObject();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java
index 835730c..7143626 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java
@@ -25,6 +25,8 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn.DelegatingAggregator;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowingInternals;
@@ -82,7 +84,7 @@ import java.util.Map;
  * @param <OutputT> the type of the (main) output elements
  */
 @Experimental
-public abstract class DoFnWithContext<InputT, OutputT> implements Serializable {
+public abstract class DoFnWithContext<InputT, OutputT> implements Serializable, HasDisplayData
{
 
   /** Information accessible to all methods in this {@code DoFnWithContext}. */
   public abstract class Context {
@@ -414,4 +416,14 @@ public abstract class DoFnWithContext<InputT, OutputT> implements
Serializable {
   void prepareForProcessing() {
     aggregatorsAreFinal = true;
   }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>By default, does not register any display data. Implementors may override this
method
+   * to provide their own display metadata.
+   */
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
index 547254d..0e5e4a6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms;
 
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 
 /**
@@ -99,9 +100,15 @@ public class Filter<T> extends PTransform<PCollection<T>,
PCollection<T>> {
           c.output(c.element());
         }
       }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        Filter.populateDisplayData(builder, String.format("x < %s", value));
+      }
     });
   }
 
+
   /**
    * Returns a {@code PTransform} that takes an input
    * {@code PCollection<T>} and returns a {@code PCollection<T>} with
@@ -131,6 +138,11 @@ public class Filter<T> extends PTransform<PCollection<T>,
PCollection<T>> {
           c.output(c.element());
         }
       }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        Filter.populateDisplayData(builder, String.format("x > %s", value));
+      }
     });
   }
 
@@ -163,6 +175,11 @@ public class Filter<T> extends PTransform<PCollection<T>,
PCollection<T>> {
           c.output(c.element());
         }
       }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        Filter.populateDisplayData(builder, String.format("x ≤ %s", value));
+      }
     });
   }
 
@@ -195,6 +212,11 @@ public class Filter<T> extends PTransform<PCollection<T>,
PCollection<T>> {
           c.output(c.element());
         }
       }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        Filter.populateDisplayData(builder, String.format("x ≥ %s", value));
+      }
     });
   }
 
@@ -232,4 +254,9 @@ public class Filter<T> extends PTransform<PCollection<T>,
PCollection<T>> {
   protected Coder<T> getDefaultOutputCoder(PCollection<T> input) {
     return input.getCoder();
   }
+
+  private static void populateDisplayData(
+      DisplayData.Builder builder, String predicateDescription) {
+    builder.add("predicate", predicateDescription);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index 42c1f78..1b3c454 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -21,6 +21,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
@@ -273,4 +274,12 @@ public class GroupByKey<K, V>
   public static <K, V> KvCoder<K, Iterable<V>> getOutputKvCoder(Coder<KV<K,
V>> inputCoder) {
     return KvCoder.of(getKeyCoder(inputCoder), getOutputValueCoder(inputCoder));
   }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    if (fewKeys) {
+      builder.add("fewKeys", true);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java
index c66aa8d..1b91562 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms;
 import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowingInternals;
@@ -172,6 +173,14 @@ public class IntraBundleParallelization {
       return input.apply(
           ParDo.of(new MultiThreadedIntraBundleProcessingDoFn<>(doFn, maxParallelism)));
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder
+          .add("maxParallelism", maxParallelism)
+          .add("fn", doFn.getClass())
+          .include(doFn);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index b448c26..d266155 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.runners.DirectPipelineRunner;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.DirectModeExecutionContext;
@@ -556,7 +557,12 @@ public class ParDo {
    * properties can be set on it first.
    */
   public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT>
fn) {
-    return new Unbound().of(fn);
+    return of(fn, fn.getClass());
+  }
+
+  private static <InputT, OutputT> Bound<InputT, OutputT> of(
+          DoFn<InputT, OutputT> fn, Class<?> fnClass) {
+    return new Unbound().of(fn, fnClass);
   }
 
   private static <InputT, OutputT> DoFn<InputT, OutputT>
@@ -579,7 +585,7 @@ public class ParDo {
    */
   @Experimental
   public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFnWithContext<InputT,
OutputT> fn) {
-    return of(adapt(fn));
+    return of(adapt(fn), fn.getClass());
   }
 
   /**
@@ -666,9 +672,15 @@ public class ParDo {
      * still be specified.
      */
     public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT>
fn) {
-      return new Bound<>(name, sideInputs, fn);
+      return of(fn, fn.getClass());
+    }
+
+    private <InputT, OutputT> Bound<InputT, OutputT> of(
+        DoFn<InputT, OutputT> fn, Class<?> fnClass) {
+      return new Bound<>(name, sideInputs, fn, fnClass);
     }
 
+
     /**
      * Returns a new {@link ParDo} {@link PTransform} that's like this
      * transform but which will invoke the given {@link DoFnWithContext}
@@ -678,7 +690,7 @@ public class ParDo {
      * still be specified.
      */
     public <InputT, OutputT> Bound<InputT, OutputT> of(DoFnWithContext<InputT,
OutputT> fn) {
-      return of(adapt(fn));
+      return of(adapt(fn), fn.getClass());
     }
   }
 
@@ -699,13 +711,16 @@ public class ParDo {
     // Inherits name.
     private final List<PCollectionView<?>> sideInputs;
     private final DoFn<InputT, OutputT> fn;
+    private final Class<?> fnClass;
 
     Bound(String name,
           List<PCollectionView<?>> sideInputs,
-          DoFn<InputT, OutputT> fn) {
+          DoFn<InputT, OutputT> fn,
+          Class<?> fnClass) {
       super(name);
       this.sideInputs = sideInputs;
       this.fn = SerializableUtils.clone(fn);
+      this.fnClass = fnClass;
     }
 
     /**
@@ -716,7 +731,7 @@ public class ParDo {
      * <p>See the discussion of Naming above for more explanation.
      */
     public Bound<InputT, OutputT> named(String name) {
-      return new Bound<>(name, sideInputs, fn);
+      return new Bound<>(name, sideInputs, fn, fnClass);
     }
 
     /**
@@ -744,7 +759,7 @@ public class ParDo {
       ImmutableList.Builder<PCollectionView<?>> builder = ImmutableList.builder();
       builder.addAll(this.sideInputs);
       builder.addAll(sideInputs);
-      return new Bound<>(name, builder.build(), fn);
+      return new Bound<>(name, builder.build(), fn, fnClass);
     }
 
     /**
@@ -758,7 +773,7 @@ public class ParDo {
     public BoundMulti<InputT, OutputT> withOutputTags(TupleTag<OutputT> mainOutputTag,
                                            TupleTagList sideOutputTags) {
       return new BoundMulti<>(
-          name, sideInputs, mainOutputTag, sideOutputTags, fn);
+          name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass);
     }
 
     @Override
@@ -799,7 +814,7 @@ public class ParDo {
      */
     @Override
     public void populateDisplayData(Builder builder) {
-      builder.include(fn);
+      ParDo.populateDisplayData(builder, fn, fnClass);
     }
 
     public DoFn<InputT, OutputT> getFn() {
@@ -891,8 +906,12 @@ public class ParDo {
      * more properties can still be specified.
      */
     public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT>
fn) {
+      return of(fn, fn.getClass());
+    }
+
+    public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT>
fn, Class<?> fnClass) {
       return new BoundMulti<>(
-          name, sideInputs, mainOutputTag, sideOutputTags, fn);
+              name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass);
     }
 
     /**
@@ -904,7 +923,7 @@ public class ParDo {
      * more properties can still be specified.
      */
     public <InputT> BoundMulti<InputT, OutputT> of(DoFnWithContext<InputT,
OutputT> fn) {
-      return of(adapt(fn));
+      return of(adapt(fn), fn.getClass());
     }
   }
 
@@ -926,17 +945,20 @@ public class ParDo {
     private final TupleTag<OutputT> mainOutputTag;
     private final TupleTagList sideOutputTags;
     private final DoFn<InputT, OutputT> fn;
+    private final Class<?> fnClass;
 
     BoundMulti(String name,
                List<PCollectionView<?>> sideInputs,
                TupleTag<OutputT> mainOutputTag,
                TupleTagList sideOutputTags,
-               DoFn<InputT, OutputT> fn) {
+               DoFn<InputT, OutputT> fn,
+               Class<?> fnClass) {
       super(name);
       this.sideInputs = sideInputs;
       this.mainOutputTag = mainOutputTag;
       this.sideOutputTags = sideOutputTags;
       this.fn = SerializableUtils.clone(fn);
+      this.fnClass = fnClass;
     }
 
     /**
@@ -948,7 +970,7 @@ public class ParDo {
      */
     public BoundMulti<InputT, OutputT> named(String name) {
       return new BoundMulti<>(
-          name, sideInputs, mainOutputTag, sideOutputTags, fn);
+          name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass);
     }
 
     /**
@@ -979,7 +1001,7 @@ public class ParDo {
       builder.addAll(sideInputs);
       return new BoundMulti<>(
           name, builder.build(),
-          mainOutputTag, sideOutputTags, fn);
+          mainOutputTag, sideOutputTags, fn, fnClass);
     }
 
 
@@ -1027,6 +1049,11 @@ public class ParDo {
       }
     }
 
+    @Override
+    public void populateDisplayData(Builder builder) {
+      ParDo.populateDisplayData(builder, fn, fnClass);
+    }
+
     public DoFn<InputT, OutputT> getFn() {
       return fn;
     }
@@ -1233,6 +1260,13 @@ public class ParDo {
     return DirectSideInputReader.of(sideInputValues);
   }
 
+  private static void populateDisplayData(
+      DisplayData.Builder builder, DoFn<?, ?> fn, Class<?> fnClass) {
+    builder
+        .include(fn, fnClass)
+        .add("fn", fnClass);
+  }
+
   /**
    * A {@code DoFnRunner.OutputManager} that provides facilities for checking output values
for
    * illegal mutations.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
index 47d49f7..5366fd0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms;
 
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -121,6 +122,11 @@ public class Partition<T> extends PTransform<PCollection<T>,
PCollectionList<T>>
     return pcs;
   }
 
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    builder.include(partitionDoFn);
+  }
+
   private final transient PartitionDoFn<T> partitionDoFn;
 
   private Partition(PartitionDoFn<T> partitionDoFn) {
@@ -170,5 +176,12 @@ public class Partition<T> extends PTransform<PCollection<T>,
PCollectionList<T>>
             partition + " not in [0.." + numPartitions + ")");
       }
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder
+          .add("numPartitions", numPartitions)
+          .add("partitionFn", partitionFn.getClass());
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransformTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransformTest.java
index ca3753c..366dfc5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransformTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 
 import org.junit.Before;
@@ -99,4 +100,13 @@ public class ForwardingPTransformTest {
     when(delegate.getDefaultOutputCoder(input, output)).thenReturn(outputCoder);
     assertThat(forwarding.getDefaultOutputCoder(input, output), equalTo(outputCoder));
   }
+
+  @Test
+  public void populateDisplayDataDelegates() {
+    DisplayData.Builder builder = mock(DisplayData.Builder.class);
+    doThrow(RuntimeException.class).when(delegate).populateDisplayData(builder);
+
+    thrown.expect(RuntimeException.class);
+    forwarding.populateDisplayData(builder);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
index 40c80b7..391081a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
@@ -18,8 +18,10 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.hamcrest.CoreMatchers.isA;
+import static org.hamcrest.Matchers.empty;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
@@ -29,6 +31,7 @@ import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -158,6 +161,14 @@ public class DoFnWithContextTest implements Serializable {
   }
 
   @Test
+  public void testDefaultPopulateDisplayDataImplementation() {
+    DoFnWithContext<String, String> fn = new DoFnWithContext<String, String>()
{
+    };
+    DisplayData displayData = DisplayData.from(fn);
+    assertThat(displayData.items(), empty());
+  }
+
+  @Test
   public void testCreateAggregatorInStartBundleThrows() {
     TestPipeline p = createTestPipeline(new DoFnWithContext<String, String>() {
       @StartBundle

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
index f15f48e..f58ba17 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
@@ -17,9 +17,14 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 
 import org.junit.Test;
@@ -158,4 +163,19 @@ public class FilterTest implements Serializable {
     PAssert.that(output).containsInAnyOrder(5, 6, 7);
     p.run();
   }
+
+  @Test
+  public void testDisplayData() {
+    ParDo.Bound<Integer, Integer> lessThan = Filter.lessThan(123);
+    assertThat(DisplayData.from(lessThan), hasDisplayItem("predicate", "x < 123"));
+
+    ParDo.Bound<Integer, Integer> lessThanOrEqual = Filter.lessThanEq(234);
+    assertThat(DisplayData.from(lessThanOrEqual), hasDisplayItem("predicate", "x ≤ 234"));
+
+    ParDo.Bound<Integer, Integer> greaterThan = Filter.greaterThan(345);
+    assertThat(DisplayData.from(greaterThan), hasDisplayItem("predicate", "x > 345"));
+
+    ParDo.Bound<Integer, Integer> greaterThanOrEqual = Filter.greaterThanEq(456);
+    assertThat(DisplayData.from(greaterThanOrEqual), hasDisplayItem("predicate", "x ≥ 456"));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 1a7b0b7..b84845a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -18,7 +18,9 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.hamcrest.core.Is.is;
@@ -35,6 +37,7 @@ import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
@@ -375,4 +378,16 @@ public class GroupByKeyTest {
   public void testGroupByKeyGetName() {
     Assert.assertEquals("GroupByKey", GroupByKey.<String, Integer>create().getName());
   }
+
+  @Test
+  public void testDisplayData() {
+    GroupByKey<String, String> groupByKey = GroupByKey.create();
+    GroupByKey<String, String> groupByFewKeys = GroupByKey.create(true);
+
+    DisplayData gbkDisplayData = DisplayData.from(groupByKey);
+    DisplayData fewKeysDisplayData = DisplayData.from(groupByFewKeys);
+
+    assertThat(gbkDisplayData.items(), empty());
+    assertThat(fewKeysDisplayData, hasDisplayItem("fewKeys", true));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
index dd01919..80f6188 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
@@ -18,6 +18,8 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.testing.SystemNanoTimeSleeper.sleepMillis;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includes;
 
 import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.containsString;
@@ -31,6 +33,7 @@ import static org.junit.Assert.fail;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -215,6 +218,29 @@ public class IntraBundleParallelizationTest {
         IntraBundleParallelization.of(new DelayFn<Integer>()).withMaxParallelism(1).getName());
   }
 
+  @Test
+  public void testDisplayData() {
+    DoFn<String, String> fn = new DoFn<String, String>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+      }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder.add("foo", "bar");
+      }
+    };
+
+    PTransform<?, ?> transform = IntraBundleParallelization
+        .withMaxParallelism(1234)
+        .of(fn);
+
+    DisplayData displayData = DisplayData.from(transform);
+    assertThat(displayData, includes(fn));
+    assertThat(displayData, hasDisplayItem("fn", fn.getClass()));
+    assertThat(displayData, hasDisplayItem("maxParallelism", 1234));
+  }
+
   /**
    * Runs the provided doFn inside of an {@link IntraBundleParallelization} transform.
    *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 5724dd6..44154e6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -19,10 +19,13 @@ package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includes;
 import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
 import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
 import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
 
+import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.isA;
 import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
@@ -46,6 +49,7 @@ import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.ParDo.Bound;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.IllegalMutationException;
@@ -1525,20 +1529,66 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  public void testIncludesDoFnDisplayData() {
-    Bound<String, String> parDo =
-        ParDo.of(
-            new DoFn<String, String>() {
-              @Override
-              public void processElement(ProcessContext c) {}
+  public void testDoFnDisplayData() {
+    DoFn<String, String> fn = new DoFn<String, String>() {
+      @Override
+      public void processElement(ProcessContext c) {
+      }
 
-              @Override
-              public void populateDisplayData(Builder builder) {
-                builder.add("foo", "bar");
-              }
-            });
+      @Override
+      public void populateDisplayData(Builder builder) {
+        builder.add("doFnMetadata", "bar");
+      }
+    };
+
+    Bound<String, String> parDo = ParDo.of(fn);
+
+    DisplayData displayData = DisplayData.from(parDo);
+    assertThat(displayData, hasDisplayItem(allOf(
+        hasKey("fn"),
+        hasType(DisplayData.Type.JAVA_CLASS),
+        DisplayDataMatchers.hasValue(fn.getClass().getName()))));
+
+    assertThat(displayData, includes(fn));
+  }
+
+  @Test
+  public void testDoFnWithContextDisplayData() {
+    DoFnWithContext<String, String> fn = new DoFnWithContext<String, String>()
{
+      @ProcessElement
+      public void proccessElement(ProcessContext c) {}
+
+      @Override
+      public void populateDisplayData(Builder builder) {
+        builder.add("fnMetadata", "foobar");
+      }
+    };
+
+    Bound<String, String> parDo = ParDo.of(fn);
+
+    DisplayData displayData = DisplayData.from(parDo);
+    assertThat(displayData, includes(fn));
+    assertThat(displayData, hasDisplayItem("fn", fn.getClass()));
+  }
+
+  @Test
+  public void testWithOutputTagsDisplayData() {
+    DoFnWithContext<String, String> fn = new DoFnWithContext<String, String>()
{
+      @ProcessElement
+      public void proccessElement(ProcessContext c) {}
+
+      @Override
+      public void populateDisplayData(Builder builder) {
+        builder.add("fnMetadata", "foobar");
+      }
+    };
+
+    ParDo.BoundMulti parDo = ParDo
+            .withOutputTags(new TupleTag(), TupleTagList.empty())
+            .of(fn);
 
     DisplayData displayData = DisplayData.from(parDo);
-    assertThat(displayData, hasDisplayItem(hasKey("foo")));
+    assertThat(displayData, includes(fn));
+    assertThat(displayData, hasDisplayItem("fn", fn.getClass()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
index dba6c16..608da0f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -25,6 +28,7 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Partition.PartitionFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 
@@ -138,4 +142,13 @@ public class PartitionTest implements Serializable {
   public void testPartitionGetName() {
     assertEquals("Partition", Partition.of(3, new ModFn()).getName());
   }
+
+  @Test
+  public void testDisplayData() {
+    Partition<?> partition = Partition.of(123, new IdentityFn());
+    DisplayData displayData = DisplayData.from(partition);
+
+    assertThat(displayData, hasDisplayItem("numPartitions", 123));
+    assertThat(displayData, hasDisplayItem("partitionFn", IdentityFn.class));
+  }
 }



Mime
View raw message