Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 24AC7200BA3 for ; Thu, 20 Oct 2016 20:19:53 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 233FE160AE0; Thu, 20 Oct 2016 18:19:53 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C7EC3160ACC for ; Thu, 20 Oct 2016 20:19:50 +0200 (CEST) Received: (qmail 93898 invoked by uid 500); 20 Oct 2016 18:19:50 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 93889 invoked by uid 99); 20 Oct 2016 18:19:49 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Oct 2016 18:19:49 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 7144EC0E19 for ; Thu, 20 Oct 2016 18:19:49 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id mkl0PgduOZrf for ; Thu, 20 Oct 2016 18:19:37 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id DCCED5F3A1 for ; Thu, 20 Oct 2016 18:19:35 +0000 (UTC) Received: (qmail 90292 invoked by uid 99); 20 Oct 2016 18:19:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Oct 2016 18:19:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6B0B2E0579; Thu, 20 Oct 2016 18:19:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bchambers@apache.org To: commits@beam.incubator.apache.org Date: Thu, 20 Oct 2016 18:19:01 -0000 Message-Id: In-Reply-To: <909fddeaa09b42f0a51900ad53995840@git.apache.org> References: <909fddeaa09b42f0a51900ad53995840@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] incubator-beam git commit: Add Display Data 'path' metadata archived-at: Thu, 20 Oct 2016 18:19:53 -0000 Add Display Data 'path' metadata Display Data supports the notion of "sub components", components within a transform class which can contribute their own display data. We add a namespace to display data items based on the originating component, which keeps the display data items unique within the step. There are instances where a component is included multiple times within a step. We handle the case of the same instance being shared by simply ignoring it the second time. However, we don't handle the case of a separate instance being added of the same class. Currently the separate instances will add display data with the same namespace and key, causing a failure. This can come up for example when infrastructure at different levels wrap and re-wrap a component. We saw this with a bounded source being adapted multiple times, Bounded -> Unbounded -> Bounded -> Unbounded. The BoundedToUnboundedSourceAdapter was included multiple times with separate instances and caused a failure while populating display data. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ad03d07a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ad03d07a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ad03d07a Branch: refs/heads/master Commit: ad03d07ae783f054a31e8b2e14100afff8cdf747 Parents: ff6301b Author: Scott Wegner Authored: Wed Oct 12 14:49:41 2016 -0700 Committer: bchambers Committed: Thu Oct 20 11:10:13 2016 -0700 ---------------------------------------------------------------------- .../core/UnboundedReadFromBoundedSource.java | 2 +- runners/direct-java/pom.xml | 5 - .../runners/direct/ForwardingPTransform.java | 2 +- .../beam/runners/direct/DirectRunnerTest.java | 32 -- .../direct/ForwardingPTransformTest.java | 7 +- .../beam/runners/dataflow/DataflowRunner.java | 5 +- .../DataflowUnboundedReadFromBoundedSource.java | 4 +- .../sdk/io/BoundedReadFromUnboundedSource.java | 5 +- .../apache/beam/sdk/io/CompressedSource.java | 2 +- .../java/org/apache/beam/sdk/io/PubsubIO.java | 5 +- .../main/java/org/apache/beam/sdk/io/Read.java | 4 +- .../main/java/org/apache/beam/sdk/io/Write.java | 6 +- .../sdk/options/ProxyInvocationHandler.java | 149 +++--- .../org/apache/beam/sdk/transforms/Combine.java | 60 +-- .../apache/beam/sdk/transforms/CombineFns.java | 33 +- .../beam/sdk/transforms/CombineWithContext.java | 3 +- .../beam/sdk/transforms/DoFnAdapters.java | 2 +- .../beam/sdk/transforms/FlatMapElements.java | 6 +- .../apache/beam/sdk/transforms/MapElements.java | 8 +- .../org/apache/beam/sdk/transforms/ParDo.java | 60 ++- .../apache/beam/sdk/transforms/Partition.java | 2 +- .../sdk/transforms/display/DisplayData.java | 518 +++++++++++++------ .../beam/sdk/transforms/windowing/Window.java | 2 +- .../io/BoundedReadFromUnboundedSourceTest.java | 4 +- .../beam/sdk/io/CompressedSourceTest.java | 4 +- .../java/org/apache/beam/sdk/io/ReadTest.java | 10 +- .../java/org/apache/beam/sdk/io/WriteTest.java | 6 +- .../sdk/options/ProxyInvocationHandlerTest.java | 40 ++ .../beam/sdk/transforms/CombineFnsTest.java | 7 +- .../apache/beam/sdk/transforms/CombineTest.java | 4 +- .../apache/beam/sdk/transforms/ParDoTest.java | 8 +- .../transforms/display/DisplayDataMatchers.java | 141 +++-- .../display/DisplayDataMatchersTest.java | 67 ++- .../sdk/transforms/display/DisplayDataTest.java | 367 +++++++++---- .../sdk/transforms/windowing/WindowTest.java | 4 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 5 - .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 2 +- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 2 +- 38 files changed, 988 insertions(+), 605 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java index 91a1715..2afdcf2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java @@ -108,7 +108,7 @@ public class UnboundedReadFromBoundedSource extends PTransform - com.fasterxml.jackson.core - jackson-annotations - - - org.slf4j slf4j-api http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java index 3160b58..77311c2 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java @@ -57,6 +57,6 @@ public abstract class ForwardingPTransform { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.add(DisplayData.item("source", source.getClass())); - builder.include(source); + builder + .add(DisplayData.item("source", source.getClass())) + .include("source", source); } public UnboundedSource getSource() { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java index e4257d1..96a35bc 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java @@ -120,7 +120,7 @@ public class DataflowUnboundedReadFromBoundedSource extends PTransform extends PTransform extends PTransform @@ -204,8 +204,7 @@ public class BoundedReadFromUnboundedSource extends PTransform> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java index 680dc2c..bf871b7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java @@ -390,7 +390,7 @@ public class CompressedSource extends FileBasedSource { public void populateDisplayData(DisplayData.Builder builder) { // We explicitly do not register base-class data, instead we use the delegate inner source. builder - .include(sourceDelegate) + .include("source", sourceDelegate) .add(DisplayData.item("source", sourceDelegate.getClass()) .withLabel("Read Source")); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java index 6091156..72a6399 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java @@ -792,8 +792,7 @@ public class PubsubIO { @Override public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - Bound.this.populateDisplayData(builder); + builder.delegate(Bound.this); } } } @@ -1043,7 +1042,7 @@ public class PubsubIO { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - Bound.this.populateDisplayData(builder); + builder.delegate(Bound.this); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java index 29c4e47..f04fbaf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java @@ -127,7 +127,7 @@ public class Read { builder .add(DisplayData.item("source", source.getClass()) .withLabel("Read Source")) - .include(source); + .include("source", source); } } @@ -194,7 +194,7 @@ public class Read { builder .add(DisplayData.item("source", source.getClass()) .withLabel("Read Source")) - .include(source); + .include("source", source); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java index e8b19d9..7559fca 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java @@ -118,7 +118,7 @@ public class Write { super.populateDisplayData(builder); builder .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink")) - .include(sink) + .include("sink", sink) .addIfNotDefault( DisplayData.item("numShards", getNumShards()).withLabel("Fixed Number of Shards"), 0); @@ -209,7 +209,7 @@ public class Write { @Override public void populateDisplayData(DisplayData.Builder builder) { - Write.Bound.this.populateDisplayData(builder); + builder.delegate(Write.Bound.this); } } @@ -261,7 +261,7 @@ public class Write { @Override public void populateDisplayData(DisplayData.Builder builder) { - Write.Bound.this.populateDisplayData(builder); + builder.delegate(Write.Bound.this); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java index a77dcc6..3e74916 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java @@ -86,7 +86,7 @@ import org.apache.beam.sdk.util.common.ReflectHelpers; * {@link PipelineOptions#as(Class)}. */ @ThreadSafe -class ProxyInvocationHandler implements InvocationHandler, HasDisplayData { +class ProxyInvocationHandler implements InvocationHandler { private static final ObjectMapper MAPPER = new ObjectMapper(); /** * No two instances of this class are considered equivalent hence we generate a random hash code. @@ -138,8 +138,7 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData { && args[0] instanceof DisplayData.Builder) { @SuppressWarnings("unchecked") DisplayData.Builder builder = (DisplayData.Builder) args[0]; - // Explicitly set display data namespace so thrown exceptions will have sensible type. - builder.include(this, PipelineOptions.class); + builder.delegate(new PipelineOptionsDisplayData()); return Void.TYPE; } String methodName = method.getName(); @@ -243,88 +242,116 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData { } /** - * Populate display data. See {@link HasDisplayData#populateDisplayData}. All explicitly set - * pipeline options will be added as display data. + * Nested class to handle display data in order to set the display data namespace to something + * sensible. */ - public void populateDisplayData(DisplayData.Builder builder) { - Set optionSpecs = PipelineOptionsReflector.getOptionSpecs(knownInterfaces); - Multimap optionsMap = buildOptionNameToSpecMap(optionSpecs); - - for (Map.Entry option : options.entrySet()) { - BoundValue boundValue = option.getValue(); - if (boundValue.isDefault()) { - continue; - } + class PipelineOptionsDisplayData implements HasDisplayData { + /** + * Populate display data. See {@link HasDisplayData#populateDisplayData}. All explicitly set + * pipeline options will be added as display data. + */ + public void populateDisplayData(DisplayData.Builder builder) { + Set optionSpecs = + PipelineOptionsReflector.getOptionSpecs(knownInterfaces); - Object value = boundValue.getValue() == null ? "" : boundValue.getValue(); - DisplayData.Type type = DisplayData.inferType(value); - HashSet specs = new HashSet<>(optionsMap.get(option.getKey())); + Multimap optionsMap = buildOptionNameToSpecMap(optionSpecs); - for (PipelineOptionSpec optionSpec : specs) { - if (!optionSpec.shouldSerialize()) { - // Options that are excluded for serialization (i.e. those with @JsonIgnore) are also - // excluded from display data. These options are generally not useful for display. + for (Map.Entry option : options.entrySet()) { + BoundValue boundValue = option.getValue(); + if (boundValue.isDefault()) { continue; } - Class pipelineInterface = optionSpec.getDefiningInterface(); - if (type != null) { - builder.add(DisplayData.item(option.getKey(), type, value) - .withNamespace(pipelineInterface)); - } else { - builder.add(DisplayData.item(option.getKey(), displayDataString(value)) - .withNamespace(pipelineInterface)); + DisplayDataValue resolved = DisplayDataValue.resolve(boundValue.getValue()); + HashSet specs = new HashSet<>(optionsMap.get(option.getKey())); + + for (PipelineOptionSpec optionSpec : specs) { + if (!optionSpec.shouldSerialize()) { + // Options that are excluded for serialization (i.e. those with @JsonIgnore) are also + // excluded from display data. These options are generally not useful for display. + continue; + } + + builder.add(DisplayData.item(option.getKey(), resolved.getType(), resolved.getValue()) + .withNamespace(optionSpec.getDefiningInterface())); } } - } - for (Map.Entry jsonOption : jsonOptions.entrySet()) { - if (options.containsKey(jsonOption.getKey())) { - // Option overwritten since deserialization; don't re-write - continue; - } + for (Map.Entry jsonOption : jsonOptions.entrySet()) { + if (options.containsKey(jsonOption.getKey())) { + // Option overwritten since deserialization; don't re-write + continue; + } + + HashSet specs = new HashSet<>(optionsMap.get(jsonOption.getKey())); + if (specs.isEmpty()) { + // No PipelineOptions interface for this key not currently loaded + builder.add(DisplayData.item(jsonOption.getKey(), jsonOption.getValue().toString()) + .withNamespace(UnknownPipelineOptions.class)); + continue; + } - HashSet specs = new HashSet<>(optionsMap.get(jsonOption.getKey())); - if (specs.isEmpty()) { - builder.add(DisplayData.item(jsonOption.getKey(), jsonOption.getValue().toString()) - .withNamespace(UnknownPipelineOptions.class)); - } else { for (PipelineOptionSpec spec : specs) { if (!spec.shouldSerialize()) { continue; } Object value = getValueFromJson(jsonOption.getKey(), spec.getGetterMethod()); - value = value == null ? "" : value; - DisplayData.Type type = DisplayData.inferType(value); - if (type != null) { - builder.add(DisplayData.item(jsonOption.getKey(), type, value) - .withNamespace(spec.getDefiningInterface())); - } else { - builder.add(DisplayData.item(jsonOption.getKey(), displayDataString(value)) - .withNamespace(spec.getDefiningInterface())); - } + DisplayDataValue resolved = DisplayDataValue.resolve(value); + builder.add(DisplayData.item(jsonOption.getKey(), resolved.getType(), resolved.getValue()) + .withNamespace(spec.getDefiningInterface())); } } } } /** - * {@link Object#toString()} wrapper to extract display data values for various types. + * Helper class to resolve a {@link DisplayData} type and value from {@link PipelineOptions}. */ - private String displayDataString(Object value) { - checkNotNull(value, "value cannot be null"); - if (!value.getClass().isArray()) { - return value.toString(); - } - if (!value.getClass().getComponentType().isPrimitive()) { - return Arrays.deepToString((Object[]) value); + @AutoValue + abstract static class DisplayDataValue { + /** + * The resolved display data value. May differ from the input to {@link #resolve(Object)} + */ + abstract Object getValue(); + + /** The resolved display data type. */ + abstract DisplayData.Type getType(); + + /** + * Infer the value and {@link DisplayData.Type type} for the given + * {@link PipelineOptions} value. + */ + static DisplayDataValue resolve(@Nullable Object value) { + DisplayData.Type type = DisplayData.inferType(value); + + if (type == null) { + value = displayDataString(value); + type = DisplayData.Type.STRING; + } + + return new AutoValue_ProxyInvocationHandler_DisplayDataValue(value, type); } - // At this point, we have some type of primitive array. Arrays.deepToString(..) requires an - // Object array, but will unwrap nested primitive arrays. - String wrapped = Arrays.deepToString(new Object[] {value}); - return wrapped.substring(1, wrapped.length() - 1); + /** + * Safe {@link Object#toString()} wrapper to extract display data values for various types. + */ + private static String displayDataString(@Nullable Object value) { + if (value == null) { + return ""; + } + if (!value.getClass().isArray()) { + return value.toString(); + } + if (!value.getClass().getComponentType().isPrimitive()) { + return Arrays.deepToString((Object[]) value); + } + + // At this point, we have some type of primitive array. Arrays.deepToString(..) requires an + // Object array, but will unwrap nested primitive arrays. + String wrapped = Arrays.deepToString(new Object[]{value}); + return wrapped.substring(1, wrapped.length() - 1); + } } /** @@ -587,7 +614,7 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData { List> serializedDisplayData = Lists.newArrayList(); DisplayData displayData = DisplayData.from(value); - for (DisplayData.Item item : displayData.items()) { + for (DisplayData.Item item : displayData.items()) { @SuppressWarnings("unchecked") Map serializedItem = MAPPER.convertValue(item, Map.class); serializedDisplayData.add(serializedItem); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index df9a306..7719c73 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -124,14 +124,14 @@ public class Combine { return globally(fn, displayDataForFn(fn)); } - private static DisplayData.Item> displayDataForFn(T fn) { + private static DisplayData.ItemSpec> displayDataForFn(T fn) { return DisplayData.item("combineFn", fn.getClass()) .withLabel("Combiner"); } private static Globally globally( GlobalCombineFn fn, - DisplayData.Item> fnDisplayData) { + DisplayData.ItemSpec> fnDisplayData) { return new Globally<>(fn, fnDisplayData, true, 0); } @@ -200,7 +200,7 @@ public class Combine { private static PerKey perKey( PerKeyCombineFn fn, - DisplayData.Item> fnDisplayData) { + DisplayData.ItemSpec> fnDisplayData) { return new PerKey<>(fn, fnDisplayData, false /*fewKeys*/); } @@ -210,7 +210,7 @@ public class Combine { */ private static PerKey fewKeys( PerKeyCombineFn fn, - DisplayData.Item> fnDisplayData) { + DisplayData.ItemSpec> fnDisplayData) { return new PerKey<>(fn, fnDisplayData, true /*fewKeys*/); } @@ -294,7 +294,7 @@ public class Combine { private static GroupedValues groupedValues( PerKeyCombineFn fn, - DisplayData.Item> fnDisplayData) { + DisplayData.ItemSpec> fnDisplayData) { return new GroupedValues<>(fn, fnDisplayData); } @@ -521,7 +521,7 @@ public class Combine { @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.include(CombineFn.this); + builder.delegate(CombineFn.this); } }; } @@ -1258,7 +1258,7 @@ public class Combine { @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.include(KeyedCombineFn.this); + builder.delegate(KeyedCombineFn.this); } }; } @@ -1325,13 +1325,13 @@ public class Combine { extends PTransform, PCollection> { private final GlobalCombineFn fn; - private final DisplayData.Item> fnDisplayData; + private final DisplayData.ItemSpec> fnDisplayData; private final boolean insertDefault; private final int fanout; private final List> sideInputs; private Globally(GlobalCombineFn fn, - DisplayData.Item> fnDisplayData, boolean insertDefault, int fanout) { + DisplayData.ItemSpec> fnDisplayData, boolean insertDefault, int fanout) { this.fn = fn; this.fnDisplayData = fnDisplayData; this.insertDefault = insertDefault; @@ -1340,7 +1340,7 @@ public class Combine { } private Globally(String name, GlobalCombineFn fn, - DisplayData.Item> fnDisplayData, boolean insertDefault, int fanout) { + DisplayData.ItemSpec> fnDisplayData, boolean insertDefault, int fanout) { super(name); this.fn = fn; this.fnDisplayData = fnDisplayData; @@ -1350,7 +1350,7 @@ public class Combine { } private Globally(String name, GlobalCombineFn fn, - DisplayData.Item> fnDisplayData, boolean insertDefault, int fanout, + DisplayData.ItemSpec> fnDisplayData, boolean insertDefault, int fanout, List> sideInputs) { super(name); this.fn = fn; @@ -1498,9 +1498,9 @@ public class Combine { private static void populateDisplayData( DisplayData.Builder builder, HasDisplayData fn, - DisplayData.Item> fnDisplayItem) { + DisplayData.ItemSpec> fnDisplayItem) { builder - .include(fn) + .include("combineFn", fn) .add(fnDisplayItem); } @@ -1556,13 +1556,13 @@ public class Combine { extends PTransform, PCollectionView> { private final GlobalCombineFn fn; - private final DisplayData.Item> fnDisplayData; + private final DisplayData.ItemSpec> fnDisplayData; private final boolean insertDefault; private final int fanout; private GloballyAsSingletonView( GlobalCombineFn fn, - DisplayData.Item> fnDisplayData, boolean insertDefault, int fanout) { + DisplayData.ItemSpec> fnDisplayData, boolean insertDefault, int fanout) { this.fn = fn; this.fnDisplayData = fnDisplayData; this.insertDefault = insertDefault; @@ -1762,13 +1762,13 @@ public class Combine { extends PTransform>, PCollection>> { private final PerKeyCombineFn fn; - private final DisplayData.Item> fnDisplayData; + private final DisplayData.ItemSpec> fnDisplayData; private final boolean fewKeys; private final List> sideInputs; private PerKey( PerKeyCombineFn fn, - DisplayData.Item> fnDisplayData, boolean fewKeys) { + DisplayData.ItemSpec> fnDisplayData, boolean fewKeys) { this.fn = fn; this.fnDisplayData = fnDisplayData; this.fewKeys = fewKeys; @@ -1777,7 +1777,7 @@ public class Combine { private PerKey(String name, PerKeyCombineFn fn, - DisplayData.Item> fnDisplayData, + DisplayData.ItemSpec> fnDisplayData, boolean fewKeys, List> sideInputs) { super(name); this.fn = fn; @@ -1788,7 +1788,7 @@ public class Combine { private PerKey( String name, PerKeyCombineFn fn, - DisplayData.Item> fnDisplayData, boolean fewKeys) { + DisplayData.ItemSpec> fnDisplayData, boolean fewKeys) { super(name); this.fn = fn; this.fnDisplayData = fnDisplayData; @@ -1888,12 +1888,12 @@ public class Combine { extends PTransform>, PCollection>> { private final PerKeyCombineFn fn; - private final DisplayData.Item> fnDisplayData; + private final DisplayData.ItemSpec> fnDisplayData; private final SerializableFunction hotKeyFanout; private PerKeyWithHotKeyFanout(String name, PerKeyCombineFn fn, - DisplayData.Item> fnDisplayData, + DisplayData.ItemSpec> fnDisplayData, SerializableFunction hotKeyFanout) { super(name); this.fn = fn; @@ -1976,7 +1976,7 @@ public class Combine { @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.include(PerKeyWithHotKeyFanout.this); + builder.delegate(PerKeyWithHotKeyFanout.this); } }; postCombine = @@ -2024,7 +2024,7 @@ public class Combine { } @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.include(PerKeyWithHotKeyFanout.this); + builder.delegate(PerKeyWithHotKeyFanout.this); } }; } else { @@ -2068,7 +2068,7 @@ public class Combine { } @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.include(PerKeyWithHotKeyFanout.this); + builder.delegate(PerKeyWithHotKeyFanout.this); } }; postCombine = @@ -2117,7 +2117,7 @@ public class Combine { } @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.include(PerKeyWithHotKeyFanout.this); + builder.delegate(PerKeyWithHotKeyFanout.this); } }; } @@ -2202,7 +2202,7 @@ public class Combine { Combine.populateDisplayData(builder, fn, fnDisplayData); if (hotKeyFanout instanceof HasDisplayData) { - builder.include((HasDisplayData) hotKeyFanout); + builder.include("hotKeyFanout", (HasDisplayData) hotKeyFanout); } builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass()) .withLabel("Fanout Function")); @@ -2349,12 +2349,12 @@ public class Combine { PCollection>> { private final PerKeyCombineFn fn; - private final DisplayData.Item> fnDisplayData; + private final DisplayData.ItemSpec> fnDisplayData; private final List> sideInputs; private GroupedValues( PerKeyCombineFn fn, - DisplayData.Item> fnDisplayData) { + DisplayData.ItemSpec> fnDisplayData) { this.fn = SerializableUtils.clone(fn); this.fnDisplayData = fnDisplayData; this.sideInputs = ImmutableList.of(); @@ -2362,7 +2362,7 @@ public class Combine { private GroupedValues( PerKeyCombineFn fn, - DisplayData.Item> fnDisplayData, + DisplayData.ItemSpec> fnDisplayData, List> sideInputs) { this.fn = SerializableUtils.clone(fn); this.fnDisplayData = fnDisplayData; @@ -2402,7 +2402,7 @@ public class Combine { @Override public void populateDisplayData(DisplayData.Builder builder) { - Combine.GroupedValues.this.populateDisplayData(builder); + builder.delegate(Combine.GroupedValues.this); } }).withSideInputs(sideInputs)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java index 229b1d2..1b3e525 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java @@ -21,18 +21,14 @@ import static com.google.common.base.Preconditions.checkArgument; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -1044,35 +1040,12 @@ public class CombineFns { */ private static void populateDisplayData( DisplayData.Builder builder, List combineFns) { - - // NB: ArrayListMultimap necessary to maintain ordering of combineFns of the same type. - Multimap, HasDisplayData> combineFnMap = ArrayListMultimap.create(); - for (int i = 0; i < combineFns.size(); i++) { HasDisplayData combineFn = combineFns.get(i); - builder.add(DisplayData.item("combineFn" + (i + 1), combineFn.getClass()) + String token = "combineFn" + (i + 1); + builder.add(DisplayData.item(token, combineFn.getClass()) .withLabel("Combine Function")); - combineFnMap.put(combineFn.getClass(), combineFn); - } - - for (Map.Entry, Collection> combineFnEntries : - combineFnMap.asMap().entrySet()) { - - Collection classCombineFns = combineFnEntries.getValue(); - if (classCombineFns.size() == 1) { - // Only one combineFn of this type, include it directly. - builder.include(Iterables.getOnlyElement(classCombineFns)); - - } else { - // Multiple combineFns of same type, add a namespace suffix so display data is - // unique and ordered. - String baseNamespace = combineFnEntries.getKey().getName(); - for (int i = 0; i < combineFns.size(); i++) { - HasDisplayData combineFn = combineFns.get(i); - String namespace = String.format("%s#%d", baseNamespace, i + 1); - builder.include(combineFn, namespace); - } - } + builder.include(token, combineFn); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java index 3dd4fe2..7ac952c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java @@ -171,8 +171,7 @@ public class CombineWithContext { @Override public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - CombineFnWithContext.this.populateDisplayData(builder); + builder.delegate(CombineFnWithContext.this); } }; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java index 12d4824..18d9333 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java @@ -246,7 +246,7 @@ public class DoFnAdapters { @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.include(fn); + builder.delegate(fn); } private void readObject(java.io.ObjectInputStream in) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java index b590d45..4ef809f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java @@ -119,7 +119,7 @@ extends PTransform, PCollection> { ////////////////////////////////////////////////////////////////////////////////////////////////// private final SimpleFunction> fn; - private final DisplayData.Item fnClassDisplayData; + private final DisplayData.ItemSpec fnClassDisplayData; private FlatMapElements( SimpleFunction> fn, @@ -166,7 +166,9 @@ extends PTransform, PCollection> { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.include(fn).add(fnClassDisplayData); + builder + .include("flatMapFn", fn) + .add(fnClassDisplayData); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java index 73e4359..c109034 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java @@ -103,7 +103,7 @@ extends PTransform, PCollection> { /////////////////////////////////////////////////////////////////// private final SimpleFunction fn; - private final DisplayData.Item fnClassDisplayData; + private final DisplayData.ItemSpec fnClassDisplayData; private MapElements(SimpleFunction fn, Class fnClass) { this.fn = fn; @@ -123,7 +123,7 @@ extends PTransform, PCollection> { @Override public void populateDisplayData(DisplayData.Builder builder) { - MapElements.this.populateDisplayData(builder); + builder.delegate(MapElements.this); } @Override @@ -141,6 +141,8 @@ extends PTransform, PCollection> { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.include(fn).add(fnClassDisplayData); + builder + .include("mapFn", fn) + .add(fnClassDisplayData); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 8aa87e4..93eb1ac 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -521,7 +521,7 @@ public class ParDo { */ public static Bound of(DoFn fn) { validate(fn); - return of(adapt(fn), fn.getClass()); + return of(adapt(fn), displayDataForFn(fn)); } /** @@ -538,12 +538,17 @@ public class ParDo { */ @Deprecated public static Bound of(OldDoFn fn) { - return of(fn, fn.getClass()); + return of(fn, displayDataForFn(fn)); } private static Bound of( - OldDoFn fn, Class fnClass) { - return new Unbound().of(fn, fnClass); + OldDoFn fn, DisplayData.ItemSpec> fnDisplayData) { + return new Unbound().of(fn, fnDisplayData); + } + + private static DisplayData.ItemSpec> displayDataForFn(T fn) { + return DisplayData.item("fn", fn.getClass()) + .withLabel("Transform Function"); } /** @@ -666,7 +671,7 @@ public class ParDo { */ public Bound of(DoFn fn) { validate(fn); - return of(adapt(fn), fn.getClass()); + return of(adapt(fn), displayDataForFn(fn)); } /** @@ -681,12 +686,12 @@ public class ParDo { */ @Deprecated public Bound of(OldDoFn fn) { - return of(fn, fn.getClass()); + return of(fn, displayDataForFn(fn)); } private Bound of( - OldDoFn fn, Class fnClass) { - return new Bound<>(name, sideInputs, fn, fnClass); + OldDoFn fn, DisplayData.ItemSpec> fnDisplayData) { + return new Bound<>(name, sideInputs, fn, fnDisplayData); } } @@ -707,16 +712,16 @@ public class ParDo { // Inherits name. private final List> sideInputs; private final OldDoFn fn; - private final Class fnClass; + private final DisplayData.ItemSpec> fnDisplayData; Bound(String name, List> sideInputs, OldDoFn fn, - Class fnClass) { + DisplayData.ItemSpec> fnDisplayData) { super(name); this.sideInputs = sideInputs; this.fn = SerializableUtils.clone(fn); - this.fnClass = fnClass; + this.fnDisplayData = fnDisplayData; } /** @@ -744,7 +749,7 @@ public class ParDo { ImmutableList.Builder> builder = ImmutableList.builder(); builder.addAll(this.sideInputs); builder.addAll(sideInputs); - return new Bound<>(name, builder.build(), fn, fnClass); + return new Bound<>(name, builder.build(), fn, fnDisplayData); } /** @@ -758,7 +763,7 @@ public class ParDo { public BoundMulti withOutputTags(TupleTag mainOutputTag, TupleTagList sideOutputTags) { return new BoundMulti<>( - name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass); + name, sideInputs, mainOutputTag, sideOutputTags, fn, fnDisplayData); } @Override @@ -802,7 +807,7 @@ public class ParDo { @Override public void populateDisplayData(Builder builder) { super.populateDisplayData(builder); - ParDo.populateDisplayData(builder, fn, fnClass); + ParDo.populateDisplayData(builder, fn, fnDisplayData); } public OldDoFn getFn() { @@ -883,7 +888,7 @@ public class ParDo { */ public BoundMulti of(DoFn fn) { validate(fn); - return of(adapt(fn), fn.getClass()); + return of(adapt(fn), displayDataForFn(fn)); } /** @@ -898,12 +903,13 @@ public class ParDo { */ @Deprecated public BoundMulti of(OldDoFn fn) { - return of(fn, fn.getClass()); + return of(fn, displayDataForFn(fn)); } - private BoundMulti of(OldDoFn fn, Class fnClass) { + private BoundMulti of(OldDoFn fn, + DisplayData.ItemSpec> fnDisplayData) { return new BoundMulti<>( - name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass); + name, sideInputs, mainOutputTag, sideOutputTags, fn, fnDisplayData); } } @@ -925,20 +931,20 @@ public class ParDo { private final TupleTag mainOutputTag; private final TupleTagList sideOutputTags; private final OldDoFn fn; - private final Class fnClass; + private final DisplayData.ItemSpec> fnDisplayData; BoundMulti(String name, List> sideInputs, TupleTag mainOutputTag, TupleTagList sideOutputTags, OldDoFn fn, - Class fnClass) { + DisplayData.ItemSpec> fnDisplayData) { super(name); this.sideInputs = sideInputs; this.mainOutputTag = mainOutputTag; this.sideOutputTags = sideOutputTags; this.fn = SerializableUtils.clone(fn); - this.fnClass = fnClass; + this.fnDisplayData = fnDisplayData; } /** @@ -969,7 +975,7 @@ public class ParDo { builder.addAll(sideInputs); return new BoundMulti<>( name, builder.build(), - mainOutputTag, sideOutputTags, fn, fnClass); + mainOutputTag, sideOutputTags, fn, fnDisplayData); } @@ -1023,7 +1029,7 @@ public class ParDo { @Override public void populateDisplayData(Builder builder) { super.populateDisplayData(builder); - ParDo.populateDisplayData(builder, fn, fnClass); + ParDo.populateDisplayData(builder, fn, fnDisplayData); } public OldDoFn getFn() { @@ -1044,11 +1050,11 @@ public class ParDo { } private static void populateDisplayData( - DisplayData.Builder builder, OldDoFn fn, Class fnClass) { + DisplayData.Builder builder, OldDoFn fn, + DisplayData.ItemSpec> fnDisplayData) { builder - .include(fn) - .add(DisplayData.item("fn", fnClass) - .withLabel("Transform Function")); + .include("fn", fn) + .add(fnDisplayData); } private static boolean isSplittable(OldDoFn oldDoFn) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java index 9247942..5b4eead 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java @@ -124,7 +124,7 @@ public class Partition extends PTransform, PCollectionList> @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.include(partitionDoFn); + builder.include("partitionFn", partitionDoFn); } private final transient PartitionDoFn partitionDoFn; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java index 394666b..5ab6342 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java @@ -20,15 +20,19 @@ package org.apache.beam.sdk.transforms.display; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import autovalue.shaded.com.google.common.common.base.Joiner; import com.fasterxml.jackson.annotation.JsonGetter; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonValue; import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import java.io.Serializable; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -48,12 +52,12 @@ import org.joda.time.format.ISODateTimeFormat; * interface. */ public class DisplayData implements Serializable { - private static final DisplayData EMPTY = new DisplayData(Maps.>newHashMap()); + private static final DisplayData EMPTY = new DisplayData(Maps.newHashMap()); private static final DateTimeFormatter TIMESTAMP_FORMATTER = ISODateTimeFormat.dateTime(); - private final ImmutableMap> entries; + private final ImmutableMap entries; - private DisplayData(Map> entries) { + private DisplayData(Map entries) { this.entries = ImmutableMap.copyOf(entries); } @@ -71,7 +75,11 @@ public class DisplayData implements Serializable { */ public static DisplayData from(HasDisplayData component) { checkNotNull(component, "component argument cannot be null"); - return InternalBuilder.forRoot(component).build(); + + InternalBuilder builder = new InternalBuilder(); + builder.include(Path.root(), component); + + return builder.build(); } /** @@ -99,11 +107,11 @@ public class DisplayData implements Serializable { } @JsonValue - public Collection> items() { + public Collection items() { return entries.values(); } - public Map> asMap() { + public Map asMap() { return entries; } @@ -126,7 +134,7 @@ public class DisplayData implements Serializable { public String toString() { StringBuilder builder = new StringBuilder(); boolean isFirstLine = true; - for (Item entry : entries.values()) { + for (Item entry : entries.values()) { if (isFirstLine) { isFirstLine = false; } else { @@ -149,70 +157,81 @@ public class DisplayData implements Serializable { */ public interface Builder { /** - * Register display data from the specified subcomponent. For example, a {@link PTransform} - * which delegates to a user-provided function can implement {@link HasDisplayData} on the - * function and include it from the {@link PTransform}: + * Register display data from the specified subcomponent at the given path. For example, a + * {@link PTransform} which delegates to a user-provided function can implement + * {@link HasDisplayData} on the function and include it from the {@link PTransform}: * *
{@literal @Override}
      * public void populateDisplayData(DisplayData.Builder builder) {
      *   super.populateDisplayData(builder);
      *
      *   builder
-     *     .add(DisplayData.item("userFn", userFn)) // To register the class name of the userFn
-     *     .include(userFn); // To allow the userFn to register additional display data
+     *     // To register the class name of the userFn
+     *     .add(DisplayData.item("userFn", userFn.getClass()))
+     *     // To allow the userFn to register additional display data
+     *     .include("userFn", userFn);
      * }
      * 
* - *

Using {@code include(subcomponent)} will associate each of the registered items with the - * namespace of the {@code subcomponent} being registered. To register display data in the - * current namespace, such as from a base class implementation, use + *

Using {@code include(path, subcomponent)} will associate each of the registered items with + * the namespace of the {@code subcomponent} being registered, with the specified path element + * relative to the current path. To register display data in the current path and namespace, + * such as from a base class implementation, use * {@code subcomponent.populateDisplayData(builder)} instead. * * @see HasDisplayData#populateDisplayData(DisplayData.Builder) */ - Builder include(HasDisplayData subComponent); + Builder include(String path, HasDisplayData subComponent); /** - * Register display data from the specified subcomponent, overriding the namespace of - * subcomponent display items with the specified namespace. + * Register display data from the specified component on behalf of the current component. + * Display data items will be added with the subcomponent namespace but the current component + * path. * - * @see #include(HasDisplayData) - */ - Builder include(HasDisplayData subComponent, Class namespace); - - /** - * Register display data from the specified subcomponent, overriding the namespace of - * subcomponent display items with the specified namespace. + *

This is useful for components which simply wrap other components and wish to retain the + * display data from the wrapped component. Such components should implement + * {@code populateDisplayData} as: * - * @see #include(HasDisplayData) + *

{@literal @Override}
+     * public void populateDisplayData(DisplayData.Builder builder) {
+     *   builder.delegate(wrapped);
+     * }
+     * 
*/ - Builder include(HasDisplayData subComponent, String namespace); + Builder delegate(HasDisplayData component); /** * Register the given display item. */ - Builder add(Item item); + Builder add(ItemSpec item); /** * Register the given display item if the value is not null. */ - Builder addIfNotNull(Item item); + Builder addIfNotNull(ItemSpec item); /** * Register the given display item if the value is different than the specified default. */ - Builder addIfNotDefault(Item item, @Nullable T defaultValue); + Builder addIfNotDefault(ItemSpec item, @Nullable T defaultValue); } /** - * {@link Item Items} are the unit of display data. Each item is identified by a given key + * {@link Item Items} are the unit of display data. Each item is identified by a given path, key, * and namespace from the component the display item belongs to. * *

{@link Item Items} are registered via {@link DisplayData.Builder#add} * within {@link HasDisplayData#populateDisplayData} implementations. */ @AutoValue - public abstract static class Item implements Serializable { + public abstract static class Item { + + /** + * The path for the display item within a component hierarchy. + */ + @Nullable + @JsonIgnore + public abstract Path getPath(); /** * The namespace for the display item. The namespace defaults to the component which @@ -220,7 +239,7 @@ public class DisplayData implements Serializable { */ @Nullable @JsonGetter("namespace") - public abstract String getNamespace(); + public abstract Class getNamespace(); /** * The key for the display item. Each display item is created with a key and value @@ -240,11 +259,8 @@ public class DisplayData implements Serializable { * Retrieve the value of the display item. The value is translated from the input to * {@link DisplayData#item} into a format suitable for display. Translation is based on the * item's {@link #getType() type}. - * - *

The value will only be {@literal null} if the input value during creation was null. */ @JsonGetter("value") - @Nullable public abstract Object getValue(); /** @@ -285,27 +301,104 @@ public class DisplayData implements Serializable { @Nullable public abstract String getLinkUrl(); - private static Item create(String key, Type type, @Nullable T value) { - FormattedItemValue formatted = type.safeFormat(value); - return of(null, key, type, formatted.getLongValue(), formatted.getShortValue(), null, null); + private static Item create(ItemSpec spec, Path path) { + checkNotNull(spec, "spec cannot be null"); + checkNotNull(path, "path cannot be null"); + Class ns = checkNotNull(spec.getNamespace(), "namespace must be set"); + + return new AutoValue_DisplayData_Item(path, ns, spec.getKey(), spec.getType(), + spec.getValue(), spec.getShortValue(), spec.getLabel(), spec.getLinkUrl()); } + @Override + public String toString() { + return String.format("%s%s:%s=%s", getPath(), getNamespace().getName(), getKey(), getValue()); + } + } + + /** + * Specifies an {@link Item} to register as display data. Each item is identified by a given + * path, key, and namespace from the component the display item belongs to. + * + *

{@link Item Items} are registered via {@link DisplayData.Builder#add} + * within {@link HasDisplayData#populateDisplayData} implementations. + */ + @AutoValue + public abstract static class ItemSpec implements Serializable { + /** + * The namespace for the display item. If unset, defaults to the component which + * the display item is registered to. + */ + @Nullable + public abstract Class getNamespace(); + + /** + * The key for the display item. Each display item is created with a key and value + * via {@link DisplayData#item}. + */ + public abstract String getKey(); + + /** + * The {@link DisplayData.Type} of display data. All display data conforms to a predefined set + * of allowed types. + */ + public abstract Type getType(); + + /** + * The value of the display item. The value is translated from the input to + * {@link DisplayData#item} into a format suitable for display. Translation is based on the + * item's {@link #getType() type}. + */ + @Nullable + public abstract Object getValue(); + /** - * Set the item {@link Item#getNamespace() namespace} from the given {@link Class}. + * The optional short value for an item, or {@code null} if none is provided. * - *

This method does not alter the current instance, but instead returns a new {@link Item} - * with the namespace set. + *

The short value is an alternative display representation for items having a long display + * value. For example, the {@link #getValue() value} for {@link Type#JAVA_CLASS} items contains + * the full class name with package, while the short value contains just the class name. + * + *

A {@link #getValue() value} will be provided for each display item, and some types may + * also provide a short-value. If a short value is provided, display data consumers may + * choose to display it instead of or in addition to the {@link #getValue() value}. */ - public Item withNamespace(Class namespace) { - checkNotNull(namespace, "namespace argument cannot be null"); - return withNamespace(namespaceOf(namespace)); + @Nullable + public abstract Object getShortValue(); + + /** + * The optional label for an item. The label is a human-readable description of what + * the metadata represents. UIs may choose to display the label instead of the item key. + */ + @Nullable + public abstract String getLabel(); + + /** + * The optional link URL for an item. The URL points to an address where the reader + * can find additional context for the display data. + */ + @Nullable + public abstract String getLinkUrl(); + + private static ItemSpec create(String key, Type type, @Nullable T value) { + return ItemSpec.builder() + .setKey(key) + .setType(type) + .setRawValue(value) + .build(); } - /** @see #withNamespace(Class) */ - public Item withNamespace(String namespace) { + /** + * Set the item {@link ItemSpec#getNamespace() namespace} from the given {@link Class}. + * + *

This method does not alter the current instance, but instead returns a new + * {@link ItemSpec} with the namespace set. + */ + public ItemSpec withNamespace(Class namespace) { checkNotNull(namespace, "namespace argument cannot be null"); - return of( - namespace, getKey(), getType(), getValue(), getShortValue(), getLabel(), getLinkUrl()); + return toBuilder() + .setNamespace(namespace) + .build(); } /** @@ -313,12 +406,13 @@ public class DisplayData implements Serializable { * *

Specifying a null value will clear the label if it was previously defined. * - *

This method does not alter the current instance, but instead returns a new {@link Item} - * with the label set. + *

This method does not alter the current instance, but instead returns a new + * {@link ItemSpec} with the label set. */ - public Item withLabel(String label) { - return of( - getNamespace(), getKey(), getType(), getValue(), getShortValue(), label, getLinkUrl()); + public ItemSpec withLabel(@Nullable String label) { + return toBuilder() + .setLabel(label) + .build(); } /** @@ -326,11 +420,13 @@ public class DisplayData implements Serializable { * *

Specifying a null value will clear the link url if it was previously defined. * - *

This method does not alter the current instance, but instead returns a new {@link Item} - * with the link url set. + *

This method does not alter the current instance, but instead returns a new + * {@link ItemSpec} with the link url set. */ - public Item withLinkUrl(String url) { - return of(getNamespace(), getKey(), getType(), getValue(), getShortValue(), getLabel(), url); + public ItemSpec withLinkUrl(@Nullable String url) { + return toBuilder() + .setLinkUrl(url) + .build(); } /** @@ -339,84 +435,166 @@ public class DisplayData implements Serializable { *

This should only be used internally. It is useful to compare the value of a * {@link DisplayData.Item} to the value derived from a specified input. */ - private Item withValue(Object value) { - FormattedItemValue formatted = getType().safeFormat(value); - return of(getNamespace(), getKey(), getType(), formatted.getLongValue(), - formatted.getShortValue(), getLabel(), getLinkUrl()); - } - - private static Item of( - @Nullable String namespace, - String key, - Type type, - @Nullable Object value, - @Nullable Object shortValue, - @Nullable String label, - @Nullable String linkUrl) { - return new AutoValue_DisplayData_Item<>( - namespace, key, type, value, shortValue, label, linkUrl); + private ItemSpec withValue(T value) { + return toBuilder() + .setRawValue(value) + .build(); } @Override public String toString() { return String.format("%s:%s=%s", getNamespace(), getKey(), getValue()); } + + static ItemSpec.Builder builder() { + return new AutoValue_DisplayData_ItemSpec.Builder<>(); + } + + abstract ItemSpec.Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + public abstract ItemSpec.Builder setKey(String key); + public abstract ItemSpec.Builder setNamespace(@Nullable Class namespace); + public abstract ItemSpec.Builder setType(Type type); + public abstract ItemSpec.Builder setValue(@Nullable Object longValue); + public abstract ItemSpec.Builder setShortValue(@Nullable Object shortValue); + public abstract ItemSpec.Builder setLabel(@Nullable String label); + public abstract ItemSpec.Builder setLinkUrl(@Nullable String url); + public abstract ItemSpec build(); + + + abstract Type getType(); + + ItemSpec.Builder setRawValue(@Nullable T value) { + FormattedItemValue formatted = getType().safeFormat(value); + return this + .setValue(formatted.getLongValue()) + .setShortValue(formatted.getShortValue()); + } + } } /** * Unique identifier for a display data item within a component. - * Identifiers are composed of the key they are registered with and a namespace generated from - * the class of the component which registered the item. + * + *

Identifiers are composed of: + * + *

    + *
  • A {@link #getPath() path} based on the component hierarchy
  • + *
  • The {@link #getKey() key} it is registered with
  • + *
  • A {@link #getNamespace() namespace} generated from the class of the component which + * registered the item.
  • + *
* *

Display data registered with the same key from different components will have different * namespaces and thus will both be represented in the composed {@link DisplayData}. If a * single component registers multiple metadata items with the same key, only the most recent * item will be retained; previous versions are discarded. */ - public static class Identifier { - private final String ns; - private final String key; + @AutoValue + public abstract static class Identifier { + public abstract Path getPath(); + public abstract Class getNamespace(); + public abstract String getKey(); - public static Identifier of(Class namespace, String key) { - return of(namespaceOf(namespace), key); + public static Identifier of(Path path, Class namespace, String key) { + return new AutoValue_DisplayData_Identifier(path, namespace, key); } - public static Identifier of(String namespace, String key) { - return new Identifier(namespace, key); + @Override + public String toString() { + return String.format("%s%s:%s", getPath(), getNamespace(), getKey()); } + } - private Identifier(String ns, String key) { - this.ns = ns; - this.key = key; + /** + * Structured path of registered display data within a component hierarchy. + * + *

Display data items registered directly by a component will have the {@link Path#root() root} + * path. If the component {@link Builder#include includes} a sub-component, its display data will + * be registered at the path specified. Each sub-component path is created by appending a child + * element to the path of its parent component, forming a hierarchy. + */ + public static class Path { + private final ImmutableList components; + private Path(ImmutableList components) { + this.components = components; } - public String getNamespace() { - return ns; + /** + * Path for display data registered by a top-level component. + */ + public static Path root() { + return new Path(ImmutableList.of()); } - public String getKey() { - return key; + /** + * Construct a path from an absolute component path hierarchy. + * + *

For the root path, use {@link Path#root()}. + * + * @param firstPath Path of the first sub-component. + * @param paths Additional path components. + */ + public static Path absolute(String firstPath, String... paths) { + ImmutableList.Builder builder = ImmutableList.builder(); + + validatePathElement(firstPath); + builder.add(firstPath); + for (String path : paths) { + validatePathElement(path); + builder.add(path); + } + + return new Path(builder.build()); } - @Override - public boolean equals(Object obj) { - if (obj instanceof Identifier) { - Identifier that = (Identifier) obj; - return Objects.equals(this.ns, that.ns) - && Objects.equals(this.key, that.key); - } + /** + * Hierarchy list of component paths making up the full path, starting with the top-level child + * component path. For the {@link #root root} path, returns the empty list. + */ + public List getComponents() { + return components; + } - return false; + /** + * Extend the path by appending a sub-component path. The new path element is added to the end + * of the path hierarchy. + * + *

Returns a new {@link Path} instance; the originating {@link Path} is not modified. + */ + public Path extend(String path) { + validatePathElement(path); + return new Path(ImmutableList.builder() + .addAll(components.iterator()) + .add(path) + .build()); } - @Override - public int hashCode() { - return Objects.hash(ns, key); + private static void validatePathElement(String path) { + checkNotNull(path); + checkArgument(!"".equals(path), "path cannot be empty"); } @Override public String toString() { - return String.format("%s:%s", ns, key); + StringBuilder b = new StringBuilder().append("["); + Joiner.on("/").appendTo(b, components); + b.append("]"); + return b.toString(); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof Path + && Objects.equals(components, ((Path) obj).components); + + } + + @Override + public int hashCode() { + return components.hashCode(); } } @@ -551,65 +729,79 @@ public class DisplayData implements Serializable { Object getLongValue() { return this.longValue; } - Object getShortValue() { return this.shortValue; } } private static class InternalBuilder implements Builder { - private final Map> entries; - private final Set visited; + private final Map entries; + private final Set visitedComponents; + private final Map visitedPathMap; - private String latestNs; + private Path latestPath; + private Class latestNs; private InternalBuilder() { this.entries = Maps.newHashMap(); - this.visited = Sets.newIdentityHashSet(); - } - - private static InternalBuilder forRoot(HasDisplayData instance) { - InternalBuilder builder = new InternalBuilder(); - builder.include(instance); - return builder; + this.visitedComponents = Sets.newIdentityHashSet(); + this.visitedPathMap = Maps.newHashMap(); } @Override - public Builder include(HasDisplayData subComponent) { + public Builder include(String path, HasDisplayData subComponent) { checkNotNull(subComponent, "subComponent argument cannot be null"); - return include(subComponent, subComponent.getClass()); + checkNotNull(path, "path argument cannot be null"); + + Path absolutePath = latestPath.extend(path); + + HasDisplayData existingComponent = visitedPathMap.get(absolutePath); + if (existingComponent != null) { + throw new IllegalArgumentException(String.format("Specified path '%s' already used for " + + "subcomponent %s. Subcomponents must be included using unique paths.", + path, existingComponent)); + } + + return include(absolutePath, subComponent); } @Override - public Builder include(HasDisplayData subComponent, Class namespace) { - checkNotNull(namespace, "Input namespace override cannot be null"); - return include(subComponent, namespaceOf(namespace)); + public Builder delegate(HasDisplayData component) { + checkNotNull(component); + + return include(latestPath, component); } - @Override - public Builder include(HasDisplayData subComponent, String namespace) { - checkNotNull(subComponent, "subComponent argument cannot be null"); - checkNotNull(namespace, "Input namespace override cannot be null"); - - boolean newComponent = visited.add(subComponent); - if (newComponent) { - String prevNs = this.latestNs; - this.latestNs = namespace; - - try { - subComponent.populateDisplayData(this); - } catch (PopulateDisplayDataException e) { - // Don't re-wrap exceptions recursively. - throw e; - } catch (Throwable e) { - String msg = String.format("Error while populating display data for component: %s", - namespace); - throw new PopulateDisplayDataException(msg, e); - } + private Builder include(Path path, HasDisplayData subComponent) { + if (visitedComponents.contains(subComponent)) { + // Component previously registered; ignore in order to break cyclic dependencies + return this; + } - this.latestNs = prevNs; + // New component; add it. + visitedComponents.add(subComponent); + visitedPathMap.put(path, subComponent); + Class namespace = subComponent.getClass(); + + Path prevPath = latestPath; + Class prevNs = latestNs; + latestPath = path; + latestNs = namespace; + + try { + subComponent.populateDisplayData(this); + } catch (PopulateDisplayDataException e) { + // Don't re-wrap exceptions recursively. + throw e; + } catch (Throwable e) { + String msg = String.format("Error while populating display data for component: %s", + namespace.getName()); + throw new PopulateDisplayDataException(msg, e); } + latestPath = prevPath; + latestNs = prevNs; + return this; } @@ -623,39 +815,41 @@ public class DisplayData implements Serializable { } @Override - public Builder add(Item item) { + public Builder add(ItemSpec item) { checkNotNull(item, "Input display item cannot be null"); return addItemIf(true, item); } @Override - public Builder addIfNotNull(Item item) { + public Builder addIfNotNull(ItemSpec item) { checkNotNull(item, "Input display item cannot be null"); return addItemIf(item.getValue() != null, item); } @Override - public Builder addIfNotDefault(Item item, @Nullable T defaultValue) { + public Builder addIfNotDefault(ItemSpec item, @Nullable T defaultValue) { checkNotNull(item, "Input display item cannot be null"); - Item defaultItem = item.withValue(defaultValue); + ItemSpec defaultItem = item.withValue(defaultValue); return addItemIf(!Objects.equals(item, defaultItem), item); } - private Builder addItemIf(boolean condition, Item item) { + private Builder addItemIf(boolean condition, ItemSpec spec) { if (!condition) { return this; } - checkNotNull(item, "Input display item cannot be null"); - checkNotNull(item.getValue(), "Input display value cannot be null"); - if (item.getNamespace() == null) { - item = item.withNamespace(latestNs); + checkNotNull(spec, "Input display item cannot be null"); + checkNotNull(spec.getValue(), "Input display value cannot be null"); + + if (spec.getNamespace() == null) { + spec = spec.withNamespace(latestNs); } + Item item = Item.create(spec, latestPath); - Identifier id = Identifier.of(item.getNamespace(), item.getKey()); + Identifier id = Identifier.of(item.getPath(), item.getNamespace(), item.getKey()); checkArgument(!entries.containsKey(id), - "Display data key (%s) is not unique within the specified namespace (%s).", - item.getKey(), item.getNamespace()); + "Display data key (%s) is not unique within the specified path and namespace: %s%s.", + item.getKey(), item.getPath(), item.getNamespace()); entries.put(id, item); return this; @@ -669,63 +863,63 @@ public class DisplayData implements Serializable { /** * Create a display item for the specified key and string value. */ - public static Item item(String key, @Nullable String value) { + public static ItemSpec item(String key, @Nullable String value) { return item(key, Type.STRING, value); } /** * Create a display item for the specified key and integer value. */ - public static Item item(String key, @Nullable Integer value) { + public static ItemSpec item(String key, @Nullable Integer value) { return item(key, Type.INTEGER, value); } /** * Create a display item for the specified key and integer value. */ - public static Item item(String key, @Nullable Long value) { + public static ItemSpec item(String key, @Nullable Long value) { return item(key, Type.INTEGER, value); } /** * Create a display item for the specified key and floating point value. */ - public static Item item(String key, @Nullable Float value) { + public static ItemSpec item(String key, @Nullable Float value) { return item(key, Type.FLOAT, value); } /** * Create a display item for the specified key and floating point value. */ - public static Item item(String key, @Nullable Double value) { + public static ItemSpec item(String key, @Nullable Double value) { return item(key, Type.FLOAT, value); } /** * Create a display item for the specified key and boolean value. */ - public static Item item(String key, @Nullable Boolean value) { + public static ItemSpec item(String key, @Nullable Boolean value) { return item(key, Type.BOOLEAN, value); } /** * Create a display item for the specified key and timestamp value. */ - public static Item item(String key, @Nullable Instant value) { + public static ItemSpec item(String key, @Nullable Instant value) { return item(key, Type.TIMESTAMP, value); } /** * Create a display item for the specified key and duration value. */ - public static Item item(String key, @Nullable Duration value) { + public static ItemSpec item(String key, @Nullable Duration value) { return item(key, Type.DURATION, value); } /** * Create a display item for the specified key and class value. */ - public static Item> item(String key, @Nullable Class value) { + public static ItemSpec> item(String key, @Nullable Class value) { return item(key, Type.JAVA_CLASS, value); } @@ -739,10 +933,10 @@ public class DisplayData implements Serializable { * * @see Type#inferType(Object) */ - public static Item item(String key, Type type, @Nullable T value) { + public static ItemSpec item(String key, Type type, @Nullable T value) { checkNotNull(key, "key argument cannot be null"); checkNotNull(type, "type argument cannot be null"); - return Item.create(key, type, value); + return ItemSpec.create(key, type, value); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index 57f7716..684a776 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -578,7 +578,7 @@ public class Window { builder .add(DisplayData.item("windowFn", windowFn.getClass()) .withLabel("Windowing Function")) - .include(windowFn); + .include("windowFn", windowFn); } if (allowedLateness != null) {