From commits-return-113252-archive-asf-public=cust-asf.ponee.io@beam.apache.org Wed Mar 10 22:30:30 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-he-de.apache.org (mxout1-he-de.apache.org [95.216.194.37]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id DEBBF180654 for ; Wed, 10 Mar 2021 23:30:29 +0100 (CET) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-he-de.apache.org (ASF Mail Server at mxout1-he-de.apache.org) with SMTP id C69406342E for ; Wed, 10 Mar 2021 22:30:28 +0000 (UTC) Received: (qmail 93862 invoked by uid 500); 10 Mar 2021 22:30:28 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 93807 invoked by uid 99); 10 Mar 2021 22:30:28 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Mar 2021 22:30:28 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 02D2B80547; Wed, 10 Mar 2021 22:30:28 +0000 (UTC) Date: Wed, 10 Mar 2021 22:30:24 +0000 To: "commits@beam.apache.org" Subject: [beam] branch master updated: [BEAM-11592] Adding cross-language test using third-party Python dependencies MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <161541542099.29224.3509532026767965773@gitbox.apache.org> From: heejong@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: beam X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 63c13d65ab9f9f904dbdf3f4a0552019c4f0ae7d X-Git-Newrev: b08a4842cc666030ad4c963321ec838f5d54d18b X-Git-Rev: b08a4842cc666030ad4c963321ec838f5d54d18b X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev 
Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. heejong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new b08a484 [BEAM-11592] Adding cross-language test using third-party Python dependencies new 42826cd Merge pull request #13729 from ihji/BEAM-11592 b08a484 is described below commit b08a4842cc666030ad4c963321ec838f5d54d18b Author: Heejong Lee AuthorDate: Fri Jan 8 20:15:54 2021 -0800 [BEAM-11592] Adding cross-language test using third-party Python dependencies --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 29 ++++++++++++++++++--- .../core/construction/ValidateRunnerXlangTest.java | 24 +++++++++++++++++ runners/direct-java/build.gradle | 2 ++ runners/flink/job-server/flink_job_server.gradle | 1 + runners/google-cloud-dataflow-java/build.gradle | 1 + runners/spark/build.gradle | 2 ++ .../sdk/testing/UsesPythonExpansionService.java | 27 +++++++++++++++++++ .../runners/portability/artifact_service.py | 3 ++- .../runners/portability/expansion_service_test.py | 30 ++++++++++++++++++++++ 9 files changed, 115 insertions(+), 4 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 773fbbc..a78836c 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -2060,6 +2060,30 @@ class BeamModulePlugin implements Plugin { cleanupTask.mustRunAfter pythonTask config.cleanupJobServer.mustRunAfter pythonTask } + + // Task for running Python-only testcases in Java SDK + def javaUsingPythonOnlyTask = project.tasks.create(name: config.name+"JavaUsingPythonOnly", type: Test) { + group = "Verification" + description = "Validates runner for cross-language capability of using Python-only 
transforms from Java SDK" + systemProperty "beamTestPipelineOptions", JsonOutput.toJson(config.javaPipelineOptions) + systemProperty "expansionJar", expansionJar + systemProperty "expansionPort", pythonPort + classpath = config.classpath + testClassesDirs = project.files(project.project(":runners:core-construction-java").sourceSets.test.output.classesDirs) + maxParallelForks config.numParallelTests + useJUnit { + includeCategories 'org.apache.beam.sdk.testing.UsesPythonExpansionService' + } + // increase maxHeapSize as this is directly correlated to direct memory, + // see https://issues.apache.org/jira/browse/BEAM-6698 + maxHeapSize = '4g' + dependsOn setupTask + dependsOn config.startJobServer + } + mainTask.dependsOn javaUsingPythonOnlyTask + cleanupTask.mustRunAfter javaUsingPythonOnlyTask + config.cleanupJobServer.mustRunAfter javaUsingPythonOnlyTask + // Task for running testcases in Python SDK def testOpts = [ "--attr=UsesSqlExpansionService" @@ -2075,13 +2099,12 @@ class BeamModulePlugin implements Plugin { description = "Validates runner for cross-language capability of using Java's SqlTransform from Python SDK" executable 'sh' args '-c', ". 
$envDir/bin/activate && cd $pythonDir && ./scripts/run_integration_test.sh $cmdArgs" + dependsOn setupTask dependsOn config.startJobServer - dependsOn ':sdks:java:container:java8:docker' - dependsOn ':sdks:python:container:py'+pythonContainerSuffix+':docker' dependsOn ':sdks:java:extensions:sql:expansion-service:shadowJar' - dependsOn ":sdks:python:installGcpTest" } mainTask.dependsOn pythonSqlTask + cleanupTask.mustRunAfter pythonSqlTask config.cleanupJobServer.mustRunAfter pythonSqlTask } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java index 66e0133..40a0946 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.schemas.SchemaTranslation; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.UsesCrossLanguageTransforms; +import org.apache.beam.sdk.testing.UsesPythonExpansionService; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; @@ -100,6 +101,7 @@ public class ValidateRunnerXlangTest implements Serializable { private static final String TEST_COMPK_URN = "beam:transforms:xlang:test:compk"; private static final String TEST_FLATTEN_URN = "beam:transforms:xlang:test:flatten"; private static final String TEST_PARTITION_URN = "beam:transforms:xlang:test:partition"; + private static final String TEST_PYTHON_BS4_URN = "beam:transforms:xlang:test:python_bs4"; private static String expansionAddr; private static String expansionJar; @@ -317,6 +319,28 @@ public class 
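ValidateRunnerXlangTest implements Serializable { PAssert.that(col.get("1")).containsInAnyOrder(1L, 3L, 5L); } + @Test + @Category({ValidatesRunner.class, UsesPythonExpansionService.class}) + public void pythonDependenciesTest() { + String html = + "<html><head><title>The Dormouse's story</title></head>\n" + + "<body>\n" + + "<p class=\"title\"><b>The Dormouse's story</b></p>\n" + + "\n" + + "<p class=\"story\">Once upon a time there were three little sisters; and their names were\n" + + "<a href=\"http://example.com/elsie\" class=\"sister\" id=\"link1\">Elsie</a>,\n" + + "<a href=\"http://example.com/lacie\" class=\"sister\" id=\"link2\">Lacie</a> and\n" + + "<a href=\"http://example.com/tillie\" class=\"sister\" id=\"link3\">Tillie</a>;\n" + + "and they lived at the bottom of a well.</p>\n" + + "\n" + + "<p class=\"story\">...</p>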
"; + PCollection col = + testPipeline + .apply(Create.of(html)) + .apply(External.of(TEST_PYTHON_BS4_URN, new byte[] {}, expansionAddr)); + PAssert.that(col).containsInAnyOrder("The Dormouse's story"); + } + private byte[] toStringPayloadBytes(String data) throws IOException { Row configRow = Row.withSchema(Schema.of(Field.of("data", FieldType.STRING))) diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle index eada181..89564cd 100644 --- a/runners/direct-java/build.gradle +++ b/runners/direct-java/build.gradle @@ -130,6 +130,7 @@ task needsRunnerTests(type: Test) { // MetricsPusher isn't implemented in direct runner excludeCategories "org.apache.beam.sdk.testing.UsesMetricsPusher" excludeCategories "org.apache.beam.sdk.testing.UsesCrossLanguageTransforms" + excludeCategories "org.apache.beam.sdk.testing.UsesPythonExpansionService" excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer' } testLogging { @@ -159,6 +160,7 @@ task validatesRunner(type: Test) { excludeCategories "org.apache.beam.sdk.testing.LargeKeys\$Above100MB" excludeCategories 'org.apache.beam.sdk.testing.UsesMetricsPusher' excludeCategories "org.apache.beam.sdk.testing.UsesCrossLanguageTransforms" + excludeCategories "org.apache.beam.sdk.testing.UsesPythonExpansionService" } } diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index 3c83ce9..c12f84f 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -157,6 +157,7 @@ def portableValidatesRunnerTask(String name, Boolean streaming, Boolean checkpoi excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above10MB' excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics' excludeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms' + excludeCategories 'org.apache.beam.sdk.testing.UsesPythonExpansionService' excludeCategories 
'org.apache.beam.sdk.testing.UsesCustomWindowMerging' excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage' excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics' diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index d8fe8ff..9cc104c 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -152,6 +152,7 @@ def commonLegacyExcludeCategories = [ 'org.apache.beam.sdk.testing.LargeKeys$Above10MB', 'org.apache.beam.sdk.testing.UsesAttemptedMetrics', 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms', + 'org.apache.beam.sdk.testing.UsesPythonExpansionService', 'org.apache.beam.sdk.testing.UsesDistributionMetrics', 'org.apache.beam.sdk.testing.UsesGaugeMetrics', 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs', diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle index 0ed83db..d2f171f 100644 --- a/runners/spark/build.gradle +++ b/runners/spark/build.gradle @@ -185,6 +185,7 @@ task validatesRunnerBatch(type: Test) { excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo' // Portability excludeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms' + excludeCategories 'org.apache.beam.sdk.testing.UsesPythonExpansionService' excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer' } jvmArgs '-Xmx3g' @@ -255,6 +256,7 @@ task validatesStructuredStreamingRunnerBatch(type: Test) { excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo' // Portability excludeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms' + excludeCategories 'org.apache.beam.sdk.testing.UsesPythonExpansionService' excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer' } filter { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesPythonExpansionService.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesPythonExpansionService.java new file mode 100644 index 0000000..b92742e --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesPythonExpansionService.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +import org.apache.beam.sdk.annotations.Internal; + +/** + * Category tag for tests which use the expansion service in Python. Tests tagged with {@link + * UsesPythonExpansionService} should be run for runners which support cross-language transforms. 
+ */ +@Internal +public interface UsesPythonExpansionService {} diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py index 18537f4..d0c8f8e8 100644 --- a/sdks/python/apache_beam/runners/portability/artifact_service.py +++ b/sdks/python/apache_beam/runners/portability/artifact_service.py @@ -271,7 +271,8 @@ class BeamFilesystemHandler(object): def file_writer(self, name=None): full_path = filesystems.FileSystems.join(self._root, name) - return filesystems.FileSystems.create(full_path), full_path + return filesystems.FileSystems.create( + full_path, compression_type=CompressionTypes.UNCOMPRESSED), full_path def resolve_artifacts(artifacts, service, dest_dir): diff --git a/sdks/python/apache_beam/runners/portability/expansion_service_test.py b/sdks/python/apache_beam/runners/portability/expansion_service_test.py index bd8f72e..11ade9e 100644 --- a/sdks/python/apache_beam/runners/portability/expansion_service_test.py +++ b/sdks/python/apache_beam/runners/portability/expansion_service_test.py @@ -31,10 +31,13 @@ import apache_beam as beam import apache_beam.transforms.combiners as combine from apache_beam.coders import RowCoder from apache_beam.pipeline import PipelineOptions +from apache_beam.portability.api import beam_artifact_api_pb2_grpc from apache_beam.portability.api import beam_expansion_api_pb2_grpc from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload +from apache_beam.runners.portability import artifact_service from apache_beam.runners.portability import expansion_service from apache_beam.transforms import ptransform +from apache_beam.transforms.environments import PyPIArtifactRegistry from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder from apache_beam.utils import thread_pool_executor @@ -51,6 +54,7 @@ TEST_COMGL_URN = "beam:transforms:xlang:test:comgl" TEST_COMPK_URN = "beam:transforms:xlang:test:compk" 
TEST_FLATTEN_URN = "beam:transforms:xlang:test:flatten" TEST_PARTITION_URN = "beam:transforms:xlang:test:partition" +TEST_PYTHON_BS4_URN = "beam:transforms:xlang:test:python_bs4" @ptransform.PTransform.register_urn('beam:transforms:xlang:count', None) @@ -226,6 +230,27 @@ class PartitionTransform(ptransform.PTransform): return PartitionTransform() +class ExtractHtmlTitleDoFn(beam.DoFn): + def process(self, element): + from bs4 import BeautifulSoup + soup = BeautifulSoup(element, 'html.parser') + return [soup.title.string] + + +@ptransform.PTransform.register_urn(TEST_PYTHON_BS4_URN, None) +class ExtractHtmlTitleTransform(ptransform.PTransform): + def expand(self, pcoll): + return pcoll | beam.ParDo(ExtractHtmlTitleDoFn()).with_output_types(unicode) + + def to_runner_api_parameter(self, unused_context): + return TEST_PYTHON_BS4_URN, None + + @staticmethod + def from_runner_api_parameter( + unused_ptransform, unused_parameter, unused_context): + return ExtractHtmlTitleTransform() + + @ptransform.PTransform.register_urn('payload', bytes) class PayloadTransform(ptransform.PTransform): def __init__(self, payload): @@ -287,6 +312,7 @@ def cleanup(unused_signum, unused_frame): def main(unused_argv): + PyPIArtifactRegistry.register_artifact('beautifulsoup4', '>=4.9,<5.0') parser = argparse.ArgumentParser() parser.add_argument( '-p', '--port', type=int, help='port on which to serve the job api') @@ -298,6 +324,10 @@ def main(unused_argv): PipelineOptions( ["--experiments", "beam_fn_api", "--sdk_location", "container"])), server) + beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server( + artifact_service.ArtifactRetrievalService( + artifact_service.BeamFilesystemHandler(None).file_reader), + server) server.add_insecure_port('localhost:{}'.format(options.port)) server.start() _LOGGER.info('Listening for expansion requests at %d', options.port)