beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
Date Thu, 14 Jun 2018 22:12:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=112079&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112079 ]

ASF GitHub Bot logged work on BEAM-4130:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Jun/18 22:11
            Start Date: 14/Jun/18 22:11
    Worklog Time Spent: 10m 
      Work Description: robertwb closed pull request #5493: [BEAM-4130] Add job submission
capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff once a foreign pull request (from a fork) is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/runners/flink/build.gradle b/runners/flink/build.gradle
index ebd607c0e21..4eec38962c6 100644
--- a/runners/flink/build.gradle
+++ b/runners/flink/build.gradle
@@ -59,6 +59,7 @@ dependencies {
   shadow library.java.slf4j_api
   shadow library.java.joda_time
   shadow library.java.commons_compress
+  shadow library.java.args4j
   shadow "org.apache.flink:flink-clients_2.11:$flink_version"
   shadow "org.apache.flink:flink-core:$flink_version"
   shadow "org.apache.flink:flink-metrics-core:$flink_version"
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index b7ee08378dc..60e9be798f7 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -98,7 +98,7 @@
  *   FlinkBatchPortablePipelineTranslator translator =
  *       FlinkBatchPortablePipelineTranslator.createTranslator();
  *   BatchTranslationContext context =
- *       FlinkBatchPortablePipelineTranslator.createTranslationContext(options);
+ *       FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
  *   translator.translate(context, pipeline);
  *   ExecutionEnvironment executionEnvironment = context.getExecutionEnvironment();
  *   // Do something with executionEnvironment...
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
new file mode 100644
index 00000000000..cfc6b3898fe
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private JobState.Enum jobState;
+  private List<Consumer<JobState.Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    this.id = id;
+    this.executorService = executorService;
+    this.pipeline = pipeline;
+    this.pipelineOptions = pipelineOptions;
+    this.invocationFuture = null;
+    this.jobState = JobState.Enum.STOPPED;
+    this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+    MetricsEnvironment.setMetricsSupported(true);
+
+    LOG.info("Translating pipeline to Flink program.");
+    // Fused pipeline proto.
+    RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
+    JobInfo jobInfo = JobInfo.create(
+        id, pipelineOptions.getJobName(), PipelineOptionsTranslation.toProto(pipelineOptions));
+    final JobExecutionResult result;
+
+    if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(fusedPipeline))
{
+      // TODO: Do we need to inspect for unbounded sources before fusing?
+      // batch translation
+      FlinkBatchPortablePipelineTranslator translator =
+          FlinkBatchPortablePipelineTranslator.createTranslator();
+      FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+          FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+      translator.translate(context, fusedPipeline);
+      result = context.getExecutionEnvironment().execute(pipelineOptions.getJobName());
+    } else {
+      // streaming translation
+      FlinkStreamingPortablePipelineTranslator translator =
+          new FlinkStreamingPortablePipelineTranslator();
+      FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext context =
+          FlinkStreamingPortablePipelineTranslator.createTranslationContext(jobInfo);
+      translator.translate(context, fusedPipeline);
+      result = context.getExecutionEnvironment().execute(pipelineOptions.getJobName());
+    }
+
+    return FlinkRunner.createPipelineResult(result, pipelineOptions);
+  }
+
+  @Override
+  public synchronized void start() {
+    LOG.info("Starting job invocation {}", getId());
+    if (getState() != JobState.Enum.STOPPED) {
+      throw new IllegalStateException(String.format("Job %s already running.", getId()));
+    }
+    setState(JobState.Enum.STARTING);
+    invocationFuture = executorService.submit(this::runPipeline);
+    // TODO: Defer transitioning until the pipeline is up and running.
+    setState(JobState.Enum.RUNNING);
+    Futures.addCallback(
+        invocationFuture,
+        new FutureCallback<PipelineResult>() {
+          @Override
+          public void onSuccess(@Nullable PipelineResult pipelineResult) {
+            if (pipelineResult != null) {
+              checkArgument(pipelineResult.getState() == PipelineResult.State.DONE,
+                  "Success on non-Done state: " + pipelineResult.getState());
+              setState(JobState.Enum.DONE);
+            } else {
+              setState(JobState.Enum.UNSPECIFIED);
+            }
+          }
+
+          @Override
+          public void onFailure(Throwable throwable) {
+            String message = String.format("Error during job invocation %s.", getId());
+            LOG.error(message, throwable);
+            setState(JobState.Enum.FAILED);
+          }
+        },
+        executorService);
+  }
+
+  @Override
+  public String getId() {
+    return id;
+  }
+
+  @Override
+  public synchronized void cancel() {
+    LOG.info("Canceling job invocation {}", getId());
+    if (this.invocationFuture != null) {
+      this.invocationFuture.cancel(true /* mayInterruptIfRunning */);
+      Futures.addCallback(
+          invocationFuture,
+          new FutureCallback<PipelineResult>() {
+            @Override
+            public void onSuccess(@Nullable PipelineResult pipelineResult) {
+              if (pipelineResult != null) {
+                try {
+                  pipelineResult.cancel();
+                } catch (IOException exn) {
+                  throw new RuntimeException(exn);
+                }
+              }
+            }
+
+            @Override
+            public void onFailure(Throwable throwable) { }
+          },
+          executorService);
+    }
+  }
+
+  @Override
+  public JobState.Enum getState() {
+    return this.jobState;
+  }
+
+  @Override
+  public synchronized void addStateListener(Consumer<JobState.Enum> stateStreamObserver)
{
+    stateStreamObserver.accept(getState());
+    stateObservers.add(stateStreamObserver);
+  }
+
+  @Override
+  public synchronized void addMessageListener(Consumer<JobMessage> messageStreamObserver)
{
+    LOG.warn("addMessageObserver() not yet implemented.");
+  }
+
+  private synchronized void setState(JobState.Enum state) {
+    this.jobState = state;
+    for (Consumer<JobState.Enum> observer : stateObservers) {
+      observer.accept(state);
+    }
+  }
+
+  /** Indicates whether the given pipeline has any unbounded PCollections. */
+  private static boolean hasUnboundedPCollections(RunnerApi.Pipeline pipeline) {
+    checkNotNull(pipeline);
+    Collection<RunnerApi.PCollection> pCollecctions = pipeline.getComponents()
+        .getPcollectionsMap().values();
+    // Assume that all PCollections are consumed at some point in the pipeline.
+    return pCollecctions.stream()
+        .anyMatch(pc -> pc.getIsBounded() == RunnerApi.IsBounded.Enum.UNBOUNDED);
+  }
+
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
new file mode 100644
index 00000000000..cda2975da66
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.protobuf.Struct;
+import java.io.IOException;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job Invoker for the {@link FlinkRunner}.
+ */
+public class FlinkJobInvoker implements JobInvoker {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class);
+
+  public static FlinkJobInvoker create(ListeningExecutorService executorService) {
+    return new FlinkJobInvoker(executorService);
+  }
+
+  private final ListeningExecutorService executorService;
+
+  private FlinkJobInvoker(ListeningExecutorService executorService) {
+    this.executorService = executorService;
+  }
+
+  @Override
+  public JobInvocation invoke(
+      RunnerApi.Pipeline pipeline, Struct options, @Nullable String artifactToken)
+      throws IOException {
+    // TODO: How to make Java/Python agree on names of keys and their values?
+    LOG.trace("Parsing pipeline options");
+    FlinkPipelineOptions flinkOptions = PipelineOptionsTranslation.fromProto(options)
+        .as(FlinkPipelineOptions.class);
+
+    String invocationId = String.format(
+        "%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString());
+    LOG.info("Invoking job {}", invocationId);
+
+    // Set Flink Master to [auto] if no option was specified.
+    if (flinkOptions.getFlinkMaster() == null) {
+      flinkOptions.setFlinkMaster("[auto]");
+    }
+
+    flinkOptions.setRunner(null);
+
+    return FlinkJobInvocation.create(
+        invocationId,
+        executorService,
+        pipeline,
+        flinkOptions);
+  }
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
new file mode 100644
index 00000000000..49efe549250
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
+import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/** Driver program that starts a job server. */
+public class FlinkJobServerDriver implements Runnable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriver.class);
+
+  private final ListeningExecutorService executor;
+  private final ServerConfiguration configuration;
+  private final ServerFactory serverFactory;
+
+  private static class ServerConfiguration {
+    @Option(
+        name = "--job-host",
+        required = true,
+        usage = "The job server host string"
+    )
+    private String host = "";
+
+    @Option(
+        name = "--artifacts-dir",
+        usage = "The location to store staged artifact files"
+    )
+    private String artifactStagingPath = "/tmp/beam-artifact-staging";
+  }
+
+  public static void main(String[] args) {
+    ServerConfiguration configuration = new ServerConfiguration();
+    CmdLineParser parser = new CmdLineParser(configuration);
+    try {
+      parser.parseArgument(args);
+    } catch (CmdLineException e) {
+      LOG.error("Unable to parse command line arguments.", e);
+      printUsage(parser);
+      return;
+    }
+    FlinkJobServerDriver driver = fromConfig(configuration);
+    driver.run();
+  }
+
+  private static void printUsage(CmdLineParser parser) {
+    System.err.println(
+        String.format(
+            "Usage: java %s arguments...", FlinkJobServerDriver.class.getSimpleName()));
+    parser.printUsage(System.err);
+    System.err.println();
+  }
+
+  public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration) {
+    ThreadFactory threadFactory =
+        new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build();
+    ListeningExecutorService executor =
+        MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
+    ServerFactory serverFactory = ServerFactory.createDefault();
+    return create(configuration, executor, serverFactory);
+  }
+
+  public static FlinkJobServerDriver create(
+      ServerConfiguration configuration,
+      ListeningExecutorService executor,
+      ServerFactory serverFactory) {
+    return new FlinkJobServerDriver(configuration, executor, serverFactory);
+  }
+
+  private FlinkJobServerDriver(
+      ServerConfiguration configuration,
+      ListeningExecutorService executor,
+      ServerFactory serverFactory) {
+    this.configuration = configuration;
+    this.executor = executor;
+    this.serverFactory = serverFactory;
+  }
+
+  @Override
+  public void run() {
+    try {
+      GrpcFnServer<InMemoryJobService> server = createJobServer();
+      server.getServer().awaitTermination();
+    } catch (InterruptedException e) {
+      LOG.warn("Job server interrupted", e);
+    } catch (Exception e) {
+      LOG.warn("Exception during job server creation", e);
+    }
+  }
+
+  private GrpcFnServer<InMemoryJobService> createJobServer() throws IOException {
+    InMemoryJobService service = createJobService();
+    Endpoints.ApiServiceDescriptor descriptor =
+        Endpoints.ApiServiceDescriptor.newBuilder().setUrl(configuration.host).build();
+    return GrpcFnServer.create(service, descriptor, serverFactory);
+  }
+
+  private InMemoryJobService createJobService() throws IOException {
+    GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingService =
+        createArtifactStagingService();
+    JobInvoker invoker = createJobInvoker();
+    return InMemoryJobService.create(
+        artifactStagingService.getApiServiceDescriptor(),
+        (String session) -> {
+          try {
+            return BeamFileSystemArtifactStagingService.generateStagingSessionToken(
+                session, configuration.artifactStagingPath);
+          } catch (Exception exn) {
+            throw new RuntimeException(exn);
+          }
+        },
+        invoker);
+  }
+
+  private GrpcFnServer<BeamFileSystemArtifactStagingService> createArtifactStagingService()
+      throws IOException {
+    BeamFileSystemArtifactStagingService service =
+        new BeamFileSystemArtifactStagingService();
+    return GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
+  }
+
+  private JobInvoker createJobInvoker() throws IOException {
+    return FlinkJobInvoker.create(executor);
+  }
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 3776c8b0502..cf46071c1af 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -118,7 +118,10 @@ public PipelineResult run(Pipeline pipeline) {
       LOG.error("Pipeline execution failed", e);
       throw new RuntimeException("Pipeline execution failed", e);
     }
