beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: Mark primitive display data tests RunnableOnService
Date Fri, 08 Jul 2016 19:52:07 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 744b0474e -> 994febef4


Mark primitive display data tests RunnableOnService


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9e0b1b42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9e0b1b42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9e0b1b42

Branch: refs/heads/master
Commit: 9e0b1b423ef722cdfbec0bde89d85faa760bf322
Parents: 744b047
Author: Scott Wegner <swegner@google.com>
Authored: Thu Jul 7 13:49:30 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Fri Jul 8 12:51:39 2016 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      | 10 ---
 .../runners/dataflow/io/DataflowAvroIOTest.java | 69 --------------
 .../dataflow/io/DataflowBigQueryIOTest.java     | 94 --------------------
 .../dataflow/io/DataflowDatastoreIOTest.java    | 66 --------------
 .../dataflow/io/DataflowPubsubIOTest.java       | 63 -------------
 .../runners/dataflow/io/DataflowTextIOTest.java | 76 ----------------
 .../transforms/DataflowCombineTest.java         | 58 ------------
 .../DataflowDisplayDataEvaluator.java           | 72 ---------------
 .../transforms/DataflowMapElementsTest.java     | 55 ------------
 .../org/apache/beam/sdk/transforms/Combine.java |  5 ++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 34 +++++++
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  | 75 +++++++++++++++-
 .../org/apache/beam/sdk/io/PubsubIOTest.java    | 30 +++++++
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 32 +++++++
 .../beam/sdk/io/datastore/V1Beta3Test.java      | 33 +++++++
 .../apache/beam/sdk/transforms/CombineTest.java | 19 ++++
 .../beam/sdk/transforms/MapElementsTest.java    | 22 +++++
 .../display/DisplayDataEvaluator.java           | 12 ++-
 18 files changed, 258 insertions(+), 567 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 76e5f80..9cd1fb4 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -290,11 +290,6 @@
     </dependency>
 
     <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.api-client</groupId>
       <artifactId>google-api-client</artifactId>
     </dependency>
@@ -331,11 +326,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.google.apis</groupId>
-      <artifactId>google-api-services-bigquery</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.cloud.bigdataoss</groupId>
       <artifactId>util</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java
