beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bchamb...@apache.org
Subject [2/3] incubator-beam git commit: Add Display Data 'path' metadata
Date Thu, 20 Oct 2016 18:19:01 GMT
Add Display Data 'path' metadata

Display Data supports the notion of "sub components", components within
a transform class which can contribute their own display data. We add a
namespace to display data items based on the originating component,
which keeps the display data items unique within the step.

There are instances where a component is included multiple times within
a step. We handle the case of the same instance being shared by simply
ignoring it the second time. However, we don't handle the case of a
separate instance being added of the same class. Currently the separate
instances will add display data with the same namespace and key, causing
a failure.

This can come up for example when infrastructure at different levels
wrap and re-wrap a component. We saw this with a bounded source being
adapted multiple times, Bounded -> Unbounded -> Bounded -> Unbounded.
The BoundedToUnboundedSourceAdapter was included multiple times with
separate instances and caused a failure while populating display data.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ad03d07a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ad03d07a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ad03d07a

Branch: refs/heads/master
Commit: ad03d07ae783f054a31e8b2e14100afff8cdf747
Parents: ff6301b
Author: Scott Wegner <swegner@google.com>
Authored: Wed Oct 12 14:49:41 2016 -0700
Committer: bchambers <bchambers@google.com>
Committed: Thu Oct 20 11:10:13 2016 -0700

----------------------------------------------------------------------
 .../core/UnboundedReadFromBoundedSource.java    |   2 +-
 runners/direct-java/pom.xml                     |   5 -
 .../runners/direct/ForwardingPTransform.java    |   2 +-
 .../beam/runners/direct/DirectRunnerTest.java   |  32 --
 .../direct/ForwardingPTransformTest.java        |   7 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   5 +-
 .../DataflowUnboundedReadFromBoundedSource.java |   4 +-
 .../sdk/io/BoundedReadFromUnboundedSource.java  |   5 +-
 .../apache/beam/sdk/io/CompressedSource.java    |   2 +-
 .../java/org/apache/beam/sdk/io/PubsubIO.java   |   5 +-
 .../main/java/org/apache/beam/sdk/io/Read.java  |   4 +-
 .../main/java/org/apache/beam/sdk/io/Write.java |   6 +-
 .../sdk/options/ProxyInvocationHandler.java     | 149 +++---
 .../org/apache/beam/sdk/transforms/Combine.java |  60 +--
 .../apache/beam/sdk/transforms/CombineFns.java  |  33 +-
 .../beam/sdk/transforms/CombineWithContext.java |   3 +-
 .../beam/sdk/transforms/DoFnAdapters.java       |   2 +-
 .../beam/sdk/transforms/FlatMapElements.java    |   6 +-
 .../apache/beam/sdk/transforms/MapElements.java |   8 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |  60 ++-
 .../apache/beam/sdk/transforms/Partition.java   |   2 +-
 .../sdk/transforms/display/DisplayData.java     | 518 +++++++++++++------
 .../beam/sdk/transforms/windowing/Window.java   |   2 +-
 .../io/BoundedReadFromUnboundedSourceTest.java  |   4 +-
 .../beam/sdk/io/CompressedSourceTest.java       |   4 +-
 .../java/org/apache/beam/sdk/io/ReadTest.java   |  10 +-
 .../java/org/apache/beam/sdk/io/WriteTest.java  |   6 +-
 .../sdk/options/ProxyInvocationHandlerTest.java |  40 ++
 .../beam/sdk/transforms/CombineFnsTest.java     |   7 +-
 .../apache/beam/sdk/transforms/CombineTest.java |   4 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |   8 +-
 .../transforms/display/DisplayDataMatchers.java | 141 +++--
 .../display/DisplayDataMatchersTest.java        |  67 ++-
 .../sdk/transforms/display/DisplayDataTest.java | 367 +++++++++----
 .../sdk/transforms/windowing/WindowTest.java    |   4 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |   5 -
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    |   2 +-
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  |   2 +-
 38 files changed, 988 insertions(+), 605 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index 91a1715..2afdcf2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -108,7 +108,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
     // We explicitly do not register base-class data, instead we use the delegate inner source.
     builder
         .add(DisplayData.item("source", source.getClass()))
-        .include(source);
+        .include("source", source);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 354c8c7..6cb1838 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -286,11 +286,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-annotations</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
index 3160b58..77311c2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
@@ -57,6 +57,6 @@ public abstract class ForwardingPTransform<InputT extends PInput, OutputT extend
 
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
-    delegate().populateDisplayData(builder);
+    builder.delegate(delegate());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 37af90c..4027d25 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -25,7 +25,6 @@ import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
-import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.io.InputStream;
@@ -242,37 +241,6 @@ public class DirectRunnerTest implements Serializable {
     p.run();
   }
 
-  @Test
-  public void pipelineOptionsDisplayDataExceptionShouldFail() {
-    Object brokenValueType = new Object() {
-      @JsonValue
-      public int getValue () {
-        return 42;
-      }
-
-      @Override
-      public String toString() {
-        throw new RuntimeException("oh noes!!");
-      }
-    };
-
-    Pipeline p = getPipeline();
-    p.getOptions().as(ObjectPipelineOptions.class).setValue(brokenValueType);
-
-    p.apply(Create.of(1, 2, 3));
-
-    thrown.expectMessage(PipelineOptions.class.getName());
-    thrown.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!!")));
-    p.run();
-  }
-
-  /** {@link PipelineOptions} to inject bad object implementations. */
-  public interface ObjectPipelineOptions extends PipelineOptions {
-    Object getValue();
-    void setValue(Object value);
-  }
-
-
   /**
    * Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the
    * {@link DirectRunner}.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
index 6abaf92..c75adaa 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -102,10 +103,10 @@ public class ForwardingPTransformTest {
 
   @Test
   public void populateDisplayDataDelegates() {
-    DisplayData.Builder builder = mock(DisplayData.Builder.class);
-    doThrow(RuntimeException.class).when(delegate).populateDisplayData(builder);
+    doThrow(RuntimeException.class)
+        .when(delegate).populateDisplayData(any(DisplayData.Builder.class));
 
     thrown.expect(RuntimeException.class);
-    forwarding.populateDisplayData(builder);
+    DisplayData.from(forwarding);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 5f83788..7bf270d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -2249,8 +2249,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
         super.populateDisplayData(builder);
-        builder.add(DisplayData.item("source", source.getClass()));
-        builder.include(source);
+        builder
+            .add(DisplayData.item("source", source.getClass()))
+            .include("source", source);
       }
 
       public UnboundedSource<T, ?> getSource() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
index e4257d1..96a35bc 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
@@ -120,7 +120,7 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin
     // We explicitly do not register base-class data, instead we use the delegate inner source.
     builder
         .add(DisplayData.item("source", source.getClass()))
-        .include(source);
+        .include("source", source);
   }
 
   /**
@@ -195,7 +195,7 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder.add(DisplayData.item("source", boundedSource.getClass()));
-      builder.include(boundedSource);
+      builder.include("source", boundedSource);
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index 630a8a3..40c52a2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -119,7 +119,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
           .withLabel("Maximum Read Records"), Long.MAX_VALUE)
         .addIfNotNull(DisplayData.item("maxReadTime", maxReadTime)
           .withLabel("Maximum Read Time"))
-        .include(source);
+        .include("source", source);
   }
 
   private static class UnboundedToBoundedSourceAdapter<T>
@@ -204,8 +204,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
-      builder.add(DisplayData.item("source", source.getClass()));
-      builder.include(source);
+      builder.delegate(source);
     }
 
     private class Reader extends BoundedReader<ValueWithRecordId<T>> {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index 680dc2c..bf871b7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -390,7 +390,7 @@ public class CompressedSource<T> extends FileBasedSource<T> {
   public void populateDisplayData(DisplayData.Builder builder) {
     // We explicitly do not register base-class data, instead we use the delegate inner source.
     builder
-        .include(sourceDelegate)
+        .include("source", sourceDelegate)
         .add(DisplayData.item("source", sourceDelegate.getClass())
           .withLabel("Read Source"));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 6091156..72a6399 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -792,8 +792,7 @@ public class PubsubIO {
 
         @Override
         public void populateDisplayData(DisplayData.Builder builder) {
-          super.populateDisplayData(builder);
-          Bound.this.populateDisplayData(builder);
+          builder.delegate(Bound.this);
         }
       }
     }
@@ -1043,7 +1042,7 @@ public class PubsubIO {
         @Override
         public void populateDisplayData(DisplayData.Builder builder) {
           super.populateDisplayData(builder);
-          Bound.this.populateDisplayData(builder);
+          builder.delegate(Bound.this);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 29c4e47..f04fbaf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -127,7 +127,7 @@ public class Read {
       builder
           .add(DisplayData.item("source", source.getClass())
             .withLabel("Read Source"))
-          .include(source);
+          .include("source", source);
     }
   }
 
@@ -194,7 +194,7 @@ public class Read {
       builder
           .add(DisplayData.item("source", source.getClass())
             .withLabel("Read Source"))
-          .include(source);
+          .include("source", source);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index e8b19d9..7559fca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -118,7 +118,7 @@ public class Write {
       super.populateDisplayData(builder);
       builder
           .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink"))
-          .include(sink)
+          .include("sink", sink)
           .addIfNotDefault(
               DisplayData.item("numShards", getNumShards()).withLabel("Fixed Number of Shards"),
               0);
@@ -209,7 +209,7 @@ public class Write {
 
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
-        Write.Bound.this.populateDisplayData(builder);
+        builder.delegate(Write.Bound.this);
       }
     }
 
@@ -261,7 +261,7 @@ public class Write {
 
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
-        Write.Bound.this.populateDisplayData(builder);
+        builder.delegate(Write.Bound.this);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
index a77dcc6..3e74916 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
@@ -86,7 +86,7 @@ import org.apache.beam.sdk.util.common.ReflectHelpers;
  * {@link PipelineOptions#as(Class)}.
  */
 @ThreadSafe
