beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/2] incubator-beam git commit: Publish DisplayData for PipelineOptions.
Date Fri, 22 Apr 2016 22:42:19 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 692f3a136 -> f3e6c5351


Publish DisplayData for PipelineOptions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2c16bdd6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2c16bdd6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2c16bdd6

Branch: refs/heads/master
Commit: 2c16bdd68c1196c4a394c240f232150476f70b0e
Parents: 692f3a1
Author: Scott Wegner <swegner@google.com>
Authored: Tue Apr 5 11:49:26 2016 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Fri Apr 22 15:41:34 2016 -0700

----------------------------------------------------------------------
 .../runners/DataflowPipelineTranslatorTest.java |   5 +-
 .../beam/sdk/options/PipelineOptionSpec.java    |  89 ++++++
 .../beam/sdk/options/PipelineOptions.java       |   4 +-
 .../sdk/options/PipelineOptionsFactory.java     | 128 +++++----
 .../sdk/options/PipelineOptionsReflector.java   | 112 ++++++++
 .../sdk/options/ProxyInvocationHandler.java     | 224 ++++++++++++++--
 .../sdk/transforms/display/DisplayData.java     |  26 +-
 .../options/PipelineOptionsReflectorTest.java   | 196 ++++++++++++++
 .../sdk/options/ProxyInvocationHandlerTest.java | 268 ++++++++++++++++++-
 .../sdk/transforms/display/DisplayDataTest.java |  25 ++
 10 files changed, 979 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c16bdd6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
index a62f550..27c0acc 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
@@ -204,8 +204,9 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     settings.put("numberOfWorkerHarnessThreads", 0);
     settings.put("experiments", null);
 
-    assertEquals(ImmutableMap.of("options", settings),
-        job.getEnvironment().getSdkPipelineOptions());
+    Map<String, Object> sdkPipelineOptions = job.getEnvironment().getSdkPipelineOptions();
+    assertThat(sdkPipelineOptions, hasKey("options"));
+    assertEquals(settings, sdkPipelineOptions.get("options"));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c16bdd6/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionSpec.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionSpec.java
new file mode 100644
index 0000000..71f9d46
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionSpec.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+
+import java.lang.reflect.Method;
+
+/**
+ * For internal use. Specification for an option defined in a {@link PipelineOptions} interface.
+ */
+class PipelineOptionSpec {
+  private final Class<? extends PipelineOptions> clazz;
+  private final String name;
+  private final Method getter;
+
+  static PipelineOptionSpec of(Class<? extends PipelineOptions> clazz, String name, Method getter) {
+    return new PipelineOptionSpec(clazz, name, getter);
+  }
+
+  private PipelineOptionSpec(Class<? extends PipelineOptions> clazz, String name, Method getter) {
+    this.clazz = clazz;
+    this.name = name;
+    this.getter = getter;
+  }
+
+  /**
+   * The {@link PipelineOptions} interface which defines this {@link PipelineOptionSpec}.
+   */
+  Class<? extends PipelineOptions> getDefiningInterface() {
+    return clazz;
+  }
+
+  /**
+   * Name of the property.
+   */
+  String getName() {
+    return name;
+  }
+
+  /**
+   * The getter method for this property.
+   */
+  Method getGetterMethod() {
+    return getter;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("definingInterface", getDefiningInterface())
+        .add("name", getName())
+        .add("getterMethod", getGetterMethod())
+        .toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(getDefiningInterface(), getName(), getGetterMethod());
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof PipelineOptionSpec)) {
+      return false;
+    }
+
+    PipelineOptionSpec that = (PipelineOptionSpec) obj;
+    return Objects.equal(this.getDefiningInterface(), that.getDefiningInterface())
+        && Objects.equal(this.getName(), that.getName())
+        && Objects.equal(this.getGetterMethod(), that.getGetterMethod());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c16bdd6/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index 17cf5b3..a2f38ed 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.Context;
-
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import com.google.auto.service.AutoService;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -194,7 +194,7 @@ import javax.annotation.concurrent.ThreadSafe;
 @JsonSerialize(using = Serializer.class)
 @JsonDeserialize(using = Deserializer.class)
 @ThreadSafe
-public interface PipelineOptions {
+public interface PipelineOptions extends HasDisplayData {
   /**
    * Transforms this object into an object of type {@code <T>} saving each property
    * that has been manipulated. {@code <T>} must extend {@link PipelineOptions}.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c16bdd6/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
index 87ac05e..5fc7312 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
@@ -22,9 +22,10 @@ import static com.google.common.base.Preconditions.checkArgument;
 import org.apache.beam.sdk.options.Validation.Required;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.StringUtils;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
-
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
@@ -32,7 +33,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableListMultimap;
@@ -43,8 +43,11 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.RowSortedTable;
 import com.google.common.collect.Sets;
 import com.google.common.collect.SortedSetMultimap;
+import com.google.common.collect.TreeBasedTable;
 import com.google.common.collect.TreeMultimap;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -77,6 +80,7 @@ import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 
 import javax.annotation.Nullable;
@@ -444,6 +448,7 @@ public class PipelineOptionsFactory {
   @SuppressWarnings("rawtypes")
   private static final Class<?>[] EMPTY_CLASS_ARRAY = new Class[0];
   private static final ObjectMapper MAPPER = new ObjectMapper();
+  private static final ClassLoader CLASS_LOADER;
   private static final Map<String, Class<? extends PipelineRunner<?>>> SUPPORTED_PIPELINE_RUNNERS;
 
   /** Classes that are used as the boundary in the stack trace to find the callers class name. */
@@ -510,7 +515,7 @@ public class PipelineOptionsFactory {
       throw new ExceptionInInitializerError(e);
     }
 
-    ClassLoader classLoader = findClassLoader();
+    CLASS_LOADER = findClassLoader();
 
     // Store the list of all available pipeline runners.
     ImmutableMap.Builder<String, Class<? extends PipelineRunner<?>>> builder =
@@ -518,25 +523,14 @@ public class PipelineOptionsFactory {
     Set<PipelineRunnerRegistrar> pipelineRunnerRegistrars =
         Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
     pipelineRunnerRegistrars.addAll(
-        Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class, classLoader)));
+        Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class, CLASS_LOADER)));
     for (PipelineRunnerRegistrar registrar : pipelineRunnerRegistrars) {
       for (Class<? extends PipelineRunner<?>> klass : registrar.getPipelineRunners()) {
         builder.put(klass.getSimpleName(), klass);
       }
     }
     SUPPORTED_PIPELINE_RUNNERS = builder.build();