deleted file mode 100644
index 006daa9..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.io;
-
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-
-import static org.hamcrest.Matchers.hasItem;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator;
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-
-import org.apache.avro.Schema;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Set;
-
-/**
- * {@link DataflowRunner} specific tests for {@link AvroIO} transforms.
- */
-@RunWith(JUnit4.class)
-public class DataflowAvroIOTest {
-  @Test
-  public void testPrimitiveWriteDisplayData() {
-    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
-
-    AvroIO.Write.Bound<?> write = AvroIO.Write
-        .to("foo")
-        .withSchema(Schema.create(Schema.Type.STRING))
-        .withoutValidation();
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat("AvroIO.Write should include the file pattern in its primitive transform",
-        displayData, hasItem(hasDisplayItem("fileNamePattern")));
-  }
-
-  @Test
-  public void testPrimitiveReadDisplayData() {
-    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
-
-    AvroIO.Read.Bound<?> read = AvroIO.Read.from("foo.*")
-        .withSchema(Schema.create(Schema.Type.STRING))
-        .withoutValidation();
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
-    assertThat("AvroIO.Read should include the file pattern in its primitive transform",
-        displayData, hasItem(hasDisplayItem("filePattern")));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java
deleted file mode 100644
index 2b13b9c..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.io;
-
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-
-import static org.hamcrest.Matchers.hasItem;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator;
-import org.apache.beam.sdk.io.BigQueryIO;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-
-import com.google.api.services.bigquery.model.TableSchema;
-
-import org.junit.Test;
-
-import java.util.Set;
-
-/**
- * Unit tests for Dataflow usage of {@link BigQueryIO} transforms.
- */
-public class DataflowBigQueryIOTest {
-  @Test
-  public void testTableSourcePrimitiveDisplayData() {
-    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
-    BigQueryIO.Read.Bound read = BigQueryIO.Read
-        .from("project:dataset.tableId")
-        .withoutValidation();
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
-    assertThat("BigQueryIO.Read should include the table spec in its primitive display data",
-        displayData, hasItem(hasDisplayItem("table")));
-  }
-
-  @Test
-  public void testQuerySourcePrimitiveDisplayData() {
-    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
-    BigQueryIO.Read.Bound read = BigQueryIO.Read
-        .fromQuery("foobar")
-        .withoutValidation();
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
-    assertThat("BigQueryIO.Read should include the query in its primitive display data",
-        displayData, hasItem(hasDisplayItem("query")));
-  }
-
-  @Test
-  public void testBatchSinkPrimitiveDisplayData() {
-    DataflowPipelineOptions options = DataflowDisplayDataEvaluator.getDefaultOptions();
-    options.setStreaming(false);
-    testSinkPrimitiveDisplayData(options);
-  }
-
-  @Test
-  public void testStreamingSinkPrimitiveDisplayData() {
-    DataflowPipelineOptions options = DataflowDisplayDataEvaluator.getDefaultOptions();
-    options.setStreaming(true);
-    testSinkPrimitiveDisplayData(options);
-  }
-
-  private void testSinkPrimitiveDisplayData(DataflowPipelineOptions options) {
-    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(options);
-
-    BigQueryIO.Write.Bound write = BigQueryIO.Write
-        .to("project:dataset.table")
-        .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2"))
-        .withoutValidation();
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat("BigQueryIO.Write should include the table spec in its primitive display data",
-        displayData, hasItem(hasDisplayItem("tableSpec")));
-
-    assertThat("BigQueryIO.Write should include the table schema in its primitive display data",
-        displayData, hasItem(hasDisplayItem("schema")));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java
deleted file mode 100644
index 8cdf611..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.io;
-
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-
-import static org.hamcrest.Matchers.hasItem;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator;
-import org.apache.beam.sdk.io.datastore.DatastoreIO;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.POutput;
-
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.Query;
-
-import org.junit.Test;
-
-import java.util.Set;
-
-/**
- * Unit tests for Dataflow usage of {@link DatastoreIO} transforms.
- */
-public class DataflowDatastoreIOTest {
-  @Test
-  public void testSourcePrimitiveDisplayData() {
-    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
-    PTransform<PBegin, ? extends POutput> read = DatastoreIO.v1beta3().read().withProjectId(
-        "myProject").withQuery(Query.newBuilder().build());
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
-    assertThat("DatastoreIO read should include the project in its primitive display data",
-        displayData, hasItem(hasDisplayItem("projectId")));
-  }
-
-  @Test
-  public void testSinkPrimitiveDisplayData() {
-    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
-    PTransform<PCollection<Entity>, ?> write =
-        DatastoreIO.v1beta3().write().withProjectId("myProject");
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat("DatastoreIO write should include the project in its primitive display data",
-        displayData, hasItem(hasDisplayItem("projectId")));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
deleted file mode 100644
index 27bc2d9..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.io;
-
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-
-import static org.hamcrest.Matchers.hasItem;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator;
-import org.apache.beam.sdk.io.PubsubIO;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Set;
-
-/**
- * {@link DataflowRunner} specific tests for {@link PubsubIO} transforms.
- */
-@RunWith(JUnit4.class)
-public class DataflowPubsubIOTest {
-  @Test
-  public void testPrimitiveWriteDisplayData() {
-    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
-    PubsubIO.Write.Bound<?> write = PubsubIO.Write.topic("projects/project/topics/topic");
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat("PubsubIO.Write should include the topic in its primitive display data",
-               displayData, hasItem(hasDisplayItem("topic")));
-  }
-
-  @Test
-  public void testPrimitiveReadDisplayData() {
-    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
-    PubsubIO.Read.Bound<String> read =
-        PubsubIO.Read.subscription("projects/project/subscriptions/subscription")
-                     .maxNumRecords(1);
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
-    assertThat("PubsubIO.Read should include the subscription in its primitive display data",
-               displayData, hasItem(hasDisplayItem("subscription")));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
deleted file mode 100644
index 727ffdc..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.io;
-
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
-import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.Matchers.startsWith;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-import org.apache.beam.sdk.util.TestCredential;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Set;
-
-/**
- * {@link DataflowRunner} specific tests for TextIO Read and Write transforms.
- */
-@RunWith(JUnit4.class)
-public class DataflowTextIOTest {
-  private TestDataflowPipelineOptions buildTestPipelineOptions() {
-    TestDataflowPipelineOptions options =
-        TestPipeline.testingPipelineOptions().as(TestDataflowPipelineOptions.class);
-    options.setGcpCredential(new TestCredential());
-    return options;
-  }
-
-  @Test
-  public void testPrimitiveWriteDisplayData() {
-    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
-
-    TextIO.Write.Bound<?> write = TextIO.Write.to("foobar");
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat("TextIO.Write should include the file prefix in its primitive display data",
-        displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
-  }
-
-  @Test
-  public void testPrimitiveReadDisplayData() {
-    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
-
-    TextIO.Read.Bound<String> read = TextIO.Read
-        .from("foobar")
-        .withoutValidation();
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
-    assertThat("TextIO.Read should include the file prefix in its primitive display data",
-        displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowCombineTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowCombineTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowCombineTest.java
deleted file mode 100644
index 3af0cae..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowCombineTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.transforms;
-
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-
-import static org.hamcrest.Matchers.hasItem;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.CombineTest;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.POutput;
-
-import org.junit.Test;
-
-import java.util.Set;
-
-/**
- * Unit tests for Dataflow usage of {@link Combine} transforms.
- */
-public class DataflowCombineTest {
-  @Test
-  public void testCombinePerKeyPrimitiveDisplayData() {
-    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
-
-    CombineTest.UniqueInts combineFn = new CombineTest.UniqueInts();
-    PTransform<PCollection<KV<Integer, Integer>>, ? extends POutput> combine =
-        Combine.perKey(combineFn);
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(combine,
-        KvCoder.of(VarIntCoder.of(), VarIntCoder.of()));
-
-    assertThat("Combine.perKey should include the combineFn in its primitive transform",
-        displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass())));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java
deleted file mode 100644
index d809cc6..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.transforms;
-
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.GcpOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-import org.apache.beam.sdk.util.NoopCredentialFactory;
-import org.apache.beam.sdk.util.NoopPathValidator;
-
-import com.google.common.collect.Lists;
-
-/**
- * Factory methods for creating {@link DisplayDataEvaluator} instances against the
- * {@link DataflowRunner}.
- */
-public final class DataflowDisplayDataEvaluator {
-  /** Do not instantiate. */
-  private DataflowDisplayDataEvaluator() {}
-
-  /**
-   * Retrieve a set of default {@link DataflowPipelineOptions} which can be used to build
-   * dataflow pipelines for evaluating display data.
-   */
-  public static DataflowPipelineOptions getDefaultOptions() {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-
-    options.setRunner(DataflowRunner.class);
-    options.setProject("foobar");
-    options.setTempLocation("gs://bucket/tmpLocation");
-    options.setFilesToStage(Lists.<String>newArrayList());
-
-    options.as(DataflowPipelineDebugOptions.class).setPathValidatorClass(NoopPathValidator.class);
-    options.as(GcpOptions.class).setCredentialFactoryClass(NoopCredentialFactory.class);
-
-    return options;
-  }
-
-  /**
-   * Create a {@link DisplayDataEvaluator} instance to evaluate pipeline display data against
-   * the {@link DataflowRunner}.
-   */
-  public static DisplayDataEvaluator create() {
-    return create(getDefaultOptions());
-  }
-
-  /**
-   * Create a {@link DisplayDataEvaluator} instance to evaluate pipeline display data against
-   * the {@link DataflowRunner} with the specified {@code options}.
-   */
-  public static DisplayDataEvaluator create(DataflowPipelineOptions options) {
-    return DisplayDataEvaluator.create(options);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowMapElementsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowMapElementsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowMapElementsTest.java
deleted file mode 100644
index 8a5e67d..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowMapElementsTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.transforms;
-
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-
-import static org.hamcrest.Matchers.hasItem;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-
-import org.junit.Test;
-
-import java.io.Serializable;
-import java.util.Set;
-
-/**
- * Unit tests for Dataflow usage of {@link MapElements} transforms.
- */
-public class DataflowMapElementsTest implements Serializable {
-  @Test
-  public void testPrimitiveDisplayData() {
-    SimpleFunction<?, ?> mapFn = new SimpleFunction<Integer, Integer>() {
-      @Override
-      public Integer apply(Integer input) {
-        return input;
-      }
-    };
-
-    MapElements<?, ?> map = MapElements.via(mapFn);
-    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(map);
-    assertThat("MapElements should include the mapFn in its primitive display data",
-        displayData, hasItem(hasDisplayItem("mapFn", mapFn.getClass())));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 5faf4e3..9a87b36 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -2295,6 +2295,11 @@ public class Combine {
 
               c.output(KV.of(key, combineFnRunner.apply(key, c.element().getValue(), c)));
             }
+
+            @Override
+            public void populateDisplayData(DisplayData.Builder builder) {
+              Combine.GroupedValues.this.populateDisplayData(builder);
+            }
           }).withSideInputs(sideInputs));
 
       try {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 8625b10..047e7d0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -30,9 +31,11 @@ import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.AvroIO.Write.Bound;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -40,6 +43,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 
+import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.Nullable;
@@ -55,6 +59,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * Tests for AvroIO Read and Write transforms.
@@ -273,6 +278,20 @@ public class AvroIOTest {
   }
 
   @Test
+  @Category(RunnableOnService.class)
+  public void testPrimitiveReadDisplayData() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+
+    AvroIO.Read.Bound<?> read = AvroIO.Read.from("foo.*")
+        .withSchema(Schema.create(Schema.Type.STRING))
+        .withoutValidation();
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    assertThat("AvroIO.Read should include the file pattern in its primitive transform",
+        displayData, hasItem(hasDisplayItem("filePattern")));
+  }
+
+  @Test
   public void testWriteDisplayData() {
     AvroIO.Write.Bound<?> write = AvroIO.Write
         .to("foo")
@@ -291,4 +310,19 @@ public class AvroIOTest {
     assertThat(displayData, hasDisplayItem("numShards", 100));
     assertThat(displayData, hasDisplayItem("validation", false));
   }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testPrimitiveWriteDisplayData() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+
+    AvroIO.Write.Bound<?> write = AvroIO.Write
+        .to("foo")
+        .withSchema(Schema.create(Schema.Type.STRING))
+        .withoutValidation();
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+    assertThat("AvroIO.Write should include the file pattern in its primitive transform",
+        displayData, hasItem(hasDisplayItem("fileNamePattern")));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index 43bf314..78d950e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -23,6 +23,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -45,6 +46,7 @@ import org.apache.beam.sdk.io.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
@@ -57,6 +59,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.BigQueryServices;
 import org.apache.beam.sdk.util.BigQueryServices.DatasetService;
@@ -109,6 +112,7 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Set;
 
 import javax.annotation.Nullable;
 
@@ -329,9 +333,10 @@ public class BigQueryIOTest implements Serializable {
   @Rule public transient ExpectedException thrown = ExpectedException.none();
   @Rule public transient ExpectedLogs logged = ExpectedLogs.none(BigQueryIO.class);
   @Rule public transient TemporaryFolder testFolder = new TemporaryFolder();
-  @Mock public transient BigQueryServices.JobService mockJobService;
+  @Mock(extraInterfaces = Serializable.class)
+  public transient BigQueryServices.JobService mockJobService;
   @Mock private transient IOChannelFactory mockIOChannelFactory;
-  @Mock private transient DatasetService mockDatasetService;
+  @Mock(extraInterfaces = Serializable.class) private transient DatasetService mockDatasetService;
 
   private transient BigQueryOptions bqOptions;
 
@@ -637,6 +642,39 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
+  @Category(RunnableOnService.class)
+  public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(bqOptions);
+    BigQueryIO.Read.Bound read = BigQueryIO.Read
+        .from("project:dataset.tableId")
+        .withTestServices(new FakeBigQueryServices()
+            .withDatasetService(mockDatasetService)
+            .withJobService(mockJobService))
+        .withoutValidation();
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    assertThat("BigQueryIO.Read should include the table spec in its primitive display data",
+        displayData, hasItem(hasDisplayItem("table")));
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(bqOptions);
+    BigQueryIO.Read.Bound read = BigQueryIO.Read
+        .fromQuery("foobar")
+        .withTestServices(new FakeBigQueryServices()
+            .withDatasetService(mockDatasetService)
+            .withJobService(mockJobService))
+        .withoutValidation();
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    assertThat("BigQueryIO.Read should include the query in its primitive display data",
+        displayData, hasItem(hasDisplayItem("query")));
+  }
+
+
+  @Test
   public void testBuildSink() {
     BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("foo.com:project:somedataset.sometable");
     checkWriteObject(
@@ -645,6 +683,39 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
+  @Category(RunnableOnService.class)
+  public void testBatchSinkPrimitiveDisplayData() throws IOException, InterruptedException {
+    testSinkPrimitiveDisplayData(/* streaming: */ false);
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testStreamingSinkPrimitiveDisplayData() throws IOException, InterruptedException {
+    testSinkPrimitiveDisplayData(/* streaming: */ true);
+  }
+
+  private void testSinkPrimitiveDisplayData(boolean streaming) throws IOException,
+      InterruptedException {
+    bqOptions.as(StreamingOptions.class).setStreaming(streaming);
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(bqOptions);
+
+    BigQueryIO.Write.Bound write = BigQueryIO.Write
+        .to("project:dataset.table")
+        .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2"))
+        .withTestServices(new FakeBigQueryServices()
+          .withDatasetService(mockDatasetService)
+          .withJobService(mockJobService))
+        .withoutValidation();
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+    assertThat("BigQueryIO.Write should include the table spec in its primitive display data",
+        displayData, hasItem(hasDisplayItem("tableSpec")));
+
+    assertThat("BigQueryIO.Write should include the table schema in its primitive display data",
+        displayData, hasItem(hasDisplayItem("schema")));
+  }
+
+  @Test
   public void testBuildSinkwithoutValidation() {
     // This test just checks that using withoutValidation will not trigger object
     // construction errors.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index efa1cd2..1e9ebf2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -19,18 +19,24 @@ package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 
+import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
+import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 
 import org.joda.time.Duration;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.util.Set;
+
 /**
  * Tests for PubsubIO Read and Write transforms.
  */
@@ -101,6 +107,19 @@ public class PubsubIOTest {
   }
 
   @Test
+  @Category(RunnableOnService.class)
+  public void testPrimitiveReadDisplayData() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    PubsubIO.Read.Bound<String> read =
+        PubsubIO.Read.subscription("projects/project/subscriptions/subscription")
+            .maxNumRecords(1);
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    assertThat("PubsubIO.Read should include the subscription in its primitive display data",
+        displayData, hasItem(hasDisplayItem("subscription")));
+  }
+
+  @Test
   public void testWriteDisplayData() {
     String topic = "projects/project/topics/topic";
     PubsubIO.Write.Bound<?> write = PubsubIO.Write
@@ -114,4 +133,15 @@ public class PubsubIOTest {
     assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
     assertThat(displayData, hasDisplayItem("idLabel", "myId"));
   }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testPrimitiveWriteDisplayData() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    PubsubIO.Write.Bound<?> write = PubsubIO.Write.topic("projects/project/topics/topic");
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+    assertThat("PubsubIO.Write should include the topic in its primitive display data",
+        displayData, hasItem(hasDisplayItem("topic")));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index df598c8..28e9ea4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -22,8 +22,11 @@ import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -42,10 +45,12 @@ import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.IOChannelUtils;
@@ -79,6 +84,7 @@ import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
 import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
@@ -185,6 +191,20 @@ public class TextIOTest {
     assertThat(displayData, hasDisplayItem("validation", false));
   }
 
+  @Test
+  @Category(RunnableOnService.class)
+  public void testPrimitiveReadDisplayData() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+
+    TextIO.Read.Bound<String> read = TextIO.Read
+        .from("foobar")
+        .withoutValidation();
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    assertThat("TextIO.Read should include the file prefix in its primitive display data",
+        displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
+  }
+
   <T> void runTestWrite(T[] elems, Coder<T> coder) throws Exception {
     runTestWrite(elems, coder, 1);
   }
@@ -315,6 +335,18 @@ public class TextIOTest {
   }
 
   @Test
+  @Category(RunnableOnService.class)
+  public void testPrimitiveWriteDisplayData() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+
+    TextIO.Write.Bound<?> write = TextIO.Write.to("foobar");
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+    assertThat("TextIO.Write should include the file prefix in its primitive display data",
+        displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
+  }
+
+  @Test
   public void testUnsupportedFilePattern() throws IOException {
     File outFolder = tmpFolder.newFolder();
     // Windows doesn't like resolving paths with * in them.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java
index 9a87ed3..dd22289 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java
@@ -23,6 +23,7 @@ import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -45,8 +46,14 @@ import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
 
 import com.google.common.collect.Lists;
 import com.google.datastore.v1beta3.Entity;
@@ -68,6 +75,7 @@ import com.google.protobuf.Int32Value;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -80,6 +88,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Set;
 
 /**
  * Tests for {@link V1Beta3}.
@@ -199,6 +208,18 @@ public class V1Beta3Test {
   }
 
   @Test
+  @Category(RunnableOnService.class)
+  public void testSourcePrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    PTransform<PBegin, ? extends POutput> read = DatastoreIO.v1beta3().read().withProjectId(
+        "myProject").withQuery(Query.newBuilder().build());
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
+    assertThat("DatastoreIO read should include the project in its primitive display data",
+        displayData, hasItem(hasDisplayItem("projectId")));
+  }
+
+  @Test
   public void testWriteDoesNotAllowNullProject() throws Exception {
     thrown.expect(NullPointerException.class);
     thrown.expectMessage("projectId");
@@ -233,6 +254,18 @@ public class V1Beta3Test {
   }
 
   @Test
+  @Category(RunnableOnService.class)
+  public void testSinkPrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    PTransform<PCollection<Entity>, ?> write =
+        DatastoreIO.v1beta3().write().withProjectId("myProject");
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+    assertThat("DatastoreIO write should include the project in its primitive display data",
+        displayData, hasItem(hasDisplayItem("projectId")));
+  }
+
+  @Test
   public void testQuerySplitBasic() throws Exception {
     KindExpression mykind = KindExpression.newBuilder().setName("mykind").build();
     Query query = Query.newBuilder().addKind(mykind).build();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 6f6c4a1..b453089 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
@@ -50,6 +51,7 @@ import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -63,6 +65,7 @@ import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.POutput;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableSet;
@@ -711,6 +714,22 @@ public class CombineTest implements Serializable {
     assertThat(displayData, hasDisplayItem(hasNamespace(combineFn.getClass())));
   }
 
+  @Test
+  @Category(RunnableOnService.class)
+  public void testCombinePerKeyPrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+
+    CombineTest.UniqueInts combineFn = new CombineTest.UniqueInts();
+    PTransform<PCollection<KV<Integer, Integer>>, ? extends POutput> combine =
+        Combine.perKey(combineFn);
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(combine,
+        KvCoder.of(VarIntCoder.of(), VarIntCoder.of()));
+
+    assertThat("Combine.perKey should include the combineFn in its primitive transform",
+        displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass())));
+  }
+
   ////////////////////////////////////////////////////////////////////////////
   // Test classes, for different kinds of combining fns.
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index e6694d2..f18504c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -19,13 +19,16 @@ package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -38,6 +41,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 import java.io.Serializable;
+import java.util.Set;
 
 /**
  * Tests for {@link MapElements}.
@@ -155,6 +159,24 @@ public class MapElementsTest implements Serializable {
     assertThat(DisplayData.from(simpleMap), hasDisplayItem("mapFn", simpleFn.getClass()));
   }
 
+  @Test
+  @Category(RunnableOnService.class)
+  public void testPrimitiveDisplayData() {
+    SimpleFunction<?, ?> mapFn = new SimpleFunction<Integer, Integer>() {
+      @Override
+      public Integer apply(Integer input) {
+        return input;
+      }
+    };
+
+    MapElements<?, ?> map = MapElements.via(mapFn);
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(map);
+    assertThat("MapElements should include the mapFn in its primitive display data",
+        displayData, hasItem(hasDisplayItem("mapFn", mapFn.getClass())));
+  }
+
   static class VoidValues<K, V>
       extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Void>>> {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
index a78a4ad..dc8c1e9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
@@ -41,10 +41,11 @@ public class DisplayDataEvaluator {
   private final PipelineOptions options;
 
   /**
-   * Create a new {@link DisplayDataEvaluator} using {@link TestPipeline#testingPipelineOptions()}.
+   * Create a new {@link DisplayDataEvaluator} using options returned from
+   * {@link #getDefaultOptions()}.
    */
   public static DisplayDataEvaluator create() {
-    return create(TestPipeline.testingPipelineOptions());
+    return create(getDefaultOptions());
   }
 
   /**
@@ -54,6 +55,13 @@ public class DisplayDataEvaluator {
     return new DisplayDataEvaluator(pipelineOptions);
   }
 
+  /**
+   * The default {@link PipelineOptions} which will be used by {@link #create()}.
+   */
+  public static PipelineOptions getDefaultOptions() {
+    return TestPipeline.testingPipelineOptions();
+  }
+
   private DisplayDataEvaluator(PipelineOptions options) {
     this.options = options;
   }


Mime
View raw message