beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bchamb...@apache.org
Subject [1/2] incubator-beam git commit: Set transform names in DisplayDataEvaluator
Date Fri, 30 Sep 2016 23:21:27 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master b237e2f05 -> bc80ee342


Set transform names in DisplayDataEvaluator

If a transform name is unset, a runner may infer the name from
the class simple name, which fails with anonymous classes. In order
to be more generically useful, we set transform names within the
DisplayDataEvaluator.

Handle anonymous classes in DataflowUnboundedReadFromBoundedSource.getKind()

Correctly implement display data for DataflowRunner Read transforms

The DataflowRunner will replace usages of the Read transform with
internally-wrapped implementations for streaming and bounded cases.
We were not properly passing display data between these usages.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3e7c396a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3e7c396a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3e7c396a

Branch: refs/heads/master
Commit: 3e7c396a09965ad688fc9b584c57cb1807360391
Parents: b237e2f
Author: Scott Wegner <swegner@google.com>
Authored: Thu Sep 29 14:43:14 2016 -0700
Committer: bchambers <bchambers@google.com>
Committed: Fri Sep 30 16:12:14 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |  8 ++
 .../DataflowUnboundedReadFromBoundedSource.java | 16 +++-
 ...aflowUnboundedReadFromBoundedSourceTest.java | 82 ++++++++++++++++++++
 .../sdk/io/BoundedReadFromUnboundedSource.java  |  6 ++
 .../java/org/apache/beam/sdk/io/ReadTest.java   | 56 ++++++++++++-
 .../display/DisplayDataEvaluator.java           |  7 +-
 6 files changed, 168 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3e7c396a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 54c95a7..ceaf6a0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -132,6 +132,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -2258,6 +2259,13 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
         return ValueWithRecordId.ValueWithRecordIdCoder.of(source.getDefaultOutputCoder());
       }
 
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
+        builder.add(DisplayData.item("source", source.getClass()));
+        builder.include(source);
+      }
+
       public UnboundedSource<T, ?> getSource() {
         return source;
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3e7c396a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
index 866da13..e4257d1 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
@@ -105,7 +105,14 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends
PTransform<PBegin
 
   @Override
   public String getKindString() {
-    return "Read(" + approximateSimpleName(source.getClass()) + ")";
+    String sourceName;
+    if (source.getClass().isAnonymousClass()) {
+      sourceName = "AnonymousSource";
+    } else {
+      sourceName = approximateSimpleName(source.getClass());
+    }
+
+    return "Read(" + sourceName + ")";
   }
 
   @Override
@@ -184,6 +191,13 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends
PTransform<PBegin
       return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
     }
 
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("source", boundedSource.getClass()));
+      builder.include(boundedSource);
+    }
+
     @VisibleForTesting
     static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
       private final @Nullable List<TimestampedValue<T>> residualElements;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3e7c396a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSourceTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSourceTest.java