+    return createPipelineResult(result, options);
+  }
 
+  static PipelineResult createPipelineResult(JobExecutionResult result, PipelineOptions options)
{
     if (result instanceof DetachedEnvironment.DetachedJobExecutionResult) {
       LOG.info("Pipeline submitted in Detached mode");
       FlinkDetachedRunnerResult flinkDetachedRunnerResult = new FlinkDetachedRunnerResult();
@@ -193,9 +196,9 @@ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node
node) {
       });
 
       LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for
{} "
-          + "because the key coder is not deterministic. Falling back to singleton implementation
"
-          + "which may cause memory and/or performance problems. Future major versions of
"
-          + "the Flink runner will require deterministic key coders.",
+              + "because the key coder is not deterministic. Falling back to singleton "
+              + "implementation which may cause memory and/or performance problems. Future
major "
+              + "versions of the Flink runner will require deterministic key coders.",
           ptransformViewNamesWithNonDeterministicKeyCoders);
     }
   }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java
index a8b582640b5..aed3cbc5b2f 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java
@@ -37,5 +37,5 @@
   /**
    * Get an artifact by its name.
    */
-  void getArtifact(String name, StreamObserver<ArtifactChunk> responseObserver);
+  void getArtifact(String name, StreamObserver<ArtifactChunk> responseObserver) throws
IOException;
 }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSource.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSource.java