-
-    // Load and register the list of all classes that extend PipelineOptions.
-    register(PipelineOptions.class);
-    Set<PipelineOptionsRegistrar> pipelineOptionsRegistrars =
-        Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
-    pipelineOptionsRegistrars.addAll(
-        Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class, classLoader)));
-    for (PipelineOptionsRegistrar registrar : pipelineOptionsRegistrars) {
-      for (Class<? extends PipelineOptions> klass : registrar.getPipelineOptions()) {
-        register(klass);
-      }
-    }
+    initializeRegistry();
   }
 
   /**
@@ -566,6 +560,33 @@ public class PipelineOptionsFactory {
   }
 
   /**
+   * Resets the set of interfaces registered with this factory to the default state.
+   *
+   * @see PipelineOptionsFactory#register(Class)
+   */
+  @VisibleForTesting
+  static synchronized void resetRegistry() {
+    REGISTERED_OPTIONS.clear();
+    initializeRegistry();
+  }
+
+  /**
+   *  Load and register the list of all classes that extend PipelineOptions.
+   */
+  private static void initializeRegistry() {
+    register(PipelineOptions.class);
+    Set<PipelineOptionsRegistrar> pipelineOptionsRegistrars =
+        Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
+    pipelineOptionsRegistrars.addAll(
+        Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class, CLASS_LOADER)));
+    for (PipelineOptionsRegistrar registrar : pipelineOptionsRegistrars) {
+      for (Class<? extends PipelineOptions> klass : registrar.getPipelineOptions()) {
+        register(klass);
+      }
+    }
+  }
+
+  /**
    * Validates that the interface conforms to the following:
    * <ul>
    *   <li>Any property with the same name must have the same return type for all derived
@@ -674,32 +695,20 @@ public class PipelineOptionsFactory {
     Preconditions.checkNotNull(iface);
     validateWellFormed(iface, REGISTERED_OPTIONS);
 
-    Iterable<Method> methods =
-        Iterables.filter(
-            ReflectHelpers.getClosureOfMethodsOnInterface(iface), NOT_SYNTHETIC_PREDICATE);
-    ListMultimap<Class<?>, Method> ifaceToMethods = ArrayListMultimap.create();
-    for (Method method : methods) {
-      // Process only methods that are not marked as hidden.
-      if (method.getAnnotation(Hidden.class) == null) {
-        ifaceToMethods.put(method.getDeclaringClass(), method);
-      }
+    Set<PipelineOptionSpec> properties =
+        PipelineOptionsReflector.getOptionSpecs(iface);
+
+    RowSortedTable<Class<?>, String, Method> ifacePropGetterTable = TreeBasedTable.create(
+        ClassNameComparator.INSTANCE, Ordering.natural());
+    for (PipelineOptionSpec prop : properties) {
+      ifacePropGetterTable.put(prop.getDefiningInterface(), prop.getName(), prop.getGetterMethod());
     }
-    SortedSet<Class<?>> ifaces = new TreeSet<>(ClassNameComparator.INSTANCE);
-    // Keep interfaces that are not marked as hidden.
-    ifaces.addAll(Collections2.filter(ifaceToMethods.keySet(), new Predicate<Class<?>>() {
-      @Override
-      public boolean apply(Class<?> input) {
-        return input.getAnnotation(Hidden.class) == null;
-      }
-    }));
-    for (Class<?> currentIface : ifaces) {
-      Map<String, Method> propertyNamesToGetters =
-          getPropertyNamesToGetters(ifaceToMethods.get(currentIface));
 
-      // Don't output anything if there are no defined options
-      if (propertyNamesToGetters.isEmpty()) {
-        continue;
-      }
+    for (Map.Entry<Class<?>, Map<String, Method>> ifaceToPropertyMap :
+        ifacePropGetterTable.rowMap().entrySet()) {
+      Class<?> currentIface = ifaceToPropertyMap.getKey();
+      Map<String, Method> propertyNamesToGetters = ifaceToPropertyMap.getValue();
+
       SortedSetMultimap<String, String> requiredGroupNameToProperties =
           getRequiredGroupNamesToProperties(propertyNamesToGetters);
 
@@ -838,15 +847,21 @@ public class PipelineOptionsFactory {
    * <p>TODO: Swap back to using Introspector once the proxy class issue with AppEngine is
    * resolved.
    */