new file mode 100644
index 0000000..d9e3558
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSourceTest.java
@@ -0,0 +1,82 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow.internal;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@see DataflowUnboundedReadFromBoundedSource}.
+ */
+@RunWith(JUnit4.class)
+public class DataflowUnboundedReadFromBoundedSourceTest {
+  @Test
+  public void testKind() {
+    DataflowUnboundedReadFromBoundedSource<?> read = new
+        DataflowUnboundedReadFromBoundedSource<>(new NoopNamedSource());
+
+    Assert.assertEquals("Read(NoopNamedSource)", read.getKindString());
+  }
+
+  @Test
+  public void testKindAnonymousSource() {
+    NoopNamedSource anonSource = new NoopNamedSource() {};
+    DataflowUnboundedReadFromBoundedSource<?> read = new
+        DataflowUnboundedReadFromBoundedSource<>(anonSource);
+
+    Assert.assertEquals("Read(AnonymousSource)", read.getKindString());
+  }
+
+  /** Source implementation only useful for its identity. */
+  static class NoopNamedSource extends BoundedSource<String> {
+    @Override
+    public List<? extends BoundedSource<String>> splitIntoBundles(long desiredBundleSizeBytes,
+        PipelineOptions options) throws Exception {
+      return null;
+    }
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      return 0;
+    }
+    @Override
+    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+      return false;
+    }
+    @Override
+    public BoundedReader<String> createReader(
+        PipelineOptions options) throws IOException {
+      return null;
+    }
+    @Override
+    public void validate() {
+
+    }
+    @Override
+    public Coder<String> getDefaultOutputCoder() {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3e7c396a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index b41c655..301a45e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -204,6 +204,12 @@ class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin,
PCollection<T
       return new Reader(source.createReader(options, null));
     }
 
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.add(DisplayData.item("source", source.getClass()));
+      builder.include(source);
+    }
+
     private class Reader extends BoundedReader<ValueWithRecordId<T>> {
       private long recordsRead = 0L;
       private Instant endTime = Instant.now().plus(maxReadTime);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3e7c396a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
index 30a8a43..a5138c5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
@@ -20,18 +20,25 @@ package org.apache.beam.sdk.io;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItem;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.joda.time.Duration;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -98,6 +105,51 @@ public class ReadTest implements Serializable{
     assertThat(unboundedDisplayData, hasDisplayItem("maxReadTime", maxReadTime));
   }
 
+  @Test
+  @Category(RunnableOnService.class)
+  public void testBoundedPrimitiveDisplayData() {
+    testPrimitiveDisplayData(/* isStreaming: */ false);
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testStreamingPrimitiveDisplayData() {
+    testPrimitiveDisplayData(/* isStreaming: */ true);
+  }
+
+  private void testPrimitiveDisplayData(boolean isStreaming) {
+    PipelineOptions options = DisplayDataEvaluator.getDefaultOptions();
+    options.as(StreamingOptions.class).setStreaming(isStreaming);
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options);
+
+    SerializableBoundedSource boundedSource = new SerializableBoundedSource() {
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder.add(DisplayData.item("foo", "bar"));
+      }
+    };
+    SerializableUnboundedSource unboundedSource = new SerializableUnboundedSource() {
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder.add(DisplayData.item("foo", "bar"));
+      }
+    };
+
+    Read.Bounded<String> bounded = Read.from(boundedSource);
+    BoundedReadFromUnboundedSource<String> unbounded = Read.from(unboundedSource)
+        .withMaxNumRecords(1234);
+
+    Set<DisplayData> boundedDisplayData = evaluator
+        .displayDataForPrimitiveSourceTransforms(bounded);
+    assertThat(boundedDisplayData, hasItem(hasDisplayItem("source", boundedSource.getClass())));
+    assertThat(boundedDisplayData, hasItem(includesDisplayDataFrom(boundedSource)));
+
+    Set<DisplayData> unboundedDisplayData = evaluator
+        .displayDataForPrimitiveSourceTransforms(unbounded);
+    assertThat(unboundedDisplayData, hasItem(hasDisplayItem("source")));
+    assertThat(unboundedDisplayData, hasItem(includesDisplayDataFrom(unboundedSource)));
+  }
+
   private abstract static class CustomBoundedSource extends BoundedSource<String> {
     @Override
     public List<? extends BoundedSource<String>> splitIntoBundles(
@@ -125,7 +177,7 @@ public class ReadTest implements Serializable{
 
     @Override
     public Coder<String> getDefaultOutputCoder() {
-      return null;
+      return StringUtf8Coder.of();
     }
   }
 
@@ -161,7 +213,7 @@ public class ReadTest implements Serializable{
 
     @Override
     public Coder<String> getDefaultOutputCoder() {
-      return null;
+      return StringUtf8Coder.of();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3e7c396a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
index 1783a73..db9aea3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
@@ -96,8 +96,8 @@ public class DisplayDataEvaluator {
 
     Pipeline pipeline = Pipeline.create(options);
     pipeline
-        .apply(input)
-        .apply(root);
+        .apply("Input", input)
+        .apply("Transform", root);
 
     return displayDataForPipeline(pipeline, root);
   }
@@ -112,8 +112,7 @@ public class DisplayDataEvaluator {
   public Set<DisplayData> displayDataForPrimitiveSourceTransforms(
       final PTransform<? super PBegin, ? extends POutput> root) {
     Pipeline pipeline = Pipeline.create(options);
-    pipeline
-        .apply(root);
+    pipeline.apply("SourceTransform", root);
 
     return displayDataForPipeline(pipeline, root);
   }


Mime
View raw message