beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
Date Fri, 24 Aug 2018 18:24:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137920&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137920 ]

ASF GitHub Bot logged work on BEAM-4130:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Aug/18 18:23
            Start Date: 24/Aug/18 18:23
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #6265: [BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not display it otherwise:

diff --git a/runners/flink/job-server-container/Dockerfile b/runners/flink/job-server-container/Dockerfile
index a9aff21b6d6..569c2ab0406 100644
--- a/runners/flink/job-server-container/Dockerfile
+++ b/runners/flink/job-server-container/Dockerfile
@@ -19,7 +19,7 @@
 FROM openjdk:8
 MAINTAINER "Apache Beam <dev@beam.apache.org>"
 
-ADD target/beam-runners-flink_2.11-job-server.jar /opt/apache/beam/jars/
+ADD target/beam-runners-flink-job-server.jar /opt/apache/beam/jars/
 ADD target/flink-job-server.sh /opt/apache/beam/
 
 WORKDIR /opt/apache/beam
diff --git a/runners/flink/job-server-container/build.gradle b/runners/flink/job-server-container/build.gradle
index 4d5f53316e3..d2b026804d7 100644
--- a/runners/flink/job-server-container/build.gradle
+++ b/runners/flink/job-server-container/build.gradle
@@ -38,7 +38,7 @@ dependencies {
 task copyDockerfileDependencies(type: Copy) {
   // Required Jars
   from configurations.dockerDependency
-  rename 'beam-runners-flink_2.11-job-server.*.jar', 'beam-runners-flink_2.11-job-server.jar'
+  rename 'beam-runners-flink_2.11-job-server.*.jar', 'beam-runners-flink-job-server.jar'
   into "build/target"
   // Entry script
   from file("./flink-job-server.sh")
diff --git a/runners/flink/job-server/build.gradle b/runners/flink/job-server/build.gradle
index 5e1e01ae74d..ee6b70f1f37 100644
--- a/runners/flink/job-server/build.gradle
+++ b/runners/flink/job-server/build.gradle
@@ -55,7 +55,7 @@ dependencies {
 // task will not work because the flink runner classes only exist in the shadow
 // jar.
 runShadow {
-  def jobHost = project.hasProperty("jobHost") ? project.property("jobHost") : "localhost:8099"
+  def jobHost = project.hasProperty("jobHost") ? project.property("jobHost") : "localhost"
  def artifactsDir = project.hasProperty("artifactsDir") ?  project.property("artifactsDir") : "/tmp/flink-artifacts"
   def cleanArtifactsPerJob = project.hasProperty("cleanArtifactsPerJob")
   args = ["--job-host=${jobHost}", "--artifacts-dir=${artifactsDir}"]
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 7e9b14a2acd..13f48c53018 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -45,15 +45,22 @@
 
   private final ListeningExecutorService executor;
   private final ServerConfiguration configuration;
-  private final ServerFactory serverFactory;
+  private final ServerFactory jobServerFactory;
+  private final ServerFactory artifactServerFactory;
   private GrpcFnServer<InMemoryJobService> jobServer;
   private GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingServer;
 
   /** Configuration for the jobServer. */
   public static class ServerConfiguration {
-    @Option(name = "--job-host", usage = "The job server host string")
+    @Option(name = "--job-host", usage = "The job server host name")
     private String host = "";
 
+    @Option(name = "--job-port", usage = "The job service port. (Default: 8099)")
+    private int port = 8099;
+
+    @Option(name = "--artifact-port", usage = "The artifact service port. (Default: 8098)")
+    private int artifactPort = 8098;
+
     @Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")
     private String artifactStagingPath = "/tmp/beam-artifact-staging";
 
@@ -100,24 +107,30 @@ public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration)
         new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build();
     ListeningExecutorService executor =
         MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
-    ServerFactory serverFactory = ServerFactory.createDefault();
-    return create(configuration, executor, serverFactory);
+    ServerFactory jobServerFactory = ServerFactory.createWithPortSupplier(() -> configuration.port);
+    ServerFactory artifactServerFactory =
+        ServerFactory.createWithPortSupplier(() -> configuration.artifactPort);
+    return create(configuration, executor, jobServerFactory, artifactServerFactory);
   }
 
   public static FlinkJobServerDriver create(
       ServerConfiguration configuration,
       ListeningExecutorService executor,
-      ServerFactory serverFactory) {
-    return new FlinkJobServerDriver(configuration, executor, serverFactory);
+      ServerFactory jobServerFactory,
+      ServerFactory artifactServerFactory) {
+    return new FlinkJobServerDriver(
+        configuration, executor, jobServerFactory, artifactServerFactory);
   }
 
   private FlinkJobServerDriver(
       ServerConfiguration configuration,
       ListeningExecutorService executor,
-      ServerFactory serverFactory) {
+      ServerFactory jobServerFactory,
+      ServerFactory artifactServerFactory) {
     this.configuration = configuration;
     this.executor = executor;
-    this.serverFactory = serverFactory;
+    this.jobServerFactory = jobServerFactory;
+    this.artifactServerFactory = jobServerFactory;
   }
 
   @Override
@@ -166,11 +179,13 @@ public void stop() {
     InMemoryJobService service = createJobService();
     GrpcFnServer<InMemoryJobService> jobServiceGrpcFnServer;
     if (Strings.isNullOrEmpty(configuration.host)) {
-      jobServiceGrpcFnServer = GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
+      jobServiceGrpcFnServer = GrpcFnServer.allocatePortAndCreateFor(service, jobServerFactory);
     } else {
       Endpoints.ApiServiceDescriptor descriptor =
-          Endpoints.ApiServiceDescriptor.newBuilder().setUrl(configuration.host).build();
-      jobServiceGrpcFnServer = GrpcFnServer.create(service, descriptor, serverFactory);
+          Endpoints.ApiServiceDescriptor.newBuilder()
+              .setUrl(configuration.host + ":" + configuration.port)
+              .build();
+      jobServiceGrpcFnServer = GrpcFnServer.create(service, descriptor, jobServerFactory);
     }
     LOG.info("JobServer started on {}", jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl());
     return jobServiceGrpcFnServer;
@@ -200,8 +215,17 @@ private InMemoryJobService createJobService() throws IOException {
   private GrpcFnServer<BeamFileSystemArtifactStagingService> createArtifactStagingService()
       throws IOException {
     BeamFileSystemArtifactStagingService service = new BeamFileSystemArtifactStagingService();
-    GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingService =
-        GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
+    final GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingService;
+    if (Strings.isNullOrEmpty(configuration.host)) {
+      artifactStagingService =
+          GrpcFnServer.allocatePortAndCreateFor(service, artifactServerFactory);
+    } else {
+      Endpoints.ApiServiceDescriptor descriptor =
+          Endpoints.ApiServiceDescriptor.newBuilder()
+              .setUrl(configuration.host + ":" + configuration.artifactPort)
+              .build();
+      artifactStagingService = GrpcFnServer.create(service, descriptor, artifactServerFactory);
+    }
     LOG.info(
         "ArtifactStagingService started on {}",
         artifactStagingService.getApiServiceDescriptor().getUrl());
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
index bcfb089a810..6ef288334ed 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
@@ -25,6 +25,7 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.function.Supplier;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.sdk.fn.channel.SocketAddressFactory;
 import org.apache.beam.vendor.grpc.v1.io.grpc.BindableService;
@@ -44,6 +45,17 @@ public static ServerFactory createWithUrlFactory(UrlFactory urlFactory) {
     return new InetSocketAddressServerFactory(urlFactory);
   }
 
+  /** Create a {@link ServerFactory} that uses ports from a supplier. */
+  public static ServerFactory createWithPortSupplier(Supplier<Integer> portSupplier) {
+    return new InetSocketAddressServerFactory(UrlFactory.createDefault(), portSupplier);
+  }
+
+  /** Create a {@link ServerFactory} that uses the given url factory and ports from a supplier. */
+  public static ServerFactory createWithUrlFactoryAndPortSupplier(
+      UrlFactory urlFactory, Supplier<Integer> portSupplier) {
+    return new InetSocketAddressServerFactory(urlFactory, portSupplier);
+  }
+
   /**
    * Creates an instance of this server using an ephemeral port chosen automatically. The chosen
    * port is accessible to the caller from the URL set in the input {@link
@@ -68,16 +80,23 @@ public abstract Server create(
    */
   public static class InetSocketAddressServerFactory extends ServerFactory {
     private final UrlFactory urlFactory;
+    private final Supplier<Integer> portSupplier;
 
     private InetSocketAddressServerFactory(UrlFactory urlFactory) {
+      this(urlFactory, () -> 0);
+    }
+
+    private InetSocketAddressServerFactory(UrlFactory urlFactory, Supplier<Integer> portSupplier) {
       this.urlFactory = urlFactory;
+      this.portSupplier = portSupplier;
     }
 
     @Override
     public Server allocatePortAndCreate(
         BindableService service, Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor)
         throws IOException {
-      InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
+      InetSocketAddress address =
+          new InetSocketAddress(InetAddress.getLoopbackAddress(), portSupplier.get());
       Server server = createServer(service, address);
       apiServiceDescriptor.setUrl(urlFactory.createUrl(address.getHostName(), server.getPort()));
       return server;
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
index fa6f188bd3e..1e7f48be180 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
@@ -29,6 +29,7 @@
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
@@ -69,6 +70,11 @@
 public class DockerJobBundleFactory implements JobBundleFactory {
   private static final Logger LOG = LoggerFactory.getLogger(DockerJobBundleFactory.class);
 
+  // Port offset for MacOS since we don't have host networking and need to use published ports
+  private static final int MAC_PORT_START = 8100;
+  private static final int MAC_PORT_END = 8200;
+  private static final AtomicInteger MAC_PORT = new AtomicInteger(MAC_PORT_START);
+
   /** Factory that creates {@link JobBundleFactory} for the given {@link JobInfo}. */
   public interface JobBundleFactoryFactory {
     DockerJobBundleFactory create(JobInfo jobInfo) throws Exception;
@@ -218,8 +224,15 @@ protected ServerFactory getServerFactory() {
         // https://docs.docker.com/docker-for-mac/networking/#use-cases-and-workarounds
         // The special hostname has historically changed between versions, so this is subject to
         // breakages and will likely only support the latest version at any time.
-        return ServerFactory.createWithUrlFactory(
-            (host, port) -> HostAndPort.fromParts(DOCKER_FOR_MAC_HOST, port).toString());
+
+        // We need to use a fixed port range due to non-existing host networking in Docker-for-Mac.
+        // The port range needs to be published when bringing up the Docker container, see
+        // DockerEnvironmentFactory.
+
+        return ServerFactory.createWithUrlFactoryAndPortSupplier(
+            (host, port) -> HostAndPort.fromParts(DOCKER_FOR_MAC_HOST, port).toString(),
+            // We only use the published Docker ports 8100-8200 in a round-robin fashion
+            () -> MAC_PORT.getAndUpdate(val -> val == MAC_PORT_END ? MAC_PORT_START : val + 1));
       default:
         LOG.warn("Unknown Docker platform. Falling back to default server factory");
         return ServerFactory.createDefault();
@@ -229,7 +242,10 @@ protected ServerFactory getServerFactory() {
   private static Platform getPlatform() {
     String osName = System.getProperty("os.name").toLowerCase();
     // TODO: Make this more robust?
-    if (osName.startsWith("mac")) {
+    // The DOCKER_MAC_CONTAINER environment variable is necessary to detect whether we run on
+    // a container on MacOs. MacOs internally uses a Linux VM which makes it indistinguishable from Linux.
+    // We still need to apply port mapping due to missing host networking.
+    if (osName.startsWith("mac") || "1".equals(System.getenv("DOCKER_MAC_CONTAINER"))) {
       return Platform.MAC;
     } else if (osName.startsWith("linux")) {
       return Platform.LINUX;
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
index 1012b006bd6..c6a0b62e4ba 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
@@ -129,6 +129,8 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep
             .addAll(gcsCredentialArgs())
             // NOTE: Host networking does not work on Mac, but the command line flag is accepted.
             .add("--network=host")
+            // We need to pass on the information about Docker-on-Mac environment (due to missing host networking on Mac)
+            .add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"))
             .build();
 
     List<String> args =
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index 51f01c9a66d..5aba3f16076 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -53,7 +53,8 @@ def _subprocess_command(cls, port):
             'java',
             '-jar', flinkJobServerJar,
             '--artifacts-dir', tmp_dir,
-            '--job-host', 'localhost:%s' % port,
+            '--job-host', 'localhost',
+            '--job-port', str(port),
         ]
       finally:
         shutil.rmtree(tmp_dir)
diff --git a/sdks/python/apache_beam/runners/portability/job_server.py b/sdks/python/apache_beam/runners/portability/job_server.py
new file mode 100644
index 00000000000..25b8666121a
--- /dev/null
+++ b/sdks/python/apache_beam/runners/portability/job_server.py
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import atexit
+import logging
+import os
+import signal
+import sys
+import time
+from subprocess import Popen
+from threading import Lock
+
+
+class DockerizedJobServer(object):
+  """
+   Spins up the JobServer in a docker container for local execution
+  """
+
+  def __init__(self, job_host="localhost",
+               job_port=8099,
+               artifact_port=8098,
+               harness_port_range=(8100, 8200),
+               max_connection_retries=5):
+    self.job_host = job_host
+    self.job_port = job_port
+    self.artifact_port = artifact_port
+    self.harness_port_range = harness_port_range
+    self.max_connection_retries = max_connection_retries
+    self.docker_process = None
+    self.process_lock = Lock()
+
+  def start(self):
+    # TODO This is hardcoded to Flink at the moment but should be changed
+    job_server_image_name = os.environ['USER'] + \
+        "-docker-apache.bintray.io/beam/flink-job-server:latest"
+    cmd = ["docker", "run",
+           # We mount the docker binary and socket to be able to spin up
+           # "sibling" containers for the SDK harness.
+           "-v", "/usr/local/bin/docker:/bin/docker",
+           "-v", "/var/run/docker.sock:/var/run/docker.sock"]
+    args = ["--job-host", self.job_host, "--job-port", str(self.job_port)]
+
+    if sys.platform == "darwin":
+      # Docker-for-Mac doesn't support host networking, so we need to explictly
+      # publish ports from the Docker container to be able to connect to it.
+      # Also, all other containers need to be aware that they run Docker-on-Mac
+      # to connect against the internal Docker-for-Mac address.
+      cmd += ["-e", "DOCKER_MAC_CONTAINER=1"]
+      cmd += ["-p", "{}:{}".format(self.job_port, self.job_port)]
+      cmd += ["-p", "{}:{}".format(self.artifact_port, self.artifact_port)]
+      cmd += ["-p", "{0}-{1}:{0}-{1}".format(
+          self.harness_port_range[0], self.harness_port_range[1])]
+      args += ["--artifact-port", "{}".format(self.artifact_port)]
+    else:
+      # This shouldn't be set for MacOS because it detroys port forwardings,
+      # even though host networking is not supported on MacOS.
+      cmd.append("--network=host")
+
+    cmd.append(job_server_image_name)
+    cmd += args
+
+    logging.debug("Starting container with %s", cmd)
+    try:
+      self.docker_process = Popen(cmd)
+      atexit.register(self.stop)
+      signal.signal(signal.SIGINT, self.stop)
+    except:  # pylint:disable=bare-except
+      logging.exception("Error bringing up container")
+      self.stop()
+
+    return "{}:{}".format(self.job_host, self.job_port)
+
+  def stop(self):
+    with self.process_lock:
+      if not self.docker_process:
+        return
+      num_retries = 0
+      while self.docker_process.poll() is None and \
+              num_retries < self.max_connection_retries:
+        logging.debug("Sending SIGINT to job_server container")
+        self.docker_process.send_signal(signal.SIGINT)
+        num_retries += 1
+        time.sleep(1)
+      if self.docker_process.poll is None:
+        self.docker_process.kill()
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index baeaf241803..1abcea2f059 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -34,6 +34,7 @@
 from apache_beam.runners import runner
 from apache_beam.runners.job import utils as job_utils
 from apache_beam.runners.portability import portable_stager
+from apache_beam.runners.portability.job_server import DockerizedJobServer
 
 __all__ = ['PortableRunner']
 
@@ -74,8 +75,8 @@ def run_pipeline(self, pipeline):
         or self.default_docker_image())
     job_endpoint = pipeline.options.view_as(PortableOptions).job_endpoint
     if not job_endpoint:
-      raise ValueError(
-          'job_endpoint should be provided while creating runner.')
+      docker = DockerizedJobServer()
+      job_endpoint = docker.start()
 
     proto_context = pipeline_context.PipelineContext(
         default_environment_url=docker_image)
@@ -108,12 +109,28 @@ def run_pipeline(self, pipeline):
                for k, v in pipeline._options.get_all_options().iteritems()
                if v is not None}
 
-    job_service = beam_job_api_pb2_grpc.JobServiceStub(
-        grpc.insecure_channel(job_endpoint))
-    prepare_response = job_service.Prepare(
-        beam_job_api_pb2.PrepareJobRequest(
-            job_name='job', pipeline=proto_pipeline,
-            pipeline_options=job_utils.dict_to_struct(options)))
+    channel = grpc.insecure_channel(job_endpoint)
+    grpc.channel_ready_future(channel).result()
+    job_service = beam_job_api_pb2_grpc.JobServiceStub(channel)
+
+    # Sends the PrepareRequest but retries in case the channel is not ready
+    def send_prepare_request(max_retries=5):
+      num_retries = 0
+      while True:
+        try:
+          # This reports channel is READY but connections may fail
+          # Seems to be only an issue on Mac with port forwardings
+          grpc.channel_ready_future(channel).result()
+          return job_service.Prepare(
+              beam_job_api_pb2.PrepareJobRequest(
+                  job_name='job', pipeline=proto_pipeline,
+                  pipeline_options=job_utils.dict_to_struct(options)))
+        except grpc._channel._Rendezvous as e:
+          num_retries += 1
+          if num_retries > max_retries:
+            raise e
+
+    prepare_response = send_prepare_request()
     if prepare_response.artifact_staging_endpoint.url:
       stager = portable_stager.PortableStager(
           grpc.insecure_channel(prepare_response.artifact_staging_endpoint.url),
diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle
index df9a97fd54f..0423cc01b90 100644
--- a/sdks/python/build.gradle
+++ b/sdks/python/build.gradle
@@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //    ./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+//  a) If you want the Job Server to run in a Docker container:
+//
+//    ./gradlew :beam-runners-flink_2.11-job-server-container:docker
+//
+//  b) Otherwise, start a local JobService, for example, the Portable Flink runner
 //    (in a separate shell since it continues to run):
 //
 //    ./gradlew :beam-runners-flink_2.11-job-server:runShadow
 //
 // Then you can run this example:
 //
+//  Docker (2a):
+//
 //    ./gradlew :beam-sdks-python:portableWordCount
 //
+//  Local JobService (2b):
+//
+//    ./gradlew :beam-sdks-python:portableWordCount -PjobEndpoint=localhost:8099
+//
 task portableWordCount(dependsOn: 'installGcpTest') {
   doLast {
     // TODO: Figure out GCS credentials and use real GCS input and output.
@@ -225,11 +236,12 @@ task portableWordCount(dependsOn: 'installGcpTest') {
             "--output=/tmp/py-wordcount-direct",
             "--experiments=beam_fn_api",
             "--runner=PortableRunner",
-            "--job_endpoint=localhost:8099",
             "--sdk_location=container",
     ]
     if (project.hasProperty("streaming"))
       options += ["--streaming"]
+    if (project.hasProperty("jobEndpoint"))
+      options += ["--job_endpoint=${project.property('jobEndpoint')}"]
     exec {
       executable 'sh'
      args '-c', ". ${envdir}/bin/activate && python -m apache_beam.examples.wordcount ${options.join(' ')}"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 137920)
    Time Spent: 7h 50m  (was: 7h 40m)

> Portable Flink runner JobService entry point in a Docker container
> ------------------------------------------------------------------
>
>                 Key: BEAM-4130
>                 URL: https://issues.apache.org/jira/browse/BEAM-4130
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-flink
>            Reporter: Ben Sidhom
>            Assignee: Maximilian Michels
>            Priority: Minor
>          Time Spent: 7h 50m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We need a main
> entry point that itself spins up the job service (and artifact staging service). The main
> program itself should be packaged into an uberjar such that it can be run locally or submitted
> to a Flink deployment via `flink run`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message