Date: Fri, 11 May 2018 01:16:00 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: commits@beam.apache.org
Subject: [jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

    [ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100947&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100947 ]

ASF GitHub Bot logged work on BEAM-2588:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/May/18 01:15
            Start Date: 11/May/18 01:15
    Worklog Time Spent: 10m
      Work Description: jkff commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187498931

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##########
 @@ -0,0 +1,340 @@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements. See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership. The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License. You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.beam.runners.fnexecution.jobsubmission;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.ImmutableList;
 +import com.google.protobuf.Struct;
 +import io.grpc.Status;
 +import io.grpc.StatusException;
 +import io.grpc.StatusRuntimeException;
 +import io.grpc.stub.StreamObserver;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ConcurrentMap;
 +import java.util.concurrent.ThreadLocalRandom;
 +import java.util.function.Consumer;
 +import java.util.function.Function;
 +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
 +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
 +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
 +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
 +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
 +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
 +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
 +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
 +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
 +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
 +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
 +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
 +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
 +import org.apache.beam.runners.fnexecution.FnService;
 +import org.apache.beam.runners.fnexecution.GrpcFnServer;
 +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
 +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
 +import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +/**
 + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}.
 + *
 + * <p>Job management is handled in-memory rather than any persistent storage, running the risk of
 + * leaking jobs if the JobService crashes.
 + *
 + * <p>TODO: replace in-memory job management state with persistent solution.
 + */
 +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
 +  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
 +
 +  public static JobService create(
 +      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
 +    return new JobService(artifactStagingServiceProvider, invoker);
 +  }
 +
 +  private final ConcurrentMap<String, JobPreparation> preparations;
 +  private final ConcurrentMap<String, JobInvocation> invocations;
 +  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
 +  private final JobInvoker invoker;
 +
 +  private JobService(
 +      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
 +    this.artifactStagingServiceProvider = artifactStagingServiceProvider;
 +    this.invoker = invoker;
 +
 +    this.preparations = new ConcurrentHashMap<>();
 +    this.invocations = new ConcurrentHashMap<>();
 +  }
 +
 +  @Override
 +  public void prepare(
 +      PrepareJobRequest request,
 +      StreamObserver<PrepareJobResponse> responseObserver) {
 +    try {
 +      LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
 +      // insert preparation
 +      String preparationId =
 +          String.format("%s_%d", request.getJobName(), ThreadLocalRandom.current().nextInt());
 +      GrpcFnServer<ArtifactStagingService> stagingService =
 +          artifactStagingServiceProvider.forJob(preparationId);
 +      Struct pipelineOptions = request.getPipelineOptions();
 +      if (pipelineOptions == null) {
 +        LOG.trace("PIPELINE OPTIONS IS NULL");
 +        throw new NullPointerException("Encountered null pipeline options.");
 +        /*
 +        LOG.debug("Encountered null pipeline options. Using default.");
 +        pipelineOptions = Struct.getDefaultInstance();
 +        */
 +      } else {
 +        LOG.trace("PIPELINE OPTIONS IS NOT NULL");
 +      }
 +      LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), pipelineOptions);
 +      JobPreparation preparation =
 +          JobPreparation
 +              .builder()
 +              .setId(preparationId)
 +              .setPipeline(request.getPipeline())
 +              .setOptions(pipelineOptions)
 +              .setStagingService(stagingService)
 +              .build();
 +      JobPreparation previous = preparations.putIfAbsent(preparationId, preparation);
 +      if (previous != null) {
 +        // retry recursively in the unlikely case of a name collision.
 +        String errMessage =
 +            String.format("Name collision for preparation ID \"%s\". Retrying.", preparationId);
 +        LOG.warn(errMessage);
 +        prepare(request, responseObserver);
 +        return;
 +      }
 +
 +      // send response
 +      PrepareJobResponse response =
 +          PrepareJobResponse
 +              .newBuilder()
 +              .setPreparationId(preparationId)
 +              .setArtifactStagingEndpoint(stagingService.getApiServiceDescriptor())
 +              .build();
 +      responseObserver.onNext(response);
 +      responseObserver.onCompleted();
 +    } catch (Exception e) {
 +      LOG.error("Could not prepare job with name {}", request.getJobName(), e);
 +      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
 +    }
 +  }
 +
 +  @Override
 +  public void run(
 +      RunJobRequest request, StreamObserver<RunJobResponse> responseObserver) {
 +    LOG.trace("{} {}", RunJobRequest.class.getSimpleName(), request);
 +
 +    String preparationId = request.getPreparationId();
 +    try {
 +      // retrieve job preparation
 +      JobPreparation preparation = preparations.get(preparationId);
 +      if (preparation == null) {
 +        String errMessage = String.format("Unknown Preparation Id \"%s\".", preparationId);
 +        StatusException exception = Status.NOT_FOUND.withDescription(errMessage).asException();
 +        responseObserver.onError(exception);
 +        return;
 +      }
 +
 +      // create new invocation
 +      JobInvocation invocation = invoker.invoke(preparation, request.getStagingToken());
 +      String invocationId = invocation.getId();
 +      invocation.start();
 +      invocations.put(invocationId, invocation);
 +      RunJobResponse response =
 +          RunJobResponse.newBuilder().setJobId(invocationId).build();
 +      responseObserver.onNext(response);
 +      responseObserver.onCompleted();
 +    } catch (StatusRuntimeException e) {

Review comment:
Not sure why the handling of this exception differs from the next one, maybe we can join them?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id: (was: 100947)
    Time Spent: 3h 10m (was: 3h)

> Portable Flink Runner Job API
> -----------------------------
>
>                 Key: BEAM-2588
>                 URL: https://issues.apache.org/jira/browse/BEAM-2588
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-flink
>            Reporter: Kenneth Knowles
>            Assignee: Axel Magnuson
>            Priority: Major
>              Labels: portability
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> The portable Flink runner needs to be wired into a job server so that it can accept jobs via the job API (https://s.apache.org/beam-job-api).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)