-class ProxyInvocationHandler implements InvocationHandler, HasDisplayData {
+class ProxyInvocationHandler implements InvocationHandler {
   private static final ObjectMapper MAPPER = new ObjectMapper();
   /**
    * No two instances of this class are considered equivalent hence we generate a random hash code.
@@ -138,8 +138,7 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData {
         && args[0] instanceof DisplayData.Builder) {
       @SuppressWarnings("unchecked")
       DisplayData.Builder builder = (DisplayData.Builder) args[0];
-      // Explicitly set display data namespace so thrown exceptions will have sensible type.
-      builder.include(this, PipelineOptions.class);
+      builder.delegate(new PipelineOptionsDisplayData());
       return Void.TYPE;
     }
     String methodName = method.getName();
@@ -243,88 +242,116 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData {
   }
 
   /**
-   * Populate display data. See {@link HasDisplayData#populateDisplayData}. All explicitly set
-   * pipeline options will be added as display data.
+   * Nested class to handle display data in order to set the display data namespace to something
+   * sensible.
    */
-  public void populateDisplayData(DisplayData.Builder builder) {
-    Set<PipelineOptionSpec> optionSpecs = PipelineOptionsReflector.getOptionSpecs(knownInterfaces);
-    Multimap<String, PipelineOptionSpec> optionsMap = buildOptionNameToSpecMap(optionSpecs);
-
-    for (Map.Entry<String, BoundValue> option : options.entrySet()) {
-      BoundValue boundValue = option.getValue();
-      if (boundValue.isDefault()) {
-        continue;
-      }
+  class PipelineOptionsDisplayData implements HasDisplayData {
+    /**
+     * Populate display data. See {@link HasDisplayData#populateDisplayData}. All explicitly set
+     * pipeline options will be added as display data.
+     */
+    public void populateDisplayData(DisplayData.Builder builder) {
+      Set<PipelineOptionSpec> optionSpecs =
+          PipelineOptionsReflector.getOptionSpecs(knownInterfaces);
 
-      Object value = boundValue.getValue() == null ? "" : boundValue.getValue();
-      DisplayData.Type type = DisplayData.inferType(value);
-      HashSet<PipelineOptionSpec> specs = new HashSet<>(optionsMap.get(option.getKey()));
+      Multimap<String, PipelineOptionSpec> optionsMap = buildOptionNameToSpecMap(optionSpecs);
 
-      for (PipelineOptionSpec optionSpec : specs) {
-        if (!optionSpec.shouldSerialize()) {
-          // Options that are excluded for serialization (i.e. those with @JsonIgnore) are also
-          // excluded from display data. These options are generally not useful for display.
+      for (Map.Entry<String, BoundValue> option : options.entrySet()) {
+        BoundValue boundValue = option.getValue();
+        if (boundValue.isDefault()) {
           continue;
         }
 
-        Class<?> pipelineInterface = optionSpec.getDefiningInterface();
-        if (type != null) {
-          builder.add(DisplayData.item(option.getKey(), type, value)
-              .withNamespace(pipelineInterface));
-        } else {
-          builder.add(DisplayData.item(option.getKey(), displayDataString(value))
-              .withNamespace(pipelineInterface));
+        DisplayDataValue resolved = DisplayDataValue.resolve(boundValue.getValue());
+        HashSet<PipelineOptionSpec> specs = new HashSet<>(optionsMap.get(option.getKey()));
+
+        for (PipelineOptionSpec optionSpec : specs) {
+          if (!optionSpec.shouldSerialize()) {
+            // Options that are excluded for serialization (i.e. those with @JsonIgnore) are also
+            // excluded from display data. These options are generally not useful for display.
+            continue;
+          }
+
+          builder.add(DisplayData.item(option.getKey(), resolved.getType(), resolved.getValue())
+              .withNamespace(optionSpec.getDefiningInterface()));
         }
       }
-    }
 
-    for (Map.Entry<String, JsonNode> jsonOption : jsonOptions.entrySet()) {
-      if (options.containsKey(jsonOption.getKey())) {
-        // Option overwritten since deserialization; don't re-write
-        continue;
-      }
+      for (Map.Entry<String, JsonNode> jsonOption : jsonOptions.entrySet()) {
+        if (options.containsKey(jsonOption.getKey())) {
+          // Option overwritten since deserialization; don't re-write
+          continue;
+        }
+
+        HashSet<PipelineOptionSpec> specs = new HashSet<>(optionsMap.get(jsonOption.getKey()));
+        if (specs.isEmpty()) {
+          // No PipelineOptions interface for this key not currently loaded
+          builder.add(DisplayData.item(jsonOption.getKey(), jsonOption.getValue().toString())
+              .withNamespace(UnknownPipelineOptions.class));
+          continue;
+        }
 
-      HashSet<PipelineOptionSpec> specs = new HashSet<>(optionsMap.get(jsonOption.getKey()));
-      if (specs.isEmpty()) {
-        builder.add(DisplayData.item(jsonOption.getKey(), jsonOption.getValue().toString())
-          .withNamespace(UnknownPipelineOptions.class));
-      } else {
         for (PipelineOptionSpec spec : specs) {
           if (!spec.shouldSerialize()) {
             continue;
           }
 
           Object value = getValueFromJson(jsonOption.getKey(), spec.getGetterMethod());
-          value = value == null ? "" : value;
-          DisplayData.Type type = DisplayData.inferType(value);
-          if (type != null) {
-            builder.add(DisplayData.item(jsonOption.getKey(), type, value)
-                .withNamespace(spec.getDefiningInterface()));
-          } else {
-            builder.add(DisplayData.item(jsonOption.getKey(), displayDataString(value))
-                .withNamespace(spec.getDefiningInterface()));
-          }
+          DisplayDataValue resolved = DisplayDataValue.resolve(value);
+          builder.add(DisplayData.item(jsonOption.getKey(), resolved.getType(), resolved.getValue())
+              .withNamespace(spec.getDefiningInterface()));
         }
       }
     }
   }
 
   /**
-   * {@link Object#toString()} wrapper to extract display data values for various types.
+   * Helper class to resolve a {@link DisplayData} type and value from {@link PipelineOptions}.
    */
-  private String displayDataString(Object value) {
-    checkNotNull(value, "value cannot be null");
-    if (!value.getClass().isArray()) {
-      return value.toString();
-    }
-    if (!value.getClass().getComponentType().isPrimitive()) {
-      return Arrays.deepToString((Object[]) value);
+  @AutoValue
+  abstract static class DisplayDataValue {
+    /**
+     * The resolved display data value. May differ from the input to {@link #resolve(Object)}
+     */
+    abstract Object getValue();
+
+    /** The resolved display data type. */
+    abstract DisplayData.Type getType();
+
+    /**
+     * Infer the value and {@link DisplayData.Type type} for the given
+     * {@link PipelineOptions} value.
+     */
+    static DisplayDataValue resolve(@Nullable Object value) {
+      DisplayData.Type type = DisplayData.inferType(value);
+
+      if (type == null) {
+        value = displayDataString(value);
+        type = DisplayData.Type.STRING;
+      }
+
+      return new AutoValue_ProxyInvocationHandler_DisplayDataValue(value, type);
     }
 
-    // At this point, we have some type of primitive array. Arrays.deepToString(..) requires an
-    // Object array, but will unwrap nested primitive arrays.
-    String wrapped = Arrays.deepToString(new Object[] {value});
-    return wrapped.substring(1, wrapped.length() - 1);
+    /**
+     * Safe {@link Object#toString()} wrapper to extract display data values for various types.
+     */
+    private static String displayDataString(@Nullable Object value) {
+      if (value == null) {
+        return "";
+      }
+      if (!value.getClass().isArray()) {
+        return value.toString();
+      }
+      if (!value.getClass().getComponentType().isPrimitive()) {
+        return Arrays.deepToString((Object[]) value);
+      }
+
+      // At this point, we have some type of primitive array. Arrays.deepToString(..) requires an
+      // Object array, but will unwrap nested primitive arrays.
+      String wrapped = Arrays.deepToString(new Object[]{value});
+      return wrapped.substring(1, wrapped.length() - 1);
+    }
   }
 
   /**
@@ -587,7 +614,7 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData {
 
         List<Map<String, Object>> serializedDisplayData = Lists.newArrayList();
         DisplayData displayData = DisplayData.from(value);
-        for (DisplayData.Item<?> item : displayData.items()) {
+        for (DisplayData.Item item : displayData.items()) {
           @SuppressWarnings("unchecked")
           Map<String, Object> serializedItem = MAPPER.convertValue(item, Map.class);
           serializedDisplayData.add(serializedItem);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index df9a306..7719c73 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -124,14 +124,14 @@ public class Combine {
     return globally(fn, displayDataForFn(fn));
   }
 
-  private static <T> DisplayData.Item<? extends Class<?>> displayDataForFn(T fn) {
+  private static <T> DisplayData.ItemSpec<? extends Class<?>> displayDataForFn(T fn) {
     return DisplayData.item("combineFn", fn.getClass())
         .withLabel("Combiner");
   }
 
   private static <InputT, OutputT> Globally<InputT, OutputT> globally(
       GlobalCombineFn<? super InputT, ?, OutputT> fn,
-      DisplayData.Item<? extends Class<?>> fnDisplayData) {
+      DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
     return new Globally<>(fn, fnDisplayData, true, 0);
   }
 
@@ -200,7 +200,7 @@ public class Combine {
 
   private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(
           PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-      DisplayData.Item<? extends Class<?>> fnDisplayData) {
+      DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
     return new PerKey<>(fn, fnDisplayData, false /*fewKeys*/);
   }
 
@@ -210,7 +210,7 @@ public class Combine {
    */
   private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> fewKeys(
       PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-      DisplayData.Item<? extends Class<?>> fnDisplayData) {
+      DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
     return new PerKey<>(fn, fnDisplayData, true /*fewKeys*/);
   }
 
@@ -294,7 +294,7 @@ public class Combine {
 
   private static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues(
       PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-      DisplayData.Item<? extends Class<?>> fnDisplayData) {
+      DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
     return new GroupedValues<>(fn, fnDisplayData);
   }
 
@@ -521,7 +521,7 @@ public class Combine {
 
         @Override
         public void populateDisplayData(DisplayData.Builder builder) {
-          builder.include(CombineFn.this);
+          builder.delegate(CombineFn.this);
         }
       };
     }
@@ -1258,7 +1258,7 @@ public class Combine {
 
         @Override
         public void populateDisplayData(DisplayData.Builder builder) {
-          builder.include(KeyedCombineFn.this);
+          builder.delegate(KeyedCombineFn.this);
         }
       };
     }
@@ -1325,13 +1325,13 @@ public class Combine {
       extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
 
     private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
-    private final DisplayData.Item<? extends Class<?>> fnDisplayData;
+    private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
     private final boolean insertDefault;
     private final int fanout;
     private final List<PCollectionView<?>> sideInputs;
 
     private Globally(GlobalCombineFn<? super InputT, ?, OutputT> fn,
-        DisplayData.Item<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
       this.fn = fn;
       this.fnDisplayData = fnDisplayData;
       this.insertDefault = insertDefault;
@@ -1340,7 +1340,7 @@ public class Combine {
     }
 
     private Globally(String name, GlobalCombineFn<? super InputT, ?, OutputT> fn,
-        DisplayData.Item<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
       super(name);
       this.fn = fn;
       this.fnDisplayData = fnDisplayData;
@@ -1350,7 +1350,7 @@ public class Combine {
     }
 
     private Globally(String name, GlobalCombineFn<? super InputT, ?, OutputT> fn,
-        DisplayData.Item<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout,
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout,
         List<PCollectionView<?>> sideInputs) {
       super(name);
       this.fn = fn;
@@ -1498,9 +1498,9 @@ public class Combine {
 
   private static void populateDisplayData(
       DisplayData.Builder builder, HasDisplayData fn,
-      DisplayData.Item<? extends Class<?>> fnDisplayItem) {
+      DisplayData.ItemSpec<? extends Class<?>> fnDisplayItem) {
     builder
-        .include(fn)
+        .include("combineFn", fn)
         .add(fnDisplayItem);
   }
 
@@ -1556,13 +1556,13 @@ public class Combine {
       extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
 
     private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
-    private final DisplayData.Item<? extends Class<?>> fnDisplayData;
+    private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
     private final boolean insertDefault;
     private final int fanout;
 
     private GloballyAsSingletonView(
         GlobalCombineFn<? super InputT, ?, OutputT> fn,
-        DisplayData.Item<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
       this.fn = fn;
       this.fnDisplayData = fnDisplayData;
       this.insertDefault = insertDefault;
@@ -1762,13 +1762,13 @@ public class Combine {
     extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
 
     private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
-    private final DisplayData.Item<? extends Class<?>> fnDisplayData;
+    private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
     private final boolean fewKeys;
     private final List<PCollectionView<?>> sideInputs;
 
     private PerKey(
         PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-        DisplayData.Item<? extends Class<?>> fnDisplayData, boolean fewKeys) {
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean fewKeys) {
       this.fn = fn;
       this.fnDisplayData = fnDisplayData;
       this.fewKeys = fewKeys;
@@ -1777,7 +1777,7 @@ public class Combine {
 
     private PerKey(String name,
         PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-        DisplayData.Item<? extends Class<?>> fnDisplayData,
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
         boolean fewKeys, List<PCollectionView<?>> sideInputs) {
       super(name);
       this.fn = fn;
@@ -1788,7 +1788,7 @@ public class Combine {
 
     private PerKey(
         String name, PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-        DisplayData.Item<? extends Class<?>> fnDisplayData, boolean fewKeys) {
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean fewKeys) {
       super(name);
       this.fn = fn;
       this.fnDisplayData = fnDisplayData;
@@ -1888,12 +1888,12 @@ public class Combine {
       extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
 
     private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
-    private final DisplayData.Item<? extends Class<?>> fnDisplayData;
+    private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
     private final SerializableFunction<? super K, Integer> hotKeyFanout;
 
     private PerKeyWithHotKeyFanout(String name,
         PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-        DisplayData.Item<? extends Class<?>> fnDisplayData,
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
         SerializableFunction<? super K, Integer> hotKeyFanout) {
       super(name);
       this.fn = fn;
@@ -1976,7 +1976,7 @@ public class Combine {
 
               @Override
               public void populateDisplayData(DisplayData.Builder builder) {
-                builder.include(PerKeyWithHotKeyFanout.this);
+                builder.delegate(PerKeyWithHotKeyFanout.this);
               }
             };
         postCombine =
@@ -2024,7 +2024,7 @@ public class Combine {
               }
               @Override
               public void populateDisplayData(DisplayData.Builder builder) {
-                builder.include(PerKeyWithHotKeyFanout.this);
+                builder.delegate(PerKeyWithHotKeyFanout.this);
               }
             };
       } else {
@@ -2068,7 +2068,7 @@ public class Combine {
               }
               @Override
               public void populateDisplayData(DisplayData.Builder builder) {
-                builder.include(PerKeyWithHotKeyFanout.this);
+                builder.delegate(PerKeyWithHotKeyFanout.this);
               }
             };
         postCombine =
@@ -2117,7 +2117,7 @@ public class Combine {
               }
               @Override
               public void populateDisplayData(DisplayData.Builder builder) {
-                builder.include(PerKeyWithHotKeyFanout.this);
+                builder.delegate(PerKeyWithHotKeyFanout.this);
               }
             };
       }
@@ -2202,7 +2202,7 @@ public class Combine {
 
       Combine.populateDisplayData(builder, fn, fnDisplayData);
       if (hotKeyFanout instanceof HasDisplayData) {
-        builder.include((HasDisplayData) hotKeyFanout);
+        builder.include("hotKeyFanout", (HasDisplayData) hotKeyFanout);
       }
       builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass())
         .withLabel("Fanout Function"));
@@ -2349,12 +2349,12 @@ public class Combine {
                          PCollection<KV<K, OutputT>>> {
 
     private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
-    private final DisplayData.Item<? extends Class<?>> fnDisplayData;
+    private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
     private final List<PCollectionView<?>> sideInputs;
 
     private GroupedValues(
         PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-        DisplayData.Item<? extends Class<?>> fnDisplayData) {
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
       this.fn = SerializableUtils.clone(fn);
       this.fnDisplayData = fnDisplayData;
       this.sideInputs = ImmutableList.of();
@@ -2362,7 +2362,7 @@ public class Combine {
 
     private GroupedValues(
         PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-        DisplayData.Item<? extends Class<?>> fnDisplayData,
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
         List<PCollectionView<?>> sideInputs) {
       this.fn = SerializableUtils.clone(fn);
       this.fnDisplayData = fnDisplayData;
@@ -2402,7 +2402,7 @@ public class Combine {
 
             @Override
             public void populateDisplayData(DisplayData.Builder builder) {
-              Combine.GroupedValues.this.populateDisplayData(builder);
+              builder.delegate(Combine.GroupedValues.this);
             }
           }).withSideInputs(sideInputs));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index 229b1d2..1b3e525 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -21,18 +21,14 @@ import static com.google.common.base.Preconditions.checkArgument;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -1044,35 +1040,12 @@ public class CombineFns {
    */
   private static void populateDisplayData(
       DisplayData.Builder builder, List<? extends HasDisplayData> combineFns) {
-
-    // NB: ArrayListMultimap necessary to maintain ordering of combineFns of the same type.
-    Multimap<Class<?>, HasDisplayData> combineFnMap = ArrayListMultimap.create();
-
     for (int i = 0; i < combineFns.size(); i++) {
       HasDisplayData combineFn = combineFns.get(i);
-      builder.add(DisplayData.item("combineFn" + (i + 1), combineFn.getClass())
+      String token = "combineFn" + (i + 1);
+      builder.add(DisplayData.item(token, combineFn.getClass())
         .withLabel("Combine Function"));
-      combineFnMap.put(combineFn.getClass(), combineFn);
-    }
-
-    for (Map.Entry<Class<?>, Collection<HasDisplayData>> combineFnEntries :
-        combineFnMap.asMap().entrySet()) {
-
-      Collection<HasDisplayData> classCombineFns = combineFnEntries.getValue();
-      if (classCombineFns.size() == 1) {
-        // Only one combineFn of this type, include it directly.
-        builder.include(Iterables.getOnlyElement(classCombineFns));
-
-      } else {
-        // Multiple combineFns of same type, add a namespace suffix so display data is
-        // unique and ordered.
-        String baseNamespace = combineFnEntries.getKey().getName();
-        for (int i = 0; i < combineFns.size(); i++) {
-          HasDisplayData combineFn = combineFns.get(i);
-          String namespace = String.format("%s#%d", baseNamespace, i + 1);
-          builder.include(combineFn, namespace);
-        }
-      }
+      builder.include(token, combineFn);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
index 3dd4fe2..7ac952c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
@@ -171,8 +171,7 @@ public class CombineWithContext {
 
         @Override
         public void populateDisplayData(DisplayData.Builder builder) {
-          super.populateDisplayData(builder);
-          CombineFnWithContext.this.populateDisplayData(builder);
+          builder.delegate(CombineFnWithContext.this);
         }
       };
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 12d4824..18d9333 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -246,7 +246,7 @@ public class DoFnAdapters {
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
-      builder.include(fn);
+      builder.delegate(fn);
     }
 
     private void readObject(java.io.ObjectInputStream in)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index b590d45..4ef809f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -119,7 +119,7 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
   //////////////////////////////////////////////////////////////////////////////////////////////////
 
   private final SimpleFunction<InputT, ? extends Iterable<OutputT>> fn;
-  private final DisplayData.Item<?> fnClassDisplayData;
+  private final DisplayData.ItemSpec<?> fnClassDisplayData;
 
   private FlatMapElements(
       SimpleFunction<InputT, ? extends Iterable<OutputT>> fn,
@@ -166,7 +166,9 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
-    builder.include(fn).add(fnClassDisplayData);
+    builder
+        .include("flatMapFn", fn)
+        .add(fnClassDisplayData);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index 73e4359..c109034 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -103,7 +103,7 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
   ///////////////////////////////////////////////////////////////////
 
   private final SimpleFunction<InputT, OutputT> fn;
-  private final DisplayData.Item<?> fnClassDisplayData;
+  private final DisplayData.ItemSpec<?> fnClassDisplayData;
 
   private MapElements(SimpleFunction<InputT, OutputT> fn, Class<?> fnClass) {
     this.fn = fn;
@@ -123,7 +123,7 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
 
               @Override
               public void populateDisplayData(DisplayData.Builder builder) {
-                MapElements.this.populateDisplayData(builder);
+                builder.delegate(MapElements.this);
               }
 
               @Override
@@ -141,6 +141,8 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
-    builder.include(fn).add(fnClassDisplayData);
+    builder
+        .include("mapFn", fn)
+        .add(fnClassDisplayData);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 8aa87e4..93eb1ac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -521,7 +521,7 @@ public class ParDo {
    */
   public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
     validate(fn);
-    return of(adapt(fn), fn.getClass());
+    return of(adapt(fn), displayDataForFn(fn));
   }
 
   /**
@@ -538,12 +538,17 @@ public class ParDo {
    */
   @Deprecated
   public static <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
-    return of(fn, fn.getClass());
+    return of(fn, displayDataForFn(fn));
   }
 
   private static <InputT, OutputT> Bound<InputT, OutputT> of(
-          OldDoFn<InputT, OutputT> fn, Class<?> fnClass) {
-    return new Unbound().of(fn, fnClass);
+          OldDoFn<InputT, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
+    return new Unbound().of(fn, fnDisplayData);
+  }
+
+  private static <T> DisplayData.ItemSpec<? extends Class<?>> displayDataForFn(T fn) {
+    return DisplayData.item("fn", fn.getClass())
+        .withLabel("Transform Function");
   }
 
   /**
@@ -666,7 +671,7 @@ public class ParDo {
      */
     public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
       validate(fn);
-      return of(adapt(fn), fn.getClass());
+      return of(adapt(fn), displayDataForFn(fn));
     }
 
     /**
@@ -681,12 +686,12 @@ public class ParDo {
      */
     @Deprecated
     public <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
-      return of(fn, fn.getClass());
+      return of(fn, displayDataForFn(fn));
     }
 
     private <InputT, OutputT> Bound<InputT, OutputT> of(
-        OldDoFn<InputT, OutputT> fn, Class<?> fnClass) {
-      return new Bound<>(name, sideInputs, fn, fnClass);
+        OldDoFn<InputT, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
+      return new Bound<>(name, sideInputs, fn, fnDisplayData);
     }
   }
 
@@ -707,16 +712,16 @@ public class ParDo {
     // Inherits name.
     private final List<PCollectionView<?>> sideInputs;
     private final OldDoFn<InputT, OutputT> fn;
-    private final Class<?> fnClass;
+    private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
 
     Bound(String name,
           List<PCollectionView<?>> sideInputs,
           OldDoFn<InputT, OutputT> fn,
-          Class<?> fnClass) {
+          DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
       super(name);
       this.sideInputs = sideInputs;
       this.fn = SerializableUtils.clone(fn);
-      this.fnClass = fnClass;
+      this.fnDisplayData = fnDisplayData;
     }
 
     /**
@@ -744,7 +749,7 @@ public class ParDo {
       ImmutableList.Builder<PCollectionView<?>> builder = ImmutableList.builder();
       builder.addAll(this.sideInputs);
       builder.addAll(sideInputs);
-      return new Bound<>(name, builder.build(), fn, fnClass);
+      return new Bound<>(name, builder.build(), fn, fnDisplayData);
     }
 
     /**
@@ -758,7 +763,7 @@ public class ParDo {
     public BoundMulti<InputT, OutputT> withOutputTags(TupleTag<OutputT> mainOutputTag,
                                            TupleTagList sideOutputTags) {
       return new BoundMulti<>(
-          name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass);
+          name, sideInputs, mainOutputTag, sideOutputTags, fn, fnDisplayData);
     }
 
     @Override
@@ -802,7 +807,7 @@ public class ParDo {
     @Override
     public void populateDisplayData(Builder builder) {
       super.populateDisplayData(builder);
-      ParDo.populateDisplayData(builder, fn, fnClass);
+      ParDo.populateDisplayData(builder, fn, fnDisplayData);
     }
 
     public OldDoFn<InputT, OutputT> getFn() {
@@ -883,7 +888,7 @@ public class ParDo {
      */
     public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
       validate(fn);
-      return of(adapt(fn), fn.getClass());
+      return of(adapt(fn), displayDataForFn(fn));
     }
 
     /**
@@ -898,12 +903,13 @@ public class ParDo {
      */
     @Deprecated
     public <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
-      return of(fn, fn.getClass());
+      return of(fn, displayDataForFn(fn));
     }
 
-    private <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn, Class<?> fnClass) {
+    private <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn,
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
       return new BoundMulti<>(
-              name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass);
+              name, sideInputs, mainOutputTag, sideOutputTags, fn, fnDisplayData);
     }
   }
 
@@ -925,20 +931,20 @@ public class ParDo {
     private final TupleTag<OutputT> mainOutputTag;
     private final TupleTagList sideOutputTags;
     private final OldDoFn<InputT, OutputT> fn;
-    private final Class<?> fnClass;
+    private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
 
     BoundMulti(String name,
                List<PCollectionView<?>> sideInputs,
                TupleTag<OutputT> mainOutputTag,
                TupleTagList sideOutputTags,
                OldDoFn<InputT, OutputT> fn,
-               Class<?> fnClass) {
+               DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
       super(name);
       this.sideInputs = sideInputs;
       this.mainOutputTag = mainOutputTag;
       this.sideOutputTags = sideOutputTags;
       this.fn = SerializableUtils.clone(fn);
-      this.fnClass = fnClass;
+      this.fnDisplayData = fnDisplayData;
     }
 
     /**
@@ -969,7 +975,7 @@ public class ParDo {
       builder.addAll(sideInputs);
       return new BoundMulti<>(
           name, builder.build(),
-          mainOutputTag, sideOutputTags, fn, fnClass);
+          mainOutputTag, sideOutputTags, fn, fnDisplayData);
     }
 
 
@@ -1023,7 +1029,7 @@ public class ParDo {
     @Override
     public void populateDisplayData(Builder builder) {
       super.populateDisplayData(builder);
-      ParDo.populateDisplayData(builder, fn, fnClass);
+      ParDo.populateDisplayData(builder, fn, fnDisplayData);
     }
 
     public OldDoFn<InputT, OutputT> getFn() {
@@ -1044,11 +1050,11 @@ public class ParDo {
   }
 
   private static void populateDisplayData(
-      DisplayData.Builder builder, OldDoFn<?, ?> fn, Class<?> fnClass) {
+      DisplayData.Builder builder, OldDoFn<?, ?> fn,
+      DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
     builder
-        .include(fn)
-        .add(DisplayData.item("fn", fnClass)
-            .withLabel("Transform Function"));
+        .include("fn", fn)
+        .add(fnDisplayData);
   }
 
   private static boolean isSplittable(OldDoFn<?, ?> oldDoFn) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
index 9247942..5b4eead 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
@@ -124,7 +124,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>>
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
-    builder.include(partitionDoFn);
+    builder.include("partitionFn", partitionDoFn);
   }
 
   private final transient PartitionDoFn<T> partitionDoFn;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index 394666b..5ab6342 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -20,15 +20,19 @@ package org.apache.beam.sdk.transforms.display;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import autovalue.shaded.com.google.common.common.base.Joiner;
 import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -48,12 +52,12 @@ import org.joda.time.format.ISODateTimeFormat;
  * interface.
  */
 public class DisplayData implements Serializable {
-  private static final DisplayData EMPTY = new DisplayData(Maps.<Identifier, Item<?>>newHashMap());
+  private static final DisplayData EMPTY = new DisplayData(Maps.<Identifier, Item>newHashMap());
   private static final DateTimeFormatter TIMESTAMP_FORMATTER = ISODateTimeFormat.dateTime();
 
-  private final ImmutableMap<Identifier, Item<?>> entries;
+  private final ImmutableMap<Identifier, Item> entries;
 
-  private DisplayData(Map<Identifier, Item<?>> entries) {
+  private DisplayData(Map<Identifier, Item> entries) {
     this.entries = ImmutableMap.copyOf(entries);
   }
 
@@ -71,7 +75,11 @@ public class DisplayData implements Serializable {
    */
   public static DisplayData from(HasDisplayData component) {
     checkNotNull(component, "component argument cannot be null");
-    return InternalBuilder.forRoot(component).build();
+
+    InternalBuilder builder = new InternalBuilder();
+    builder.include(Path.root(), component);
+
+    return builder.build();
   }
 
   /**
@@ -99,11 +107,11 @@ public class DisplayData implements Serializable {
   }
 
   @JsonValue
-  public Collection<Item<?>> items() {
+  public Collection<Item> items() {
     return entries.values();
   }
 
-  public Map<Identifier, Item<?>> asMap() {
+  public Map<Identifier, Item> asMap() {
     return entries;
   }
 
@@ -126,7 +134,7 @@ public class DisplayData implements Serializable {
   public String toString() {
     StringBuilder builder = new StringBuilder();
     boolean isFirstLine = true;
-    for (Item<?> entry : entries.values()) {
+    for (Item entry : entries.values()) {
       if (isFirstLine) {
         isFirstLine = false;
       } else {
@@ -149,70 +157,81 @@ public class DisplayData implements Serializable {
    */
   public interface Builder {
     /**
-     * Register display data from the specified subcomponent. For example, a {@link PTransform}
-     * which delegates to a user-provided function can implement {@link HasDisplayData} on the
-     * function and include it from the {@link PTransform}:
+     * Register display data from the specified subcomponent at the given path. For example, a
+     * {@link PTransform} which delegates to a user-provided function can implement
+     * {@link HasDisplayData} on the function and include it from the {@link PTransform}:
      *
      * <pre><code>{@literal @Override}
      * public void populateDisplayData(DisplayData.Builder builder) {
      *   super.populateDisplayData(builder);
      *
      *   builder
-     *     .add(DisplayData.item("userFn", userFn)) // To register the class name of the userFn
-     *     .include(userFn); // To allow the userFn to register additional display data
+     *     // To register the class name of the userFn
+     *     .add(DisplayData.item("userFn", userFn.getClass()))
+     *     // To allow the userFn to register additional display data
+     *     .include("userFn", userFn);
      * }
      * </code></pre>
      *
-     * <p>Using {@code include(subcomponent)} will associate each of the registered items with the
-     * namespace of the {@code subcomponent} being registered. To register display data in the
-     * current namespace, such as from a base class implementation, use
+     * <p>Using {@code include(path, subcomponent)} will associate each of the registered items with
+     * the namespace of the {@code subcomponent} being registered, with the specified path element
+     * relative to the current path. To register display data in the current path and namespace,
+     * such as from a base class implementation, use
      * {@code subcomponent.populateDisplayData(builder)} instead.
      *
      * @see HasDisplayData#populateDisplayData(DisplayData.Builder)
      */
-    Builder include(HasDisplayData subComponent);
+    Builder include(String path, HasDisplayData subComponent);
 
     /**
-     * Register display data from the specified subcomponent, overriding the namespace of
-     * subcomponent display items with the specified namespace.
+     * Register display data from the specified component on behalf of the current component.
+     * Display data items will be added with the subcomponent namespace but the current component
+     * path.
      *
-     * @see #include(HasDisplayData)
-     */
-    Builder include(HasDisplayData subComponent, Class<?> namespace);
-
-    /**
-     * Register display data from the specified subcomponent, overriding the namespace of
-     * subcomponent display items with the specified namespace.
+     * <p>This is useful for components which simply wrap other components and wish to retain the
+     * display data from the wrapped component. Such components should implement
+     * {@code populateDisplayData} as:
      *
-     * @see #include(HasDisplayData)
+     * <pre><code>{@literal @Override}
+     * public void populateDisplayData(DisplayData.Builder builder) {
+     *   builder.delegate(wrapped);
+     * }
+     * </code></pre>
      */
-    Builder include(HasDisplayData subComponent, String namespace);
+    Builder delegate(HasDisplayData component);
 
     /**
      * Register the given display item.
      */
-    Builder add(Item<?> item);
+    Builder add(ItemSpec<?> item);
 
     /**
      * Register the given display item if the value is not null.
      */
-    Builder addIfNotNull(Item<?> item);
+    Builder addIfNotNull(ItemSpec<?> item);
 
     /**
      * Register the given display item if the value is different than the specified default.
      */
-    <T> Builder addIfNotDefault(Item<T> item, @Nullable T defaultValue);
+    <T> Builder addIfNotDefault(ItemSpec<T> item, @Nullable T defaultValue);
   }
 
   /**
-   * {@link Item Items} are the unit of display data. Each item is identified by a given key
+   * {@link Item Items} are the unit of display data. Each item is identified by a given path, key,
    * and namespace from the component the display item belongs to.
    *
    * <p>{@link Item Items} are registered via {@link DisplayData.Builder#add}
    * within {@link HasDisplayData#populateDisplayData} implementations.
    */
   @AutoValue
-  public abstract static class Item<T> implements Serializable {
+  public abstract static class Item {
+
+    /**
+     * The path for the display item within a component hierarchy.
+     */
+    @Nullable
+    @JsonIgnore
+    public abstract Path getPath();
 
     /**
      * The namespace for the display item. The namespace defaults to the component which
@@ -220,7 +239,7 @@ public class DisplayData implements Serializable {
      */
     @Nullable
     @JsonGetter("namespace")
-    public abstract String getNamespace();
+    public abstract Class<?> getNamespace();
 
     /**
      * The key for the display item. Each display item is created with a key and value
@@ -240,11 +259,8 @@ public class DisplayData implements Serializable {
      * Retrieve the value of the display item. The value is translated from the input to
      * {@link DisplayData#item} into a format suitable for display. Translation is based on the
      * item's {@link #getType() type}.
-     *
-     * <p>The value will only be {@literal null} if the input value during creation was null.
      */
     @JsonGetter("value")
-    @Nullable
     public abstract Object getValue();
 
     /**
@@ -285,27 +301,104 @@ public class DisplayData implements Serializable {
     @Nullable
     public abstract String getLinkUrl();
 
-    private static <T> Item<T> create(String key, Type type, @Nullable T value) {
-      FormattedItemValue formatted = type.safeFormat(value);
-      return of(null, key, type, formatted.getLongValue(), formatted.getShortValue(), null, null);
+    private static Item create(ItemSpec<?> spec, Path path) {
+      checkNotNull(spec, "spec cannot be null");
+      checkNotNull(path, "path cannot be null");
+      Class<?> ns = checkNotNull(spec.getNamespace(), "namespace must be set");
+
+      return new AutoValue_DisplayData_Item(path, ns, spec.getKey(), spec.getType(),
+          spec.getValue(), spec.getShortValue(), spec.getLabel(), spec.getLinkUrl());
     }
 
+    @Override
+    public String toString() {
+      return String.format("%s%s:%s=%s", getPath(), getNamespace().getName(), getKey(), getValue());
+    }
+  }
+
+  /**
+   * Specifies an {@link Item} to register as display data. Each item is identified by a given
+   * path, key, and namespace from the component the display item belongs to.
+   *
+   * <p>{@link Item Items} are registered via {@link DisplayData.Builder#add}
+   * within {@link HasDisplayData#populateDisplayData} implementations.
+   */
+  @AutoValue
+  public abstract static class ItemSpec<T> implements Serializable {
+    /**
+     * The namespace for the display item. If unset, defaults to the component which
+     * the display item is registered to.
+     */
+    @Nullable
+    public abstract Class<?> getNamespace();
+
+    /**
+     * The key for the display item. Each display item is created with a key and value
+     * via {@link DisplayData#item}.
+     */
+    public abstract String getKey();
+
+    /**
+     * The {@link DisplayData.Type} of display data. All display data conforms to a predefined set
+     * of allowed types.
+     */
+    public abstract Type getType();
+
+    /**
+     * The value of the display item. The value is translated from the input to
+     * {@link DisplayData#item} into a format suitable for display. Translation is based on the
+     * item's {@link #getType() type}.
+     */
+    @Nullable
+    public abstract Object getValue();
+
     /**
-     * Set the item {@link Item#getNamespace() namespace} from the given {@link Class}.
+     * The optional short value for an item, or {@code null} if none is provided.
      *
-     * <p>This method does not alter the current instance, but instead returns a new {@link Item}
-     * with the namespace set.
+     * <p>The short value is an alternative display representation for items having a long display
+     * value. For example, the {@link #getValue() value} for {@link Type#JAVA_CLASS} items contains
+     * the full class name with package, while the short value contains just the class name.
+     *
+     * <p>A {@link #getValue() value} will be provided for each display item, and some types may
+     * also provide a short-value. If a short value is provided, display data consumers may
+     * choose to display it instead of or in addition to the {@link #getValue() value}.
      */
-    public Item<T> withNamespace(Class<?> namespace) {
-      checkNotNull(namespace, "namespace argument cannot be null");
-      return withNamespace(namespaceOf(namespace));
+    @Nullable
+    public abstract Object getShortValue();
+
+    /**
+     * The optional label for an item. The label is a human-readable description of what
+     * the metadata represents. UIs may choose to display the label instead of the item key.
+     */
+    @Nullable
+    public abstract String getLabel();
+
+    /**
+     * The optional link URL for an item. The URL points to an address where the reader
+     * can find additional context for the display data.
+     */
+    @Nullable
+    public abstract String getLinkUrl();
+
+    private static <T> ItemSpec<T> create(String key, Type type, @Nullable T value) {
+      return ItemSpec.<T>builder()
+          .setKey(key)
+          .setType(type)
+          .setRawValue(value)
+          .build();
     }
 
-    /** @see #withNamespace(Class) */
-    public Item<T> withNamespace(String namespace) {
+    /**
+     * Set the item {@link ItemSpec#getNamespace() namespace} from the given {@link Class}.
+     *
+     * <p>This method does not alter the current instance, but instead returns a new
+     * {@link ItemSpec} with the namespace set.
+     */
+    public ItemSpec<T> withNamespace(Class<?> namespace) {
       checkNotNull(namespace, "namespace argument cannot be null");
-      return of(
-          namespace, getKey(), getType(), getValue(), getShortValue(), getLabel(), getLinkUrl());
+      return toBuilder()
+          .setNamespace(namespace)
+          .build();
     }
 
     /**
@@ -313,12 +406,13 @@ public class DisplayData implements Serializable {
      *
      * <p>Specifying a null value will clear the label if it was previously defined.
      *
-     * <p>This method does not alter the current instance, but instead returns a new {@link Item}
-     * with the label set.
+     * <p>This method does not alter the current instance, but instead returns a new
+     * {@link ItemSpec} with the label set.
      */
-    public Item<T> withLabel(String label) {
-      return of(
-          getNamespace(), getKey(), getType(), getValue(), getShortValue(), label, getLinkUrl());
+    public ItemSpec<T> withLabel(@Nullable String label) {
+      return toBuilder()
+          .setLabel(label)
+          .build();
     }
 
     /**
@@ -326,11 +420,13 @@ public class DisplayData implements Serializable {
      *
      * <p>Specifying a null value will clear the link url if it was previously defined.
      *
-     * <p>This method does not alter the current instance, but instead returns a new {@link Item}
-     * with the link url set.
+     * <p>This method does not alter the current instance, but instead returns a new
+     * {@link ItemSpec} with the link url set.
      */
-    public Item<T> withLinkUrl(String url) {
-      return of(getNamespace(), getKey(), getType(), getValue(), getShortValue(), getLabel(), url);
+    public ItemSpec<T> withLinkUrl(@Nullable String url) {
+      return toBuilder()
+          .setLinkUrl(url)
+          .build();
     }
 
     /**
@@ -339,84 +435,166 @@ public class DisplayData implements Serializable {
      * <p>This should only be used internally. It is useful to compare the value of a
      * {@link DisplayData.Item} to the value derived from a specified input.
      */
-    private Item<T> withValue(Object value) {
-      FormattedItemValue formatted = getType().safeFormat(value);
-      return of(getNamespace(), getKey(), getType(), formatted.getLongValue(),
-          formatted.getShortValue(), getLabel(), getLinkUrl());
-    }
-
-    private static <T> Item<T> of(
-        @Nullable String namespace,
-        String key,
-        Type type,
-        @Nullable Object value,
-        @Nullable Object shortValue,
-        @Nullable String label,
-        @Nullable String linkUrl) {
-      return new AutoValue_DisplayData_Item<>(
-          namespace, key, type, value, shortValue, label, linkUrl);
+    private ItemSpec<T> withValue(T value) {
+      return toBuilder()
+          .setRawValue(value)
+          .build();
     }
 
     @Override
     public String toString() {
       return String.format("%s:%s=%s", getNamespace(), getKey(), getValue());
     }
+
+    static <T> ItemSpec.Builder<T> builder() {
+      return new AutoValue_DisplayData_ItemSpec.Builder<>();
+    }
+
+    abstract ItemSpec.Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      public abstract ItemSpec.Builder<T> setKey(String key);
+      public abstract ItemSpec.Builder<T> setNamespace(@Nullable Class<?> namespace);
+      public abstract ItemSpec.Builder<T> setType(Type type);
+      public abstract ItemSpec.Builder<T> setValue(@Nullable Object longValue);
+      public abstract ItemSpec.Builder<T> setShortValue(@Nullable Object shortValue);
+      public abstract ItemSpec.Builder<T> setLabel(@Nullable String label);
+      public abstract ItemSpec.Builder<T> setLinkUrl(@Nullable String url);
+      public abstract ItemSpec<T> build();
+
+
+      abstract Type getType();
+
+      ItemSpec.Builder<T> setRawValue(@Nullable T value) {
+        FormattedItemValue formatted = getType().safeFormat(value);
+        return this
+            .setValue(formatted.getLongValue())
+            .setShortValue(formatted.getShortValue());
+      }
+    }
   }
 
   /**
    * Unique identifier for a display data item within a component.
-   * Identifiers are composed of the key they are registered with and a namespace generated from
-   * the class of the component which registered the item.
+   *
+   * <p>Identifiers are composed of:
+   *
+   * <ul>
+   *   <li>A {@link #getPath() path} based on the component hierarchy</li>
+   *   <li>The {@link #getKey() key} it is registered with</li>
+   *   <li>A {@link #getNamespace() namespace} generated from the class of the component which
+   *   registered the item.</li>
+   * </ul>
    *
    * <p>Display data registered with the same key from different components will have different
    * namespaces and thus will both be represented in the composed {@link DisplayData}. If a
    * single component registers multiple metadata items with the same key, only the most recent
    * item will be retained; previous versions are discarded.
    */
-  public static class Identifier {
-    private final String ns;
-    private final String key;
+  @AutoValue
+  public abstract static class Identifier {
+    public abstract Path getPath();
+    public abstract Class<?> getNamespace();
+    public abstract String getKey();
 
-    public static Identifier of(Class<?> namespace, String key) {
-      return of(namespaceOf(namespace), key);
+    public static Identifier of(Path path, Class<?> namespace, String key) {
+      return new AutoValue_DisplayData_Identifier(path, namespace, key);
     }
 
-    public static Identifier of(String namespace, String key) {
-      return new Identifier(namespace, key);
+    @Override
+    public String toString() {
+      return String.format("%s%s:%s", getPath(), getNamespace(), getKey());
     }
+  }
 
-    private Identifier(String ns, String key) {
-      this.ns = ns;
-      this.key = key;
+  /**
+   * Structured path of registered display data within a component hierarchy.
+   *
+   * <p>Display data items registered directly by a component will have the {@link Path#root() root}
+   * path. If the component {@link Builder#include includes} a sub-component, its display data will
+   * be registered at the path specified. Each sub-component path is created by appending a child
+   * element to the path of its parent component, forming a hierarchy.
+   */
+  public static class Path {
+    private final ImmutableList<String> components;
+    private Path(ImmutableList<String> components) {
+      this.components = components;
     }
 
-    public String getNamespace() {
-      return ns;
+    /**
+     * Path for display data registered by a top-level component.
+     */
+    public static Path root() {
+      return new Path(ImmutableList.<String>of());
     }
 
-    public String getKey() {
-      return key;
+    /**
+     * Construct a path from an absolute component path hierarchy.
+     *
+     * <p>For the root path, use {@link Path#root()}.
+     *
+     * @param firstPath Path of the first sub-component.
+     * @param paths Additional path components.
+     */
+    public static Path absolute(String firstPath, String... paths) {
+      ImmutableList.Builder<String> builder = ImmutableList.builder();
+
+      validatePathElement(firstPath);
+      builder.add(firstPath);
+      for (String path : paths) {
+        validatePathElement(path);
+        builder.add(path);
+      }
+
+      return new Path(builder.build());
     }
 
-    @Override
-    public boolean equals(Object obj) {
-      if (obj instanceof Identifier) {
-        Identifier that = (Identifier) obj;
-        return Objects.equals(this.ns, that.ns)
-          && Objects.equals(this.key, that.key);
-      }
+    /**
+     * Hierarchy list of component paths making up the full path, starting with the top-level child
+     * component path. For the {@link #root root} path, returns the empty list.
+     */
+    public List<String> getComponents() {
+      return components;
+    }
 
-      return false;
+    /**
+     * Extend the path by appending a sub-component path. The new path element is added to the end
+     * of the path hierarchy.
+     *
+     * <p>Returns a new {@link Path} instance; the originating {@link Path} is not modified.
+     */
+    public Path extend(String path) {
+      validatePathElement(path);
+      return new Path(ImmutableList.<String>builder()
+          .addAll(components.iterator())
+          .add(path)
+          .build());
     }
 
-    @Override
-    public int hashCode() {
-      return Objects.hash(ns, key);
+    private static void validatePathElement(String path) {
+      checkNotNull(path);
+      checkArgument(!"".equals(path), "path cannot be empty");
     }
 
     @Override
     public String toString() {
-      return String.format("%s:%s", ns, key);
+      StringBuilder b = new StringBuilder().append("[");
+      Joiner.on("/").appendTo(b, components);
+      b.append("]");
+      return b.toString();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof Path
+          && Objects.equals(components, ((Path) obj).components);
+
+    }
+
+    @Override
+    public int hashCode() {
+      return components.hashCode();
     }
   }
 
@@ -551,65 +729,79 @@ public class DisplayData implements Serializable {
     Object getLongValue() {
       return this.longValue;
     }
-
     Object getShortValue() {
       return this.shortValue;
     }
   }
 
   private static class InternalBuilder implements Builder {
-    private final Map<Identifier, Item<?>> entries;
-    private final Set<Object> visited;
+    private final Map<Identifier, Item> entries;
+    private final Set<HasDisplayData> visitedComponents;
+    private final Map<Path, HasDisplayData> visitedPathMap;
 
-    private String latestNs;
+    private Path latestPath;
+    private Class<?> latestNs;
 
     private InternalBuilder() {
       this.entries = Maps.newHashMap();
-      this.visited = Sets.newIdentityHashSet();
-    }
-
-    private static InternalBuilder forRoot(HasDisplayData instance) {
-      InternalBuilder builder = new InternalBuilder();
-      builder.include(instance);
-      return builder;
+      this.visitedComponents = Sets.newIdentityHashSet();
+      this.visitedPathMap = Maps.newHashMap();
     }
 
     @Override
-    public Builder include(HasDisplayData subComponent) {
+    public Builder include(String path, HasDisplayData subComponent) {
       checkNotNull(subComponent, "subComponent argument cannot be null");
-      return include(subComponent, subComponent.getClass());
+      checkNotNull(path, "path argument cannot be null");
+
+      Path absolutePath = latestPath.extend(path);
+
+      HasDisplayData existingComponent = visitedPathMap.get(absolutePath);
+      if (existingComponent != null) {
+        throw new IllegalArgumentException(String.format("Specified path '%s' already used for "
+                + "subcomponent %s. Subcomponents must be included using unique paths.",
+            path, existingComponent));
+      }
+
+      return include(absolutePath, subComponent);
     }
 
     @Override
-    public Builder include(HasDisplayData subComponent, Class<?> namespace) {
-      checkNotNull(namespace, "Input namespace override cannot be null");
-      return include(subComponent, namespaceOf(namespace));
+    public Builder delegate(HasDisplayData component) {
+      checkNotNull(component);
+
+      return include(latestPath, component);
     }
 
-    @Override
-    public Builder include(HasDisplayData subComponent, String namespace) {
-      checkNotNull(subComponent, "subComponent argument cannot be null");
-      checkNotNull(namespace, "Input namespace override cannot be null");
-
-      boolean newComponent = visited.add(subComponent);
-      if (newComponent) {
-        String prevNs = this.latestNs;
-        this.latestNs = namespace;
-
-        try {
-          subComponent.populateDisplayData(this);
-        } catch (PopulateDisplayDataException e) {
-          // Don't re-wrap exceptions recursively.
-          throw e;
-        } catch (Throwable e) {
-          String msg = String.format("Error while populating display data for component: %s",
-              namespace);
-          throw new PopulateDisplayDataException(msg, e);
-        }
+    private Builder include(Path path, HasDisplayData subComponent) {
+      if (visitedComponents.contains(subComponent)) {
+        // Component previously registered; ignore in order to break cyclic dependencies
+        return this;
+      }
 
-        this.latestNs = prevNs;
+      // New component; add it.
+      visitedComponents.add(subComponent);
+      visitedPathMap.put(path, subComponent);
+      Class<?> namespace = subComponent.getClass();
+
+      Path prevPath = latestPath;
+      Class<?> prevNs = latestNs;
+      latestPath = path;
+      latestNs = namespace;
+
+      try {
+        subComponent.populateDisplayData(this);
+      } catch (PopulateDisplayDataException e) {
+        // Don't re-wrap exceptions recursively.
+        throw e;
+      } catch (Throwable e) {
+        String msg = String.format("Error while populating display data for component: %s",
+            namespace.getName());
+        throw new PopulateDisplayDataException(msg, e);
       }
 
+      latestPath = prevPath;
+      latestNs = prevNs;
+
       return this;
     }
 
@@ -623,39 +815,41 @@ public class DisplayData implements Serializable {
     }
 
     @Override
-    public Builder add(Item<?> item) {
+    public Builder add(ItemSpec<?> item) {
       checkNotNull(item, "Input display item cannot be null");
       return addItemIf(true, item);
     }
 
     @Override
-    public Builder addIfNotNull(Item<?> item) {
+    public Builder addIfNotNull(ItemSpec<?> item) {
       checkNotNull(item, "Input display item cannot be null");
       return addItemIf(item.getValue() != null, item);
     }
 
     @Override
-    public <T> Builder addIfNotDefault(Item<T> item, @Nullable T defaultValue) {
+    public <T> Builder addIfNotDefault(ItemSpec<T> item, @Nullable T defaultValue) {
       checkNotNull(item, "Input display item cannot be null");
-      Item<T> defaultItem = item.withValue(defaultValue);
+      ItemSpec<T> defaultItem = item.withValue(defaultValue);
       return addItemIf(!Objects.equals(item, defaultItem), item);
     }
 
-    private Builder addItemIf(boolean condition, Item<?> item) {
+    private Builder addItemIf(boolean condition, ItemSpec<?> spec) {
       if (!condition) {
         return this;
       }
 
-      checkNotNull(item, "Input display item cannot be null");
-      checkNotNull(item.getValue(), "Input display value cannot be null");
-      if (item.getNamespace() == null) {
-        item = item.withNamespace(latestNs);
+      checkNotNull(spec, "Input display item cannot be null");
+      checkNotNull(spec.getValue(), "Input display value cannot be null");
+
+      if (spec.getNamespace() == null) {
+        spec = spec.withNamespace(latestNs);
       }
+      Item item = Item.create(spec, latestPath);
 
-      Identifier id = Identifier.of(item.getNamespace(), item.getKey());
+      Identifier id = Identifier.of(item.getPath(), item.getNamespace(), item.getKey());
       checkArgument(!entries.containsKey(id),
-          "Display data key (%s) is not unique within the specified namespace (%s).",
-          item.getKey(), item.getNamespace());
+          "Display data key (%s) is not unique within the specified path and namespace: %s%s.",
+          item.getKey(), item.getPath(), item.getNamespace());
 
       entries.put(id, item);
       return this;
@@ -669,63 +863,63 @@ public class DisplayData implements Serializable {
   /**
    * Create a display item for the specified key and string value.
    */
-  public static Item<String> item(String key, @Nullable String value) {
+  public static ItemSpec<String> item(String key, @Nullable String value) {
     return item(key, Type.STRING, value);
   }
 
   /**
    * Create a display item for the specified key and integer value.
    */
-  public static Item<Integer> item(String key, @Nullable Integer value) {
+  public static ItemSpec<Integer> item(String key, @Nullable Integer value) {
     return item(key, Type.INTEGER, value);
   }
 
   /**
    * Create a display item for the specified key and integer value.
    */
-  public static Item<Long> item(String key, @Nullable Long value) {
+  public static ItemSpec<Long> item(String key, @Nullable Long value) {
     return item(key, Type.INTEGER, value);
   }
 
   /**
    * Create a display item for the specified key and floating point value.
    */
-  public static Item<Float> item(String key, @Nullable Float value) {
+  public static ItemSpec<Float> item(String key, @Nullable Float value) {
     return item(key, Type.FLOAT, value);
   }
 
   /**
    * Create a display item for the specified key and floating point value.
    */
-  public static Item<Double> item(String key, @Nullable Double value) {
+  public static ItemSpec<Double> item(String key, @Nullable Double value) {
     return item(key, Type.FLOAT, value);
   }
 
   /**
    * Create a display item for the specified key and boolean value.
    */
-  public static Item<Boolean> item(String key, @Nullable Boolean value) {
+  public static ItemSpec<Boolean> item(String key, @Nullable Boolean value) {
     return item(key, Type.BOOLEAN, value);
   }
 
   /**
    * Create a display item for the specified key and timestamp value.
    */
-  public static Item<Instant> item(String key, @Nullable Instant value) {
+  public static ItemSpec<Instant> item(String key, @Nullable Instant value) {
     return item(key, Type.TIMESTAMP, value);
   }
 
   /**
    * Create a display item for the specified key and duration value.
    */
-  public static Item<Duration> item(String key, @Nullable Duration value) {
+  public static ItemSpec<Duration> item(String key, @Nullable Duration value) {
     return item(key, Type.DURATION, value);
   }
 
   /**
    * Create a display item for the specified key and class value.
    */
-  public static <T> Item<Class<T>> item(String key, @Nullable Class<T> value) {
+  public static <T> ItemSpec<Class<T>> item(String key, @Nullable Class<T> value) {
     return item(key, Type.JAVA_CLASS, value);
   }
 
@@ -739,10 +933,10 @@ public class DisplayData implements Serializable {
    *
    *  @see Type#inferType(Object)
    */
-  public static <T> Item<T> item(String key, Type type, @Nullable T value) {
+  public static <T> ItemSpec<T> item(String key, Type type, @Nullable T value) {
     checkNotNull(key, "key argument cannot be null");
     checkNotNull(type, "type argument cannot be null");
 
-    return Item.create(key, type, value);
+    return ItemSpec.create(key, type, value);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 57f7716..684a776 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -578,7 +578,7 @@ public class Window {
         builder
             .add(DisplayData.item("windowFn", windowFn.getClass())
               .withLabel("Windowing Function"))
-            .include(windowFn);
+            .include("windowFn", windowFn);
       }
 
       if (allowedLateness != null) {



Mime
View raw message