From dev-return-16674-archive-asf-public=cust-asf.ponee.io@reef.apache.org Fri Jun 8 01:34:41 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 1986C18067B for ; Fri, 8 Jun 2018 01:34:38 +0200 (CEST) Received: (qmail 46147 invoked by uid 500); 7 Jun 2018 23:34:26 -0000 Mailing-List: contact dev-help@reef.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@reef.apache.org Delivered-To: mailing list dev@reef.apache.org Received: (qmail 45849 invoked by uid 99); 7 Jun 2018 23:34:26 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 Jun 2018 23:34:26 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 21600CA2C5 for ; Thu, 7 Jun 2018 23:34:08 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -109.501 X-Spam-Level: X-Spam-Status: No, score=-109.501 tagged_above=-999 required=6.31 tests=[ENV_AND_HDR_SPF_MATCH=-0.5, KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_MED=-2.3, SPF_PASS=-0.001, USER_IN_DEF_SPF_WL=-7.5, USER_IN_WHITELIST=-100] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id CvNoNAnIjo1I for ; Thu, 7 Jun 2018 23:34:07 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTP id 8AC1E60DB3 for ; Thu, 7 Jun 2018 23:34:04 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with 
ESMTP id B250CE0FB4 for ; Thu, 7 Jun 2018 23:34:03 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id EC9442181C for ; Thu, 7 Jun 2018 23:34:02 +0000 (UTC) Date: Thu, 7 Jun 2018 23:34:02 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: dev@reef.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (REEF-2025) A new module containing the new Java bridge MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: quoted-printable X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/REEF-2025?page=3Dcom.atlassian.= jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=3D16505= 484#comment-16505484 ]=20 ASF GitHub Bot commented on REEF-2025: -------------------------------------- motus commented on a change in pull request #1466: [REEF-2025] A new module= containing the new Java bridge URL: https://github.com/apache/reef/pull/1466#discussion_r193910635 =20 =20 ########## File path: lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/= bridge/driver/service/grpc/GRPCDriverService.java ########## @@ -0,0 +1,1084 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.bridge.driver.service.grpc; + +import com.google.protobuf.ByteString; +import io.grpc.*; +import io.grpc.stub.StreamObserver; +import org.apache.commons.lang.StringUtils; +import org.apache.reef.bridge.driver.service.DriverClientException; +import org.apache.reef.bridge.driver.service.IDriverService; +import org.apache.reef.bridge.service.parameters.DriverClientCommand; +import org.apache.reef.bridge.proto.*; +import org.apache.reef.bridge.proto.Void; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.ClosedContext; +import org.apache.reef.driver.context.ContextMessage; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.*; +import org.apache.reef.driver.restart.DriverRestartCompleted; +import org.apache.reef.driver.restart.DriverRestarted; +import org.apache.reef.driver.task.*; +import org.apache.reef.runtime.common.driver.context.EvaluatorContext; +import org.apache.reef.runtime.common.driver.evaluator.AllocatedEvaluatorI= mpl; +import org.apache.reef.runtime.common.driver.idle.IdleMessage; +import org.apache.reef.runtime.common.utils.ExceptionCodec; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.formats.ConfigurationSerializer; +import org.apache.reef.util.OSUtils; +import org.apache.reef.util.Optional; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.remote.ports.TcpPortProvider; +import org.apache.reef.wake.time.Clock; +import org.apache.reef.wake.time.event.Alarm; +import org.apache.reef.wake.time.event.StartTime; +import org.apache.reef.wake.time.event.StopTime; + +import javax.inject.Inject; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import 
java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * GRPC DriverBridgeService that interacts with higher-level languages. + */ +public final class GRPCDriverService implements IDriverService { + private static final Logger LOG =3D Logger.getLogger(GRPCDriverService.c= lass.getName()); + + private static final Void VOID =3D Void.newBuilder().build(); + + private Process driverProcess; + + private enum StreamType { STDOUT, STDERR } + + private Server server; + + private DriverClientGrpc.DriverClientFutureStub clientStub; + + private final Clock clock; + + private final ExceptionCodec exceptionCodec; + + private final ConfigurationSerializer configurationSerializer; + + private final EvaluatorRequestor evaluatorRequestor; + + private final JVMProcessFactory jvmProcessFactory; + + private final CLRProcessFactory clrProcessFactory; + + private final TcpPortProvider tcpPortProvider; + + private final String driverClientCommand; + + private final Map allocatedEvaluatorMap =3D = new HashMap<>(); + + private final Map activeContextMap =3D new HashMa= p<>(); + + private final Map runningTaskMap =3D new HashMap<>(= ); + + private boolean stopped =3D false; + + @Inject + private GRPCDriverService( + final Clock clock, + final EvaluatorRequestor evaluatorRequestor, + final ConfigurationSerializer configurationSerializer, + final JVMProcessFactory jvmProcessFactory, + final CLRProcessFactory clrProcessFactory, + final TcpPortProvider tcpPortProvider, + final ExceptionCodec exceptionCodec, + @Parameter(DriverClientCommand.class) final String driverClientComma= nd) { + this.clock =3D clock; + this.exceptionCodec =3D exceptionCodec; + this.configurationSerializer =3D configurationSerializer; + this.jvmProcessFactory =3D jvmProcessFactory; + this.clrProcessFactory =3D clrProcessFactory; + 
this.evaluatorRequestor =3D evaluatorRequestor; + this.driverClientCommand =3D driverClientCommand; + this.tcpPortProvider =3D tcpPortProvider; + } + + private void start() throws IOException, InterruptedException { + for (final Integer port : this.tcpPortProvider) { + try { + this.server =3D ServerBuilder.forPort(port) + .addService(new DriverBridgeServiceImpl()) + .build() + .start(); + LOG.info("Server started, listening on " + port); + break; + } catch (IOException e) { + LOG.log(Level.WARNING, "Unable to bind to port [{0}]", port); + } + } + if (this.server =3D=3D null || this.server.isTerminated()) { + throw new IOException("Unable to start gRPC server"); + } else { + final String cmd =3D this.driverClientCommand + " " + this.server.ge= tPort(); + final List cmdOs =3D OSUtils.isWindows() ? + Arrays.asList("cmd.exe", "/c", cmd) : Arrays.asList("/bin/sh", "= -c", cmd); + LOG.log(Level.INFO, "CMD: " + cmdOs); + this.driverProcess =3D new ProcessBuilder() + .command(cmdOs) + .redirectError(new File("driverclient.stderr")) + .redirectOutput(new File("driverclient.stdout")) + .directory(new File(System.getProperty("user.dir"))) + .start(); + synchronized (this) { + int attempts =3D 10; // give some time + // wait for driver client process to register + while (attempts-- > 0 && this.clientStub =3D=3D null && driverProc= essIsAlive()) { + LOG.log(Level.INFO, "waiting for driver process to register"); + this.wait(1000); // a second + } + } + if (driverProcessIsAlive()) { + Thread closeChildThread =3D new Thread() { + public void run() { + synchronized (GRPCDriverService.this) { + if (GRPCDriverService.this.driverProcess !=3D null) { + GRPCDriverService.this.driverProcess.destroy(); + GRPCDriverService.this.driverProcess =3D null; + } + } + } + }; + Runtime.getRuntime().addShutdownHook(closeChildThread); + } + } + } + + private void stop() { + stop(null); + } + + private void stop(final Throwable t) { + LOG.log(Level.INFO, "STOP: gRPC Driver Service", t); + if 
(!stopped) { + try { + if (!clock.isClosed()) { + if (t !=3D null) { + clock.stop(t); + } else { + clock.stop(); + } + } + if (server !=3D null) { + LOG.log(Level.INFO, "Shutdown gRPC"); + this.server.shutdown(); + this.server =3D null; + } + if (this.driverProcess !=3D null) { + LOG.log(Level.INFO, "shutdown driver process"); + dump(); + this.driverProcess.destroy(); + this.driverProcess =3D null; + } + } finally { + LOG.log(Level.INFO, "COMPLETED STOP: gRPC Driver Service"); + stopped =3D true; + } + } + } + + private void dump() { + if (!driverProcessIsAlive()) { + LOG.log(Level.INFO, "Exit code: " + this.driverProcess.exitValue()); + } + dumpStream(StreamType.STDOUT); + dumpStream(StreamType.STDERR); + } + + private void dumpStream(final StreamType type) { + StringBuffer buffer =3D new StringBuffer(); + + String name =3D ""; + InputStream stream =3D null; + switch(type) { + case STDOUT: + name =3D "stdout"; + stream =3D this.driverProcess.getInputStream(); + break; + case STDERR: + name =3D "stderr"; + stream =3D this.driverProcess.getErrorStream(); + break; + default: + LOG.log(Level.WARNING, "Invalid stream type value"); =20 Review comment: Raise `RuntimeException` here, and make `name` and `stream` final ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: users@infra.apache.org > A new module containing the new Java bridge > ------------------------------------------- > > Key: REEF-2025 > URL: https://issues.apache.org/jira/browse/REEF-2025 > Project: REEF > Issue Type: Sub-task > Components: REEF Bridge > Affects Versions: 0.17 > Reporter: Tyson Condie > Assignee: Tyson Condie > Priority: Major > Fix For: 0.17 > > > This Jira introduces the module containing the new bridge. -- This message was sent by Atlassian JIRA (v7.6.3#76005)