new file mode 100644
index 00000000000..e2e359fbf89
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSource.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.util.JsonFormat;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.charset.StandardCharsets;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.sdk.io.FileSystems;
+
+/**
+ * An ArtifactSource suitable for retrieving artifacts uploaded via
+ * {@link BeamFileSystemArtifactStagingService}.
+ */
+public class BeamFileSystemArtifactSource implements ArtifactSource {
+
+  private static final int CHUNK_SIZE = 2 * 1024 * 1024;
+
+  private final String retrievalToken;
+  private ArtifactApi.ProxyManifest proxyManifest;
+
+  public BeamFileSystemArtifactSource(String retrievalToken) {
+    this.retrievalToken = retrievalToken;
+  }
+
+  public static BeamFileSystemArtifactSource create(String artifactToken) {
+    return new BeamFileSystemArtifactSource(artifactToken);
+  }
+
+  @Override
+  public ArtifactApi.Manifest getManifest() throws IOException {
+    return getProxyManifest().getManifest();
+  }
+
+  @Override
+  public void getArtifact(String name,
+      StreamObserver<ArtifactApi.ArtifactChunk> responseObserver) throws IOException
{
+    ReadableByteChannel artifact = FileSystems
+        .open(FileSystems.matchNewResource(lookupUri(name), false));
+    ByteBuffer buffer = ByteBuffer.allocate(CHUNK_SIZE);
+    while (artifact.read(buffer) > -1) {
+      buffer.flip();
+      responseObserver.onNext(
+          ArtifactApi.ArtifactChunk.newBuilder().setData(ByteString.copyFrom(buffer)).build());
+      buffer.clear();
+    }
+  }
+
+  private String lookupUri(String name) throws IOException {
+    for (ArtifactApi.ProxyManifest.Location location : getProxyManifest().getLocationList())
{
+      if (location.getName().equals(name)) {
+        return location.getUri();
+      }
+    }
+    throw new IllegalArgumentException("No such artifact: " + name);
+  }
+
+  private ArtifactApi.ProxyManifest getProxyManifest() throws IOException {
+    if (proxyManifest == null) {
+      ArtifactApi.ProxyManifest.Builder builder = ArtifactApi.ProxyManifest.newBuilder();
+      JsonFormat.parser().merge(Channels.newReader(
+          FileSystems.open(FileSystems.matchNewResource(retrievalToken, false /* is directory
*/)),
+          StandardCharsets.UTF_8.name()), builder);
+      proxyManifest = builder.build();
+    }
+    return proxyManifest;
+  }
+}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
index d9755c6d295..e089a90eefa 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
@@ -60,17 +60,36 @@
 
   public static InMemoryJobService create(
       Endpoints.ApiServiceDescriptor stagingServiceDescriptor, JobInvoker invoker) {
-    return new InMemoryJobService(stagingServiceDescriptor, invoker);
+    return new InMemoryJobService(stagingServiceDescriptor, (String session) -> "token",
invoker);
+  }
+
+  /**
+   * Creates an InMemoryJobService.
+   *
+   * @param stagingServiceDescriptor Endpoint for the staging service.
+   * @param stagingServiceTokenProvider Function mapping a preparationId to a staging service
token.
+   * @param invoker A JobInvoker that will actually create the jobs.
+   * @return A new InMemoryJobService.
+   */
+  public static InMemoryJobService create(
+      Endpoints.ApiServiceDescriptor stagingServiceDescriptor,
+      Function<String, String> stagingServiceTokenProvider,
+      JobInvoker invoker) {
+    return new InMemoryJobService(stagingServiceDescriptor, stagingServiceTokenProvider,
invoker);
   }
 
   private final ConcurrentMap<String, JobPreparation> preparations;
   private final ConcurrentMap<String, JobInvocation> invocations;
   private final Endpoints.ApiServiceDescriptor stagingServiceDescriptor;