-  private static List<PropertyDescriptor> getPropertyDescriptors(Class<?> beanClass)
+  private static List<PropertyDescriptor> getPropertyDescriptors(
+      Class<? extends PipelineOptions> beanClass)
       throws IntrospectionException {
     // The sorting is important to make this method stable.
     SortedSet<Method> methods = Sets.newTreeSet(MethodComparator.INSTANCE);
     methods.addAll(
         Collections2.filter(Arrays.asList(beanClass.getMethods()), NOT_SYNTHETIC_PREDICATE));
-    SortedMap<String, Method> propertyNamesToGetters = getPropertyNamesToGetters(methods);
-    List<PropertyDescriptor> descriptors = Lists.newArrayList();
 
+    SortedMap<String, Method> propertyNamesToGetters = new TreeMap<>();
+    for (Map.Entry<String, Method> entry :
+        PipelineOptionsReflector.getPropertyNamesToGetters(methods).entries()) {
+      propertyNamesToGetters.put(entry.getKey(), entry.getValue());
+    }
+
+    List<PropertyDescriptor> descriptors = Lists.newArrayList();
     List<TypeMismatch> mismatches = new ArrayList<>();
     /*
      * Add all the getter/setter pairs to the list of descriptors removing the getter once
@@ -919,28 +934,6 @@ public class PipelineOptionsFactory {
   }
 
   /**
-   * Returns a map of the property name to the getter method it represents.
-   * If there are duplicate methods with the same bean name, then it is indeterminate
-   * as to which method will be returned.
-   */
-  private static SortedMap<String, Method> getPropertyNamesToGetters(Iterable<Method> methods) {
-    SortedMap<String, Method> propertyNamesToGetters = Maps.newTreeMap();
-    for (Method method : methods) {
-      String methodName = method.getName();
-      if ((!methodName.startsWith("get")
-          && !methodName.startsWith("is"))
-          || method.getParameterTypes().length != 0
-          || method.getReturnType() == void.class) {
-        continue;
-      }
-      String propertyName = Introspector.decapitalize(
-          methodName.startsWith("is") ? methodName.substring(2) : methodName.substring(3));
-      propertyNamesToGetters.put(propertyName, method);
-    }
-    return propertyNamesToGetters;
-  }
-
-  /**
    * Returns a map of required groups of arguments to the properties that satisfy the requirement.
    */
   private static SortedSetMultimap<String, String> getRequiredGroupNamesToProperties(
@@ -981,21 +974,22 @@ public class PipelineOptionsFactory {
    */
   private static List<PropertyDescriptor> validateClass(Class<? extends PipelineOptions> iface,
       Set<Class<? extends PipelineOptions>> validatedPipelineOptionsInterfaces,
-      Class<?> klass) throws IntrospectionException {
+      Class<? extends PipelineOptions> klass) throws IntrospectionException {
     Set<Method> methods = Sets.newHashSet(IGNORED_METHODS);
-    // Ignore static methods, "equals", "hashCode", "toString" and "as" on the generated class.
     // Ignore synthetic methods
     for (Method method : klass.getMethods()) {
       if (Modifier.isStatic(method.getModifiers()) || method.isSynthetic()) {
         methods.add(method);
       }
     }
+    // Ignore standard infrastructure methods on the generated class.
     try {
       methods.add(klass.getMethod("equals", Object.class));
       methods.add(klass.getMethod("hashCode"));
       methods.add(klass.getMethod("toString"));
       methods.add(klass.getMethod("as", Class.class));
       methods.add(klass.getMethod("cloneAs", Class.class));
+      methods.add(klass.getMethod("populateDisplayData", DisplayData.Builder.class));
     } catch (NoSuchMethodException | SecurityException e) {
       throw Throwables.propagate(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c16bdd6/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java
new file mode 100644
index 0000000..815de82
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+
+import java.beans.Introspector;
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Utilities to reflect over {@link PipelineOptions}.
+ */
+class PipelineOptionsReflector {
+  private PipelineOptionsReflector() {}
+
+  /**
+   * Retrieve metadata for the full set of pipeline options visible within the type hierarchy
+   * of a single {@link PipelineOptions} interface.
+   *
+   * @see PipelineOptionsReflector#getOptionSpecs(Iterable)
+   */
+  static Set<PipelineOptionSpec> getOptionSpecs(Class<? extends PipelineOptions> optionsInterface) {
+    Iterable<Method> methods = ReflectHelpers.getClosureOfMethodsOnInterface(optionsInterface);
+    Multimap<String, Method> propsToGetters = getPropertyNamesToGetters(methods);
+
+    ImmutableSet.Builder<PipelineOptionSpec> setBuilder = ImmutableSet.builder();
+    for (Map.Entry<String, Method> propAndGetter : propsToGetters.entries()) {
+      String prop = propAndGetter.getKey();
+      Method getter = propAndGetter.getValue();
+
+      @SuppressWarnings("unchecked")
+      Class<? extends PipelineOptions> declaringClass =
+          (Class<? extends PipelineOptions>) getter.getDeclaringClass();
+
+      if (!PipelineOptions.class.isAssignableFrom(declaringClass)) {
+        continue;
+      }
+
+      if (declaringClass.isAnnotationPresent(Hidden.class)) {
+        continue;
+      }
+
+      setBuilder.add(PipelineOptionSpec.of(declaringClass, prop, getter));
+    }
+
+    return setBuilder.build();
+  }
+
+  /**
+   * Retrieve metadata for the full set of pipeline options visible within the type hierarchy
+   * closure of the set of input interfaces. An option is "visible" if:
+   *
+   * <ul>
+   *   <li>The option is defined within the interface hierarchy closure of the input
+   *   {@link PipelineOptions}.</li>
+   *   <li>The defining interface is not marked {@link Hidden}.</li>
+   * </ul>
+   */
+  static Set<PipelineOptionSpec> getOptionSpecs(
+      Iterable<Class<? extends PipelineOptions>> optionsInterfaces) {
+    ImmutableSet.Builder<PipelineOptionSpec> setBuilder = ImmutableSet.builder();
+    for (Class<? extends PipelineOptions> optionsInterface : optionsInterfaces) {
+      setBuilder.addAll(getOptionSpecs(optionsInterface));
+    }
+
+    return setBuilder.build();
+  }
+
+  /**
+   * Extract pipeline options and their respective getter methods from a series of
+   * {@link Method methods}. A single pipeline option may appear in many methods.
+   *
+   * @return A mapping of option name to the input methods which declare it.
+   */
+  static Multimap<String, Method> getPropertyNamesToGetters(Iterable<Method> methods) {
+    Multimap<String, Method> propertyNamesToGetters = HashMultimap.create();
+    for (Method method : methods) {
+      String methodName = method.getName();
+      if ((!methodName.startsWith("get")
+          && !methodName.startsWith("is"))
+          || method.getParameterTypes().length != 0
+          || method.getReturnType() == void.class) {
+        continue;
+      }
+      String propertyName = Introspector.decapitalize(
+          methodName.startsWith("is") ? methodName.substring(2) : methodName.substring(3));
+      propertyNamesToGetters.put(propertyName, method);
+    }
+    return propertyNamesToGetters;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c16bdd6/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
index e281625..a269f4e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
@@ -19,16 +19,22 @@ package org.apache.beam.sdk.options;
 
 import org.apache.beam.sdk.options.PipelineOptionsFactory.JsonIgnorePredicate;
 import org.apache.beam.sdk.options.PipelineOptionsFactory.Registration;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.base.Defaults;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ClassToInstanceMap;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.MutableClassToInstanceMap;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -52,6 +58,8 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Type;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -59,7 +67,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 
 /**
@@ -85,16 +93,26 @@ class ProxyInvocationHandler implements InvocationHandler {
   private final int hashCode = (int) (Math.random() * Integer.MAX_VALUE);
   private final Set<Class<? extends PipelineOptions>> knownInterfaces;
   private final ClassToInstanceMap<PipelineOptions> interfaceToProxyCache;
-  private final Map<String, Object> options;
+  private final Map<String, BoundValue> options;
   private final Map<String, JsonNode> jsonOptions;
   private final Map<String, String> gettersToPropertyNames;
   private final Map<String, String> settersToPropertyNames;
 
   ProxyInvocationHandler(Map<String, Object> options) {
-    this(options, Maps.<String, JsonNode>newHashMap());
+    this(bindOptions(options), Maps.<String, JsonNode>newHashMap());
+  }
+
+  private static Map<String, BoundValue> bindOptions(Map<String, Object> inputOptions) {
+    HashMap<String, BoundValue> options = Maps.newHashMap();
+    for (Map.Entry<String, Object> entry : inputOptions.entrySet()) {
+      options.put(entry.getKey(), BoundValue.fromExplicitOption(entry.getValue()));
+    }
+
+    return options;
   }
 
-  private ProxyInvocationHandler(Map<String, Object> options, Map<String, JsonNode> jsonOptions) {
+  private ProxyInvocationHandler(
+      Map<String, BoundValue> options, Map<String, JsonNode> jsonOptions) {
     this.options = options;
     this.jsonOptions = jsonOptions;
     this.knownInterfaces = new HashSet<>(PipelineOptionsFactory.getRegisteredOptions());
@@ -119,21 +137,27 @@ class ProxyInvocationHandler implements InvocationHandler {
       @SuppressWarnings("unchecked")
       Class<? extends PipelineOptions> clazz = (Class<? extends PipelineOptions>) args[0];
       return cloneAs(proxy, clazz);
+    } else if (args != null && "populateDisplayData".equals(method.getName())
+        && args[0] instanceof DisplayData.Builder) {
+      @SuppressWarnings("unchecked")
+      DisplayData.Builder builder = (DisplayData.Builder) args[0];
+      populateDisplayData(builder);
+      return Void.TYPE;
     }
     String methodName = method.getName();
     synchronized (this) {
-      if (gettersToPropertyNames.keySet().contains(methodName)) {
+      if (gettersToPropertyNames.containsKey(methodName)) {
         String propertyName = gettersToPropertyNames.get(methodName);
         if (!options.containsKey(propertyName)) {
           // Lazy bind the default to the method.
           Object value = jsonOptions.containsKey(propertyName)
               ? getValueFromJson(propertyName, method)
               : getDefault((PipelineOptions) proxy, method);
-          options.put(propertyName, value);
+          options.put(propertyName, BoundValue.fromDefault(value));
         }
-        return options.get(propertyName);
+        return options.get(propertyName).getValue();
       } else if (settersToPropertyNames.containsKey(methodName)) {
-        options.put(settersToPropertyNames.get(methodName), args[0]);
+        options.put(settersToPropertyNames.get(methodName), BoundValue.fromExplicitOption(args[0]));
         return Void.TYPE;
       }
     }
@@ -142,6 +166,35 @@ class ProxyInvocationHandler implements InvocationHandler {
   }
 
   /**
+   * Track whether options values are explicitly set, or retrieved from defaults.
+   */
+  @AutoValue
+  abstract static class BoundValue {
+    @Nullable
+    abstract Object getValue();
+
+    abstract boolean isDefault();
+
+    private static BoundValue of(@Nullable Object value, boolean isDefault) {
+      return new AutoValue_ProxyInvocationHandler_BoundValue(value, isDefault);
+    }
+
+    /**
+     * Create a {@link BoundValue} representing an explicitly set option.
+     */
+    static BoundValue fromExplicitOption(@Nullable Object value) {
+      return BoundValue.of(value, false);
+    }
+
+    /**
+     * Create a {@link BoundValue} representing a default option value.
+     */
+    static BoundValue fromDefault(@Nullable Object value) {
+      return BoundValue.of(value, true);
+    }
+  }
+
+  /**
    * Backing implementation for {@link PipelineOptions#as(Class)}.
    *
    * @param iface The interface that the returned object needs to implement.
@@ -210,6 +263,128 @@ class ProxyInvocationHandler implements InvocationHandler {
   }
 
   /**
+   * Populate display data. See {@link HasDisplayData#populateDisplayData}. All explicitly set
+   * pipeline options will be added as display data.
+   */
+  private void populateDisplayData(DisplayData.Builder builder) {
+    Set<PipelineOptionSpec> optionSpecs = PipelineOptionsReflector.getOptionSpecs(knownInterfaces);
+    Multimap<String, PipelineOptionSpec> optionsMap = buildOptionNameToSpecMap(optionSpecs);
+
+    for (Map.Entry<String, BoundValue> option : options.entrySet()) {
+      BoundValue boundValue = option.getValue();
+      if (boundValue.isDefault()) {
+        continue;
+      }
+
+      Object value = boundValue.getValue() == null ? "" : boundValue.getValue();
+      DisplayData.Type type = DisplayData.inferType(value);
+      HashSet<PipelineOptionSpec> specs = new HashSet<>(optionsMap.get(option.getKey()));
+
+      for (PipelineOptionSpec optionSpec : specs) {
+        Class<?> pipelineInterface = optionSpec.getDefiningInterface();
+        if (type != null) {
+          builder.add(option.getKey(), type, value)
+              .withNamespace(pipelineInterface);
+        } else {
+          builder.add(option.getKey(), value.toString())
+              .withNamespace(pipelineInterface);
+        }
+      }
+    }
+
+    for (Map.Entry<String, JsonNode> jsonOption : jsonOptions.entrySet()) {
+      if (options.containsKey(jsonOption.getKey())) {
+        // Option overwritten since deserialization; don't re-write
+        continue;
+      }
+
+      HashSet<PipelineOptionSpec> specs = new HashSet<>(optionsMap.get(jsonOption.getKey()));
+      if (specs.isEmpty()) {
+        builder.add(jsonOption.getKey(), jsonOption.getValue().toString())
+          .withNamespace(UnknownPipelineOptions.class);
+      } else {
+        for (PipelineOptionSpec spec : specs) {
+          Object value = getValueFromJson(jsonOption.getKey(), spec.getGetterMethod());
+          DisplayData.Type type = DisplayData.inferType(value);
+          if (type != null) {
+            builder.add(jsonOption.getKey(), type, value)
+                .withNamespace(spec.getDefiningInterface());
+          } else {
+            builder.add(jsonOption.getKey(), value.toString())
+                .withNamespace(spec.getDefiningInterface());
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Marker interface used when the original {@link PipelineOptions} interface is not known at
+   * runtime. This can occur if {@link PipelineOptions} are deserialized from JSON.
+   *
+   * <p>Pipeline authors can ensure {@link PipelineOptions} type information is available at
+   * runtime by registering their {@link PipelineOptions options} interfaces. See the "Registration"
+   * section of {@link PipelineOptions} documentation.
+   */
+  interface UnknownPipelineOptions extends PipelineOptions {}
+
+  /**
+   * Construct a mapping from an option name to its {@link PipelineOptions} interface(s)
+   * declarations. An option may be declared in multiple interfaces. If it is overridden in a
+   * type hierarchy, only the overriding interface will be included.
+   */
+  private Multimap<String, PipelineOptionSpec> buildOptionNameToSpecMap(
+      Set<PipelineOptionSpec> props) {
+
+    Multimap<String, PipelineOptionSpec> optionsMap = HashMultimap.create();
+    for (PipelineOptionSpec prop : props) {
+      optionsMap.put(prop.getName(), prop);
+    }
+
+    // Filter out overridden options
+    for (Map.Entry<String, Collection<PipelineOptionSpec>> entry : optionsMap.asMap().entrySet()) {
+
+      /* Compare all interfaces for an option pairwise (iface1, iface2) to look for type
+       hierarchies. If one is the base-class of the other, remove it from the output and continue
+       iterating.
+
+       This is an N^2 operation per-option, but the number of interfaces defining an option
+       should always be small (usually 1). */
+      List<PipelineOptionSpec> specs = Lists.newArrayList(entry.getValue());
+      if (specs.size() < 2) {
+        // Only one known implementing interface, no need to check for inheritance
+        continue;
+      }
+
+      for (int i = 0; i < specs.size() - 1; i++) {
+        Class<?> iface1 = specs.get(i).getDefiningInterface();
+        for (int j = i + 1; j < specs.size(); j++) {
+          Class<?> iface2 = specs.get(j).getDefiningInterface();
+
+          if (iface1.isAssignableFrom(iface2)) {
+            optionsMap.remove(entry.getKey(), specs.get(i));
+            specs.remove(i);
+
+            // Removed element at current "i" index. Set iterators to re-evaluate
+            // new "i" element in outer loop.
+            i--;
+            j = specs.size();
+          } else  if (iface2.isAssignableFrom(iface1)) {
+            optionsMap.remove(entry.getKey(), specs.get(j));
+            specs.remove(j);
+
+            // Removed element at current "j" index. Set iterator to re-evaluate
+            // new "j" element in inner-loop.
+            j--;
+          }
+        }
+      }
+    }
+
+    return optionsMap;
+  }
+
+  /**
    * This will output all the currently set values. This is a relatively costly function
    * as it will call {@code toString()} on each object that has been set and format
    * the results in a readable format.
@@ -222,7 +397,9 @@ class ProxyInvocationHandler implements InvocationHandler {
     // Add the options that we received from deserialization
     sortedOptions.putAll(jsonOptions);
     // Override with any programmatically set options.
-    sortedOptions.putAll(options);
+    for (Map.Entry<String, BoundValue> entry : options.entrySet()) {
+      sortedOptions.put(entry.getKey(), entry.getValue().getValue());
+    }
 
     StringBuilder b = new StringBuilder();
     b.append("Current Settings:\n");
@@ -347,7 +524,7 @@ class ProxyInvocationHandler implements InvocationHandler {
         // We first filter out any properties that have been modified since
         // the last serialization of this PipelineOptions and then verify that
         // they are all serializable.
-        Map<String, Object> filteredOptions = Maps.newHashMap(handler.options);
+        Map<String, BoundValue> filteredOptions = Maps.newHashMap(handler.options);
         removeIgnoredOptions(handler.knownInterfaces, filteredOptions);
         ensureSerializable(handler.knownInterfaces, filteredOptions);
 
@@ -356,10 +533,23 @@ class ProxyInvocationHandler implements InvocationHandler {
         // instances that have been modified since the previous serialization.
         Map<String, Object> serializableOptions =
             Maps.<String, Object>newHashMap(handler.jsonOptions);
-        serializableOptions.putAll(filteredOptions);
+        for (Map.Entry<String, BoundValue> entry : filteredOptions.entrySet()) {
+          serializableOptions.put(entry.getKey(), entry.getValue().getValue());
+        }
+
         jgen.writeStartObject();
         jgen.writeFieldName("options");
         jgen.writeObject(serializableOptions);
+
+        List<Map<String, Object>> serializedDisplayData = Lists.newArrayList();
+        for (DisplayData.Item item : DisplayData.from(value).items()) {
+          @SuppressWarnings("unchecked")
+          Map<String, Object> serializedItem = MAPPER.convertValue(item, Map.class);
+          serializedDisplayData.add(serializedItem);
+        }
+
+        jgen.writeFieldName("display_data");
+        jgen.writeObject(serializedDisplayData);
         jgen.writeEndObject();
       }
     }
@@ -369,7 +559,7 @@ class ProxyInvocationHandler implements InvocationHandler {
      * {@link JsonIgnore @JsonIgnore} from the passed in options using the passed in interfaces.
      */
     private void removeIgnoredOptions(
-        Set<Class<? extends PipelineOptions>> interfaces, Map<String, Object> options) {
+        Set<Class<? extends PipelineOptions>> interfaces, Map<String, ?> options) {
       // Find all the method names that are annotated with JSON ignore.
       Set<String> jsonIgnoreMethodNames = FluentIterable.from(
           ReflectHelpers.getClosureOfMethodsOnInterfaces(interfaces))
@@ -394,7 +584,7 @@ class ProxyInvocationHandler implements InvocationHandler {
      * and deserializable.
      */
     private void ensureSerializable(Set<Class<? extends PipelineOptions>> interfaces,
-        Map<String, Object> options) throws IOException {
+        Map<String, BoundValue> options) throws IOException {
       // Construct a map from property name to the return type of the getter.
       Map<String, Type> propertyToReturnType = Maps.newHashMap();
       for (PropertyDescriptor descriptor
@@ -406,16 +596,16 @@ class ProxyInvocationHandler implements InvocationHandler {
       }
 
       // Attempt to serialize and deserialize each property.
-      for (Map.Entry<String, Object> entry : options.entrySet()) {
+      for (Map.Entry<String, BoundValue> entry : options.entrySet()) {
         try {
-          String serializedValue = MAPPER.writeValueAsString(entry.getValue());
+          String serializedValue = MAPPER.writeValueAsString(entry.getValue().getValue());
           JavaType type = MAPPER.getTypeFactory()
               .constructType(propertyToReturnType.get(entry.getKey()));
           MAPPER.readValue(serializedValue, type);
         } catch (Exception e) {
           throw new IOException(String.format(
               "Failed to serialize and deserialize property '%s' with value '%s'",
-              entry.getKey(), entry.getValue()), e);
+              entry.getKey(), entry.getValue().getValue()), e);
         }
       }
     }
@@ -435,7 +625,7 @@ class ProxyInvocationHandler implements InvocationHandler {
         fields.put(field.getKey(), field.getValue());
       }
       PipelineOptions options =
-          new ProxyInvocationHandler(Maps.<String, Object>newHashMap(), fields)
+          new ProxyInvocationHandler(Maps.<String, BoundValue>newHashMap(), fields)
               .as(PipelineOptions.class);
       return options;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c16bdd6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index 6065dc4..a1037a8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -112,6 +112,21 @@ public class DisplayData {
   }
 
   @Override
+  public int hashCode() {
+    return entries.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof DisplayData) {
+      DisplayData that = (DisplayData) obj;
+      return Objects.equals(this.entries, that.entries);
+    }
+
+    return false;
+  }
+
+  @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();
     boolean isFirstLine = true;
@@ -844,7 +859,6 @@ public class DisplayData {
 
     @Override
     public ItemBuilder add(String key, Instant value) {
-      checkNotNull(value);
       return addItemIf(true, key, Type.TIMESTAMP, value);
     }
 
@@ -861,7 +875,6 @@ public class DisplayData {
 
     @Override
     public ItemBuilder add(String key, Duration value) {
-      checkNotNull(value);
       return addItemIf(true, key, Type.DURATION, value);
     }
 
@@ -878,7 +891,6 @@ public class DisplayData {
 
     @Override
     public ItemBuilder add(String key, Class<?> value) {
-      checkNotNull(value);
       return addItemIf(true, key, Type.JAVA_CLASS, value);
     }
 
@@ -912,17 +924,17 @@ public class DisplayData {
 
     @Override
     public ItemBuilder add(String key, Type type, Object value) {
-      checkNotNull(value);
       checkNotNull(type);
       return addItemIf(true, key, type, value);
     }
 
     private ItemBuilder addItemIf(boolean condition, String key, Type type, Object value) {
-      checkNotNull(key);
-      checkArgument(!key.isEmpty());
-
+      checkNotNull(key, "Display data keys cannot be null or empty.");
+      checkArgument(!key.isEmpty(), "Display data keys cannot be null or empty.");
       commitLatest();
+
       if (condition) {
+        checkNotNull(value, "Display data values cannot be null. Key: [%s]", key);
         latestItem = Item.create(latestNs, key, type, value);
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c16bdd6/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsReflectorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsReflectorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsReflectorTest.java
new file mode 100644
index 0000000..82f0329
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsReflectorTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.options;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isOneOf;
+import static org.hamcrest.Matchers.not;
+
+import com.google.common.collect.ImmutableSet;
+import org.hamcrest.FeatureMatcher;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Set;
+
+/**
+ * Unit tests for {@link PipelineOptionsReflector}.
+ */
+@RunWith(JUnit4.class)
+public class PipelineOptionsReflectorTest {
+  @Test
+  public void testGetOptionSpecs() throws NoSuchMethodException {
+    Set<PipelineOptionSpec> properties =
+        PipelineOptionsReflector.getOptionSpecs(SimpleOptions.class);
+
+    assertThat(properties, Matchers.hasItems(PipelineOptionSpec.of(
+        SimpleOptions.class, "foo", SimpleOptions.class.getDeclaredMethod("getFoo"))));
+  }
+
+  interface SimpleOptions extends PipelineOptions {
+    String getFoo();
+    void setFoo(String value);
+  }
+
+  @Test
+  public void testFiltersNonGetterMethods() {
+    Set<PipelineOptionSpec> properties =
+        PipelineOptionsReflector.getOptionSpecs(OnlyTwoValidGetters.class);
+
+    assertThat(properties, not(hasItem(hasName(isOneOf("misspelled", "hasParameter", "prefix")))));
+  }
+
+  interface OnlyTwoValidGetters extends PipelineOptions {
+    String getFoo();
+    void setFoo(String value);
+
+    boolean isBar();
+    void setBar(boolean value);
+
+    String gtMisspelled();
+    void setMisspelled(String value);
+
+    String getHasParameter(String value);
+    void setHasParameter(String value);
+
+    String noPrefix();
+    void setNoPrefix(String value);
+  }
+
+  @Test
+  public void testBaseClassOptions() {
+    Set<PipelineOptionSpec> props =
+        PipelineOptionsReflector.getOptionSpecs(ExtendsSimpleOptions.class);
+
+    assertThat(props, Matchers.hasItem(
+        allOf(hasName("foo"), hasClass(SimpleOptions.class))));
+    assertThat(props, Matchers.hasItem(
+        allOf(hasName("foo"), hasClass(ExtendsSimpleOptions.class))));
+    assertThat(props, Matchers.hasItem(
+        allOf(hasName("bar"), hasClass(ExtendsSimpleOptions.class))));
+  }
+
+  interface ExtendsSimpleOptions extends SimpleOptions {
+    @Override String getFoo();
+    @Override void setFoo(String value);
+
+    String getBar();
+    void setBar(String value);
+  }
+
+  @Test
+  public void testExcludesNonPipelineOptionsMethods() {
+    Set<PipelineOptionSpec> properties =
+        PipelineOptionsReflector.getOptionSpecs(ExtendsNonPipelineOptions.class);
+
+    assertThat(properties, not(hasItem(hasName("foo"))));
+  }
+
+  interface NoExtendsClause {
+    String getFoo();
+    void setFoo(String value);
+  }
+
+  interface ExtendsNonPipelineOptions extends NoExtendsClause, PipelineOptions {}
+
+  @Test
+  public void testExcludesHiddenInterfaces() {
+    Set<PipelineOptionSpec> properties =
+        PipelineOptionsReflector.getOptionSpecs(HiddenOptions.class);
+
+    assertThat(properties, not(hasItem(hasName("foo"))));
+  }
+
+  @Hidden
+  interface HiddenOptions extends PipelineOptions {
+    String getFoo();
+    void setFoo(String value);
+  }
+
+  @Test
+  public void testMultipleInputInterfaces() {
+    Set<Class<? extends PipelineOptions>> interfaces =
+        ImmutableSet.<Class<? extends PipelineOptions>>of(
+          BaseOptions.class,
+          ExtendOptions1.class,
+          ExtendOptions2.class);
+
+    Set<PipelineOptionSpec> props = PipelineOptionsReflector.getOptionSpecs(interfaces);
+
+    assertThat(props, Matchers.hasItem(allOf(hasName("baseOption"), hasClass(BaseOptions.class))));
+    assertThat(props, Matchers.hasItem(
+        allOf(hasName("extendOption1"), hasClass(ExtendOptions1.class))));
+    assertThat(props, Matchers.hasItem(
+        allOf(hasName("extendOption2"), hasClass(ExtendOptions2.class))));
+  }
+
+  interface BaseOptions extends PipelineOptions {
+    String getBaseOption();
+    void setBaseOption(String value);
+  }
+
+  interface ExtendOptions1 extends BaseOptions {
+    String getExtendOption1();
+    void setExtendOption1(String value);
+  }
+
+  interface ExtendOptions2 extends BaseOptions {
+    String getExtendOption2();
+    void setExtendOption2(String value);
+  }
+
+  private static Matcher<PipelineOptionSpec> hasName(String name) {
+    return hasName(is(name));
+  }
+
+  private static Matcher<PipelineOptionSpec> hasName(Matcher<String> matcher) {
+    return new FeatureMatcher<PipelineOptionSpec, String>(matcher, "name", "name") {
+      @Override
+      protected String featureValueOf(PipelineOptionSpec actual) {
+        return actual.getName();
+      }
+    };
+  }
+
+  private static Matcher<PipelineOptionSpec> hasClass(Class<?> clazz) {
+    return new FeatureMatcher<PipelineOptionSpec, Class<?>>(
+        Matchers.<Class<?>>is(clazz), "defining class", "class") {
+      @Override
+      protected Class<?> featureValueOf(PipelineOptionSpec actual) {
+        return actual.getDefiningInterface();
+      }
+    };
+  }
+
+  private static Matcher<PipelineOptionSpec> hasGetter(String methodName) {
+    return new FeatureMatcher<PipelineOptionSpec, String>(
+        is(methodName), "getter method", "name") {
+      @Override
+      protected String featureValueOf(PipelineOptionSpec actual) {
+        return actual.getGetterMethod().getName();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c16bdd6/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
index 7f0fa14..b53000d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
@@ -17,12 +17,23 @@
  */
 package org.apache.beam.sdk.options;
 
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -33,12 +44,19 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TestRule;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -48,6 +66,14 @@ import java.util.Set;
 @RunWith(JUnit4.class)
 public class ProxyInvocationHandlerTest {
   @Rule public ExpectedException expectedException = ExpectedException.none();
+  @Rule public TestRule resetPipelineOptionsRegistry = new ExternalResource() {
+    @Override
+    protected void before() {
+      PipelineOptionsFactory.resetRegistry();
+    }
+  };
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
 
   /** A test interface with some primitives and objects. */
   public static interface Simple extends PipelineOptions {
@@ -433,6 +459,19 @@ public class ProxyInvocationHandlerTest {
   }
 
   @Test
+  public void testResetRegistry() {
+    Set<Class<? extends PipelineOptions>> defaultRegistry =
+        new HashSet<>(PipelineOptionsFactory.getRegisteredOptions());
+    assertThat(defaultRegistry, not(hasItem(FooOptions.class)));
+
+    PipelineOptionsFactory.register(FooOptions.class);
+    assertThat(PipelineOptionsFactory.getRegisteredOptions(), hasItem(FooOptions.class));
+
+    PipelineOptionsFactory.resetRegistry();
+    assertEquals(defaultRegistry, PipelineOptionsFactory.getRegisteredOptions());
+  }
+
+  @Test
   public void testJsonConversionForDefault() throws Exception {
     PipelineOptions options = PipelineOptionsFactory.create();
     assertNotNull(serializeDeserialize(PipelineOptions.class, options));
@@ -683,10 +722,233 @@ public class ProxyInvocationHandlerTest {
     assertEquals("TestString", options2.getValue().getValue());
   }
 
+  @Test
+  public void testDisplayDataItemProperties() {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setTempLocation("myTemp");
+    DisplayData displayData = DisplayData.from(options);
+
+    assertThat(displayData, hasDisplayItem(allOf(
+        hasKey("tempLocation"),
+        hasType(DisplayData.Type.STRING),
+        hasValue("myTemp"),
+        hasNamespace(PipelineOptions.class)
+    )));
+  }
+
+  @Test
+  public void testDisplayDataTypes() {
+    Instant now = Instant.now();
+
+    TypedOptions options = PipelineOptionsFactory.as(TypedOptions.class);
+    options.setInteger(1234);
+    options.setTimestamp(now);
+    options.setJavaClass(ProxyInvocationHandlerTest.class);
+    options.setObject(new Serializable() {
+      @Override
+      public String toString() {
+        return "foobar";
+      }
+    });
+
+    DisplayData displayData = DisplayData.from(options);
+
+    assertThat(displayData, hasDisplayItem("integer", 1234));
+    assertThat(displayData, hasDisplayItem("timestamp", now));
+    assertThat(displayData, hasDisplayItem("javaClass", ProxyInvocationHandlerTest.class));
+    assertThat(displayData, hasDisplayItem("object", "foobar"));
+  }
+
+  interface TypedOptions extends PipelineOptions {
+    int getInteger();
+    void setInteger(int value);
+
+    Instant getTimestamp();
+    void setTimestamp(Instant value);
+
+    Class<?> getJavaClass();
+    void setJavaClass(Class<?> value);
+
+    Object getObject();
+    void setObject(Object value);
+  }
+
+  @Test
+  public void testDisplayDataInheritanceNamespace() {
+    ExtendsBaseOptions options = PipelineOptionsFactory.as(ExtendsBaseOptions.class);
+    options.setFoo("bar");
+
+    DisplayData displayData = DisplayData.from(options);
+
+    assertThat(displayData, hasDisplayItem(allOf(
+        hasKey("foo"),
+        hasValue("bar"),
+        hasNamespace(ExtendsBaseOptions.class)
+    )));
+  }
+
+  interface BaseOptions extends PipelineOptions {
+    String getFoo();
+    void setFoo(String value);
+  }
+
+  interface ExtendsBaseOptions extends BaseOptions {
+    @Override String getFoo();
+    @Override void setFoo(String value);
+  }
+
+  @Test
+  public void testDisplayDataExcludedFromOverriddenBaseClass() {
+    ExtendsBaseOptions options = PipelineOptionsFactory.as(ExtendsBaseOptions.class);
+    options.setFoo("bar");
+
+    DisplayData displayData = DisplayData.from(options);
+    assertThat(displayData, not(hasDisplayItem(hasNamespace(BaseOptions.class))));
+  }
+
+  @Test
+  public void testDisplayDataIncludedForDisjointInterfaceHierarchies() {
+    FooOptions fooOptions = PipelineOptionsFactory.as(FooOptions.class);
+    fooOptions.setFoo("foo");
+
+    BarOptions barOptions = fooOptions.as(BarOptions.class);
+    barOptions.setBar("bar");
+
+    DisplayData data = DisplayData.from(barOptions);
+    assertThat(data, hasDisplayItem(allOf(hasKey("foo"), hasNamespace(FooOptions.class))));
+    assertThat(data, hasDisplayItem(allOf(hasKey("bar"), hasNamespace(BarOptions.class))));
+  }
+
+  interface FooOptions extends PipelineOptions {
+    String getFoo();
+    void setFoo(String value);
+  }
+
+  interface BarOptions extends PipelineOptions {
+    String getBar();
+    void setBar(String value);
+  }
+
+  @Test
+  public void testDisplayDataExcludesDefaultValues() {
+    PipelineOptions options = PipelineOptionsFactory.as(HasDefaults.class);
+    DisplayData data = DisplayData.from(options);
+
+    assertThat(data, not(hasDisplayItem(hasKey("foo"))));
+  }
+
+  interface HasDefaults extends PipelineOptions {
+    @Default.String("bar")
+    String getFoo();
+    void setFoo(String value);
+  }
+
+  @Test
+  public void testDisplayDataExcludesValuesAccessedButNeverSet() {
+    HasDefaults options = PipelineOptionsFactory.as(HasDefaults.class);
+    assertEquals("bar", options.getFoo());
+
+    DisplayData data = DisplayData.from(options);
+    assertThat(data, not(hasDisplayItem(hasKey("foo"))));
+  }
+
+  @Test
+  public void testDisplayDataIncludesExplicitlySetDefaults() {
+    HasDefaults options = PipelineOptionsFactory.as(HasDefaults.class);
+    String defaultValue = options.getFoo();
+    options.setFoo(defaultValue);
+
+    DisplayData data = DisplayData.from(options);
+    assertThat(data, hasDisplayItem("foo", defaultValue));
+  }
+
+  @Test
+  public void testDisplayDataNullValuesConvertedToEmptyString() {
+    FooOptions options = PipelineOptionsFactory.as(FooOptions.class);
+    options.setFoo(null);
+
+    DisplayData data = DisplayData.from(options);
+    assertThat(data, hasDisplayItem("foo", ""));
+  }
+
+  @Test
+  public void testDisplayDataJsonSerialization() throws IOException {
+    FooOptions options = PipelineOptionsFactory.as(FooOptions.class);
+    options.setFoo("bar");
+
+    @SuppressWarnings("unchecked")
+    Map<String, Object> map = MAPPER.readValue(MAPPER.writeValueAsBytes(options), Map.class);
+
+    assertThat("main pipeline options data keyed as 'options'", map, Matchers.hasKey("options"));
+    assertThat("display data keyed as 'display_data'", map, Matchers.hasKey("display_data"));
+
+    Map<?, ?> expectedDisplayItem = ImmutableMap.<String, String>builder()
+        .put("namespace", FooOptions.class.getName())
+        .put("key", "foo")
+        .put("value", "bar")
+        .put("type", "STRING")
+        .build();
+
+    @SuppressWarnings("unchecked")
+    List<Map<?, ?>> deserializedDisplayData = (List<Map<?, ?>>) map.get("display_data");
+    assertThat(deserializedDisplayData, hasItem(expectedDisplayItem));
+  }
+
+  @Test
+  public void testDisplayDataFromDeserializedJson() throws Exception {
+    FooOptions options = PipelineOptionsFactory.as(FooOptions.class);
+    options.setFoo("bar");
+    DisplayData data = DisplayData.from(options);
+    assertThat(data, hasDisplayItem("foo", "bar"));
+
+    FooOptions deserializedOptions = serializeDeserialize(FooOptions.class, options);
+    DisplayData dataAfterDeserialization = DisplayData.from(deserializedOptions);
+    assertEquals(data, dataAfterDeserialization);
+  }
+
+  @Test
+  public void testDisplayDataDeserializationWithRegistration() throws Exception {
+    PipelineOptionsFactory.register(HasClassOptions.class);
+    HasClassOptions options = PipelineOptionsFactory.as(HasClassOptions.class);
+    options.setClassOption(ProxyInvocationHandlerTest.class);
+
+    PipelineOptions deserializedOptions = serializeDeserialize(PipelineOptions.class, options);
+    DisplayData displayData = DisplayData.from(deserializedOptions);
+    assertThat(displayData, hasDisplayItem("classOption", ProxyInvocationHandlerTest.class));
+  }
+
+  @Test
+  public void testDisplayDataMissingPipelineOptionsRegistration() throws Exception {
+    HasClassOptions options = PipelineOptionsFactory.as(HasClassOptions.class);
+    options.setClassOption(ProxyInvocationHandlerTest.class);
+
+    PipelineOptions deserializedOptions = serializeDeserialize(PipelineOptions.class, options);
+    DisplayData displayData = DisplayData.from(deserializedOptions);
+    String expectedJsonValue = MAPPER.writeValueAsString(ProxyInvocationHandlerTest.class);
+    assertThat(displayData, hasDisplayItem("classOption", expectedJsonValue));
+  }
+
+  interface HasClassOptions extends PipelineOptions {
+    Class<?> getClassOption();
+    void setClassOption(Class<?> value);
+  }
+
+  @Test
+  public void testDisplayDataJsonValueSetAfterDeserialization() throws Exception {
+    FooOptions options = PipelineOptionsFactory.as(FooOptions.class);
+    options.setFoo("bar");
+    DisplayData data = DisplayData.from(options);
+    assertThat(data, hasDisplayItem("foo", "bar"));
+
+    FooOptions deserializedOptions = serializeDeserialize(FooOptions.class, options);
+    deserializedOptions.setFoo("baz");
+    DisplayData dataAfterDeserialization = DisplayData.from(deserializedOptions);
+    assertThat(dataAfterDeserialization, hasDisplayItem("foo", "baz"));
+  }
+
   private <T extends PipelineOptions> T serializeDeserialize(Class<T> kls, PipelineOptions options)
       throws Exception {
-    ObjectMapper mapper = new ObjectMapper();
-    String value = mapper.writeValueAsString(options);
-    return mapper.readValue(value, PipelineOptions.class).as(kls);
+    String value = MAPPER.writeValueAsString(options);
+    return MAPPER.readValue(value, PipelineOptions.class).as(kls);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c16bdd6/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
index 106c441..05d0f6f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
@@ -384,6 +384,31 @@ public class DisplayDataTest {
   }
 
   @Test
+  public void testDisplayDataEquality() {
+    HasDisplayData component1 = new HasDisplayData() {
+      @Override
+      public void populateDisplayData(Builder builder) {
+        builder.add("foo", "bar");
+      }
+    };
+    HasDisplayData component2 = new HasDisplayData() {
+      @Override
+      public void populateDisplayData(Builder builder) {
+        builder.add("foo", "bar");
+      }
+    };
+
+    DisplayData component1DisplayData1 = DisplayData.from(component1);
+    DisplayData component1DisplayData2 = DisplayData.from(component1);
+    DisplayData component2DisplayData = DisplayData.from(component2);
+
+    new EqualsTester()
+        .addEqualityGroup(component1DisplayData1, component1DisplayData2)
+        .addEqualityGroup(component2DisplayData)
+        .testEquals();
+  }
+
+  @Test
   public void testAnonymousClassNamespace() {
     DisplayData data =
         DisplayData.from(


Mime
View raw message