beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From heej...@apache.org
Subject [beam] branch master updated: [BEAM-11592] Adding cross-language test using third-party Python dependencies
Date Wed, 10 Mar 2021 22:30:24 GMT
This is an automated email from the ASF dual-hosted git repository.

heejong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b08a484  [BEAM-11592] Adding cross-language test using third-party Python dependencies
     new 42826cd  Merge pull request #13729 from ihji/BEAM-11592
b08a484 is described below

commit b08a4842cc666030ad4c963321ec838f5d54d18b
Author: Heejong Lee <heejong@gmail.com>
AuthorDate: Fri Jan 8 20:15:54 2021 -0800

    [BEAM-11592] Adding cross-language test using third-party Python dependencies
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy | 29 ++++++++++++++++++---
 .../core/construction/ValidateRunnerXlangTest.java | 24 +++++++++++++++++
 runners/direct-java/build.gradle                   |  2 ++
 runners/flink/job-server/flink_job_server.gradle   |  1 +
 runners/google-cloud-dataflow-java/build.gradle    |  1 +
 runners/spark/build.gradle                         |  2 ++
 .../sdk/testing/UsesPythonExpansionService.java    | 27 +++++++++++++++++++
 .../runners/portability/artifact_service.py        |  3 ++-
 .../runners/portability/expansion_service_test.py  | 30 ++++++++++++++++++++++
 9 files changed, 115 insertions(+), 4 deletions(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 773fbbc..a78836c 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -2060,6 +2060,30 @@ class BeamModulePlugin implements Plugin<Project> {
         cleanupTask.mustRunAfter pythonTask
         config.cleanupJobServer.mustRunAfter pythonTask
       }
+
+      // Task for running Python-only testcases in Java SDK
+      def javaUsingPythonOnlyTask = project.tasks.create(name: config.name+"JavaUsingPythonOnly", type: Test) {
+        group = "Verification"
+        description = "Validates runner for cross-language capability of using Python-only transforms from Java SDK"
+        systemProperty "beamTestPipelineOptions", JsonOutput.toJson(config.javaPipelineOptions)
+        systemProperty "expansionJar", expansionJar
+        systemProperty "expansionPort", pythonPort
+        classpath = config.classpath
+        testClassesDirs = project.files(project.project(":runners:core-construction-java").sourceSets.test.output.classesDirs)
+        maxParallelForks config.numParallelTests
+        useJUnit {
+          includeCategories 'org.apache.beam.sdk.testing.UsesPythonExpansionService'
+        }
+        // increase maxHeapSize as this is directly correlated to direct memory,
+        // see https://issues.apache.org/jira/browse/BEAM-6698
+        maxHeapSize = '4g'
+        dependsOn setupTask
+        dependsOn config.startJobServer
+      }
+      mainTask.dependsOn javaUsingPythonOnlyTask
+      cleanupTask.mustRunAfter javaUsingPythonOnlyTask
+      config.cleanupJobServer.mustRunAfter javaUsingPythonOnlyTask
+
       // Task for running testcases in Python SDK
       def testOpts = [
         "--attr=UsesSqlExpansionService"
@@ -2075,13 +2099,12 @@ class BeamModulePlugin implements Plugin<Project> {
         description = "Validates runner for cross-language capability of using Java's SqlTransform from Python SDK"
         executable 'sh'
         args '-c', ". $envDir/bin/activate && cd $pythonDir && ./scripts/run_integration_test.sh $cmdArgs"
+        dependsOn setupTask
         dependsOn config.startJobServer
-        dependsOn ':sdks:java:container:java8:docker'
-        dependsOn ':sdks:python:container:py'+pythonContainerSuffix+':docker'
         dependsOn ':sdks:java:extensions:sql:expansion-service:shadowJar'
-        dependsOn ":sdks:python:installGcpTest"
       }
       mainTask.dependsOn pythonSqlTask
+      cleanupTask.mustRunAfter pythonSqlTask
       config.cleanupJobServer.mustRunAfter pythonSqlTask
     }
 
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java
index 66e0133..40a0946 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.schemas.SchemaTranslation;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.UsesCrossLanguageTransforms;
+import org.apache.beam.sdk.testing.UsesPythonExpansionService;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -100,6 +101,7 @@ public class ValidateRunnerXlangTest implements Serializable {
   private static final String TEST_COMPK_URN = "beam:transforms:xlang:test:compk";
   private static final String TEST_FLATTEN_URN = "beam:transforms:xlang:test:flatten";
   private static final String TEST_PARTITION_URN = "beam:transforms:xlang:test:partition";
+  private static final String TEST_PYTHON_BS4_URN = "beam:transforms:xlang:test:python_bs4";
 
   private static String expansionAddr;
   private static String expansionJar;
@@ -317,6 +319,28 @@ public class ValidateRunnerXlangTest implements Serializable {
     PAssert.that(col.get("1")).containsInAnyOrder(1L, 3L, 5L);
   }
 
+  @Test
+  @Category({ValidatesRunner.class, UsesPythonExpansionService.class})
+  public void pythonDependenciesTest() {
+    String html =
+        "<html><head><title>The Dormouse's story</title></head>\n"
+            + "<body>\n"
+            + "<p class=\"title\"><b>The Dormouse's story</b></p>\n"
+            + "\n"
+            + "<p class=\"story\">Once upon a time there were three little sisters; and their names were\n"
+            + "<a href=\"http://example.com/elsie\" class=\"sister\" id=\"link1\">Elsie</a>,\n"
+            + "<a href=\"http://example.com/lacie\" class=\"sister\" id=\"link2\">Lacie</a> and\n"
+            + "<a href=\"http://example.com/tillie\" class=\"sister\" id=\"link3\">Tillie</a>;\n"
+            + "and they lived at the bottom of a well.</p>\n"
+            + "\n"
+            + "<p class=\"story\">...</p>";
+    PCollection<String> col =
+        testPipeline
+            .apply(Create.of(html))
+            .apply(External.of(TEST_PYTHON_BS4_URN, new byte[] {}, expansionAddr));
+    PAssert.that(col).containsInAnyOrder("The Dormouse's story");
+  }
+
   private byte[] toStringPayloadBytes(String data) throws IOException {
     Row configRow =
         Row.withSchema(Schema.of(Field.of("data", FieldType.STRING)))
diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle
index eada181..89564cd 100644
--- a/runners/direct-java/build.gradle
+++ b/runners/direct-java/build.gradle
@@ -130,6 +130,7 @@ task needsRunnerTests(type: Test) {
     // MetricsPusher isn't implemented in direct runner
     excludeCategories "org.apache.beam.sdk.testing.UsesMetricsPusher"
     excludeCategories "org.apache.beam.sdk.testing.UsesCrossLanguageTransforms"
+    excludeCategories "org.apache.beam.sdk.testing.UsesPythonExpansionService"
     excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
   }
   testLogging {
@@ -159,6 +160,7 @@ task validatesRunner(type: Test) {
     excludeCategories "org.apache.beam.sdk.testing.LargeKeys\$Above100MB"
     excludeCategories 'org.apache.beam.sdk.testing.UsesMetricsPusher'
     excludeCategories "org.apache.beam.sdk.testing.UsesCrossLanguageTransforms"
+    excludeCategories "org.apache.beam.sdk.testing.UsesPythonExpansionService"
   }
 }
 
diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle
index 3c83ce9..c12f84f 100644
--- a/runners/flink/job-server/flink_job_server.gradle
+++ b/runners/flink/job-server/flink_job_server.gradle
@@ -157,6 +157,7 @@ def portableValidatesRunnerTask(String name, Boolean streaming, Boolean checkpoi
         excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above10MB'
         excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
         excludeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms'
+        excludeCategories 'org.apache.beam.sdk.testing.UsesPythonExpansionService'
         excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
         excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
         excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index d8fe8ff..9cc104c 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -152,6 +152,7 @@ def commonLegacyExcludeCategories = [
   'org.apache.beam.sdk.testing.LargeKeys$Above10MB',
   'org.apache.beam.sdk.testing.UsesAttemptedMetrics',
   'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms',
+  'org.apache.beam.sdk.testing.UsesPythonExpansionService',
   'org.apache.beam.sdk.testing.UsesDistributionMetrics',
   'org.apache.beam.sdk.testing.UsesGaugeMetrics',
   'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs',
diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle
index 0ed83db..d2f171f 100644
--- a/runners/spark/build.gradle
+++ b/runners/spark/build.gradle
@@ -185,6 +185,7 @@ task validatesRunnerBatch(type: Test) {
     excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
     // Portability
     excludeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesPythonExpansionService'
     excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
   }
   jvmArgs '-Xmx3g'
@@ -255,6 +256,7 @@ task validatesStructuredStreamingRunnerBatch(type: Test) {
     excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
     // Portability
     excludeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesPythonExpansionService'
     excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
   }
   filter {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesPythonExpansionService.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesPythonExpansionService.java
new file mode 100644
index 0000000..b92742e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesPythonExpansionService.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.annotations.Internal;
+
+/**
+ * Category tag for tests which use the expansion service in Python. Tests tagged with {@link
+ * UsesPythonExpansionService} should be run for runners which support cross-language transforms.
+ */
+@Internal
+public interface UsesPythonExpansionService {}
diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py
index 18537f4..d0c8f8e8 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
@@ -271,7 +271,8 @@ class BeamFilesystemHandler(object):
 
   def file_writer(self, name=None):
     full_path = filesystems.FileSystems.join(self._root, name)
-    return filesystems.FileSystems.create(full_path), full_path
+    return filesystems.FileSystems.create(
+        full_path, compression_type=CompressionTypes.UNCOMPRESSED), full_path
 
 
 def resolve_artifacts(artifacts, service, dest_dir):
diff --git a/sdks/python/apache_beam/runners/portability/expansion_service_test.py b/sdks/python/apache_beam/runners/portability/expansion_service_test.py
index bd8f72e..11ade9e 100644
--- a/sdks/python/apache_beam/runners/portability/expansion_service_test.py
+++ b/sdks/python/apache_beam/runners/portability/expansion_service_test.py
@@ -31,10 +31,13 @@ import apache_beam as beam
 import apache_beam.transforms.combiners as combine
 from apache_beam.coders import RowCoder
 from apache_beam.pipeline import PipelineOptions
+from apache_beam.portability.api import beam_artifact_api_pb2_grpc
 from apache_beam.portability.api import beam_expansion_api_pb2_grpc
 from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
+from apache_beam.runners.portability import artifact_service
 from apache_beam.runners.portability import expansion_service
 from apache_beam.transforms import ptransform
+from apache_beam.transforms.environments import PyPIArtifactRegistry
 from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder
 from apache_beam.utils import thread_pool_executor
 
@@ -51,6 +54,7 @@ TEST_COMGL_URN = "beam:transforms:xlang:test:comgl"
 TEST_COMPK_URN = "beam:transforms:xlang:test:compk"
 TEST_FLATTEN_URN = "beam:transforms:xlang:test:flatten"
 TEST_PARTITION_URN = "beam:transforms:xlang:test:partition"
+TEST_PYTHON_BS4_URN = "beam:transforms:xlang:test:python_bs4"
 
 
 @ptransform.PTransform.register_urn('beam:transforms:xlang:count', None)
@@ -226,6 +230,27 @@ class PartitionTransform(ptransform.PTransform):
     return PartitionTransform()
 
 
+class ExtractHtmlTitleDoFn(beam.DoFn):
+  def process(self, element):
+    from bs4 import BeautifulSoup
+    soup = BeautifulSoup(element, 'html.parser')
+    return [soup.title.string]
+
+
+@ptransform.PTransform.register_urn(TEST_PYTHON_BS4_URN, None)
+class ExtractHtmlTitleTransform(ptransform.PTransform):
+  def expand(self, pcoll):
+    return pcoll | beam.ParDo(ExtractHtmlTitleDoFn()).with_output_types(unicode)
+
+  def to_runner_api_parameter(self, unused_context):
+    return TEST_PYTHON_BS4_URN, None
+
+  @staticmethod
+  def from_runner_api_parameter(
+      unused_ptransform, unused_parameter, unused_context):
+    return ExtractHtmlTitleTransform()
+
+
 @ptransform.PTransform.register_urn('payload', bytes)
 class PayloadTransform(ptransform.PTransform):
   def __init__(self, payload):
@@ -287,6 +312,7 @@ def cleanup(unused_signum, unused_frame):
 
 
 def main(unused_argv):
+  PyPIArtifactRegistry.register_artifact('beautifulsoup4', '>=4.9,<5.0')
   parser = argparse.ArgumentParser()
   parser.add_argument(
       '-p', '--port', type=int, help='port on which to serve the job api')
@@ -298,6 +324,10 @@ def main(unused_argv):
           PipelineOptions(
               ["--experiments", "beam_fn_api", "--sdk_location", "container"])),
       server)
+  beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
+      artifact_service.ArtifactRetrievalService(
+          artifact_service.BeamFilesystemHandler(None).file_reader),
+      server)
   server.add_insecure_port('localhost:{}'.format(options.port))
   server.start()
   _LOGGER.info('Listening for expansion requests at %d', options.port)


Mime
View raw message