+  private final Function<String, String> stagingServiceTokenProvider;
   private final JobInvoker invoker;
 
   private InMemoryJobService(
-      Endpoints.ApiServiceDescriptor stagingServiceDescriptor, JobInvoker invoker) {
+      Endpoints.ApiServiceDescriptor stagingServiceDescriptor,
+      Function<String, String> stagingServiceTokenProvider,
+      JobInvoker invoker) {
     this.stagingServiceDescriptor = stagingServiceDescriptor;
+    this.stagingServiceTokenProvider = stagingServiceTokenProvider;
     this.invoker = invoker;
 
     this.preparations = new ConcurrentHashMap<>();
@@ -114,9 +133,7 @@ public void prepare(
               .newBuilder()
               .setPreparationId(preparationId)
               .setArtifactStagingEndpoint(stagingServiceDescriptor)
-              // TODO: Pass the correct token for staging. The token depends on the
-              // ArtifactStagingService implementation.
-              .setStagingSessionToken("token")
+              .setStagingSessionToken(stagingServiceTokenProvider.apply(preparationId))
               .build();
       responseObserver.onNext(response);
       responseObserver.onCompleted();
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSourceTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSourceTest.java
new file mode 100644
index 00000000000..01f1723d99b
--- /dev/null
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSourceTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for BeamFileSystemArtifactSource.
+ */
+@RunWith(JUnit4.class) public class BeamFileSystemArtifactSourceTest {
+
+  BeamFileSystemArtifactStagingService stagingService = new BeamFileSystemArtifactStagingService();
+
+  @Rule public TemporaryFolder stagingDir = new TemporaryFolder();
+
+  @Test public void testStagingService() throws Exception {
+    String stagingSession = "stagingSession";
+    String stagingSessionToken = BeamFileSystemArtifactStagingService
+        .generateStagingSessionToken(stagingSession, stagingDir.newFolder().getPath());
+    List<ArtifactApi.ArtifactMetadata> metadata = new ArrayList<>();
+
+    metadata.add(ArtifactApi.ArtifactMetadata.newBuilder().setName("file1").build());
+    putArtifactContents(stagingSessionToken, "first", "file1");
+
+    metadata.add(ArtifactApi.ArtifactMetadata.newBuilder().setName("file2").build());
+    putArtifactContents(stagingSessionToken, "second", "file2");
+
+    String stagingToken = commitManifest(stagingSessionToken, metadata);
+
+    BeamFileSystemArtifactSource artifactSource = new BeamFileSystemArtifactSource(stagingToken);
+    Assert.assertEquals("first", getArtifactContents(artifactSource, "file1"));
+    Assert.assertEquals("second", getArtifactContents(artifactSource, "file2"));
+    Assert.assertThat(artifactSource.getManifest().getArtifactList(),
+        containsInAnyOrder(metadata.toArray(new ArtifactApi.ArtifactMetadata[0])));
+  }
+
+  private String commitManifest(String stagingSessionToken,
+      List<ArtifactApi.ArtifactMetadata> artifacts) {
+    String[] stagingTokenHolder = new String[1];
+    stagingService.commitManifest(
+        ArtifactApi.CommitManifestRequest.newBuilder().setStagingSessionToken(stagingSessionToken)
+            .setManifest(ArtifactApi.Manifest.newBuilder().addAllArtifact(artifacts)).build(),
+        new StreamObserver<ArtifactApi.CommitManifestResponse>() {
+
+          @Override public void onNext(ArtifactApi.CommitManifestResponse commitManifestResponse)
{
+            stagingTokenHolder[0] = commitManifestResponse.getRetrievalToken();
+          }
+
+          @Override public void onError(Throwable throwable) {
+            throw new RuntimeException(throwable);
+          }
+
+          @Override public void onCompleted() {
+          }
+        });
+
+    return stagingTokenHolder[0];
+  }
+
+  private void putArtifactContents(String stagingSessionToken, String contents, String name)
{
+    StreamObserver<ArtifactApi.PutArtifactRequest> outputStreamObserver = stagingService
+        .putArtifact(new StreamObserver<ArtifactApi.PutArtifactResponse>() {
+
+          @Override public void onNext(ArtifactApi.PutArtifactResponse putArtifactResponse)
{
+          }
+
+          @Override public void onError(Throwable throwable) {
+            throw new RuntimeException(throwable);
+          }
+
+          @Override public void onCompleted() {
+          }
+        });
+
+    outputStreamObserver.onNext(ArtifactApi.PutArtifactRequest.newBuilder().setMetadata(
+        ArtifactApi.PutArtifactMetadata.newBuilder()
+            .setMetadata(ArtifactApi.ArtifactMetadata.newBuilder().setName(name).build())
+            .setStagingSessionToken(stagingSessionToken)).build());
+    outputStreamObserver.onNext(ArtifactApi.PutArtifactRequest.newBuilder().setData(
+        ArtifactApi.ArtifactChunk.newBuilder()
+            .setData(ByteString.copyFrom(contents, StandardCharsets.UTF_8))).build());
+    outputStreamObserver.onCompleted();
+  }
+
+  private String getArtifactContents(ArtifactSource artifactSource, String name)
+      throws IOException {
+    StringBuilder contents = new StringBuilder();
+    artifactSource.getArtifact(name, new StreamObserver<ArtifactApi.ArtifactChunk>()
{
+
+      @Override public void onNext(ArtifactApi.ArtifactChunk artifactChunk) {
+        contents.append(artifactChunk.getData().toString(StandardCharsets.UTF_8));
+      }
+
+      @Override public void onError(Throwable throwable) {
+        throw new RuntimeException(throwable);
+      }
+
+      @Override public void onCompleted() {
+      }
+    });
+    return contents.toString();
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 112079)
    Time Spent: 4h 20m  (was: 4h 10m)

> Portable Flink runner JobService entry point in a Docker container
> ------------------------------------------------------------------
>
>                 Key: BEAM-4130
>                 URL: https://issues.apache.org/jira/browse/BEAM-4130
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-flink
>            Reporter: Ben Sidhom
>            Priority: Minor
>          Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We need a main
entry point that itself spins up the job service (and artifact staging service). The main
program itself should be packaged into an uberjar such that it can be run locally or submitted
to a Flink deployment via `flink run`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Mime
View raw message