Date: Thu, 7 Jun 2018 23:34:03 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: dev@reef.apache.org
Subject: [jira] [Commented] (REEF-2025) A new module containing the new Java bridge

    [ https://issues.apache.org/jira/browse/REEF-2025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16505500#comment-16505500 ]

ASF GitHub Bot commented on REEF-2025:
--------------------------------------

motus commented on a change in pull request #1466: [REEF-2025] A new module containing the new Java bridge
URL: https://github.com/apache/reef/pull/1466#discussion_r193917695

 ##########
 File path: lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java
 ##########
 @@ -0,0 +1,1084 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.driver.service.grpc;
+
+import com.google.protobuf.ByteString;
+import io.grpc.*;
+import io.grpc.stub.StreamObserver;
+import org.apache.commons.lang.StringUtils;
+import org.apache.reef.bridge.driver.service.DriverClientException;
+import org.apache.reef.bridge.driver.service.IDriverService;
+import org.apache.reef.bridge.service.parameters.DriverClientCommand;
+import org.apache.reef.bridge.proto.*;
+import org.apache.reef.bridge.proto.Void;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.*;
+import org.apache.reef.driver.restart.DriverRestartCompleted;
+import org.apache.reef.driver.restart.DriverRestarted;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.runtime.common.driver.context.EvaluatorContext;
+import org.apache.reef.runtime.common.driver.evaluator.AllocatedEvaluatorImpl;
+import org.apache.reef.runtime.common.driver.idle.IdleMessage;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.OSUtils;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * GRPC DriverBridgeService that interacts with higher-level languages.
+ */
+public final class GRPCDriverService implements IDriverService {
+  private static final Logger LOG = Logger.getLogger(GRPCDriverService.class.getName());
+
+  private static final Void VOID = Void.newBuilder().build();
+
+  private Process driverProcess;
+
+  private enum StreamType { STDOUT, STDERR }
+
+  private Server server;
+
+  private DriverClientGrpc.DriverClientFutureStub clientStub;
+
+  private final Clock clock;
+
+  private final ExceptionCodec exceptionCodec;
+
+  private final ConfigurationSerializer configurationSerializer;
+
+  private final EvaluatorRequestor evaluatorRequestor;
+
+  private final JVMProcessFactory jvmProcessFactory;
+
+  private final CLRProcessFactory clrProcessFactory;
+
+  private final TcpPortProvider tcpPortProvider;
+
+  private final String driverClientCommand;
+
+  private final Map<String, AllocatedEvaluator> allocatedEvaluatorMap = new HashMap<>();
+
+  private final Map<String, ActiveContext> activeContextMap = new HashMap<>();
+
+  private final Map<String, RunningTask> runningTaskMap = new HashMap<>();
+
+  private boolean stopped = false;
+
+  @Inject
+  private GRPCDriverService(
+      final Clock clock,
+      final EvaluatorRequestor evaluatorRequestor,
+      final ConfigurationSerializer configurationSerializer,
+      final JVMProcessFactory jvmProcessFactory,
+      final CLRProcessFactory clrProcessFactory,
+      final TcpPortProvider tcpPortProvider,
+      final ExceptionCodec exceptionCodec,
+      @Parameter(DriverClientCommand.class) final String driverClientCommand) {
+    this.clock = clock;
+    this.exceptionCodec = exceptionCodec;
+    this.configurationSerializer = configurationSerializer;
+    this.jvmProcessFactory = jvmProcessFactory;
+    this.clrProcessFactory = clrProcessFactory;
+    this.evaluatorRequestor = evaluatorRequestor;
+    this.driverClientCommand = driverClientCommand;
+    this.tcpPortProvider = tcpPortProvider;
+  }
+
+  private void start() throws IOException, InterruptedException {
+    for (final Integer port : this.tcpPortProvider) {
+      try {
+        this.server = ServerBuilder.forPort(port)
+            .addService(new DriverBridgeServiceImpl())
+            .build()
+            .start();
+        LOG.info("Server started, listening on " + port);
+        break;
+      } catch (IOException e) {
+        LOG.log(Level.WARNING, "Unable to bind to port [{0}]", port);
+      }
+    }
+    if (this.server == null || this.server.isTerminated()) {
+      throw new IOException("Unable to start gRPC server");
+    } else {
+      final String cmd = this.driverClientCommand + " " + this.server.getPort();
+      final List<String> cmdOs = OSUtils.isWindows() ?
+          Arrays.asList("cmd.exe", "/c", cmd) : Arrays.asList("/bin/sh", "-c", cmd);
+      LOG.log(Level.INFO, "CMD: " + cmdOs);
+      this.driverProcess = new ProcessBuilder()
+          .command(cmdOs)
+          .redirectError(new File("driverclient.stderr"))
+          .redirectOutput(new File("driverclient.stdout"))
+          .directory(new File(System.getProperty("user.dir")))
+          .start();
+      synchronized (this) {
+        int attempts = 10; // give some time
+        // wait for driver client process to register
+        while (attempts-- > 0 && this.clientStub == null && driverProcessIsAlive()) {
+          LOG.log(Level.INFO, "waiting for driver process to register");
+          this.wait(1000); // a second
+        }
+      }
+      if (driverProcessIsAlive()) {
+        Thread closeChildThread = new Thread() {
+          public void run() {
+            synchronized (GRPCDriverService.this) {
+              if (GRPCDriverService.this.driverProcess != null) {
+                GRPCDriverService.this.driverProcess.destroy();
+                GRPCDriverService.this.driverProcess = null;
+              }
+            }
+          }
+        };
+        Runtime.getRuntime().addShutdownHook(closeChildThread);
+      }
+    }
+  }
+
+  private void stop() {
+    stop(null);
+  }
+
+  private void stop(final Throwable t) {
+    LOG.log(Level.INFO, "STOP: gRPC Driver Service", t);
+    if (!stopped) {
+      try {
+        if (!clock.isClosed()) {
+          if (t != null) {
+            clock.stop(t);
+          } else {
+            clock.stop();
+          }
+        }
+        if (server != null) {
+          LOG.log(Level.INFO, "Shutdown gRPC");
+          this.server.shutdown();
+          this.server = null;
+        }
+        if (this.driverProcess != null) {
+          LOG.log(Level.INFO, "shutdown driver process");
+          dump();
+          this.driverProcess.destroy();
+          this.driverProcess = null;
+        }
+      } finally {
+        LOG.log(Level.INFO, "COMPLETED STOP: gRPC Driver Service");
+        stopped = true;
+      }
+    }
+  }
+
+  private void dump() {
+    if (!driverProcessIsAlive()) {
+      LOG.log(Level.INFO, "Exit code: " + this.driverProcess.exitValue());
+    }
+    dumpStream(StreamType.STDOUT);
+    dumpStream(StreamType.STDERR);
+  }
+
+  private void dumpStream(final StreamType type) {
+    StringBuffer buffer = new StringBuffer();
+
+    String name = "";
+    InputStream stream = null;
+    switch(type) {
+    case STDOUT:
+      name = "stdout";
+      stream = this.driverProcess.getInputStream();
+      break;
+    case STDERR:
+      name = "stderr";
+      stream = this.driverProcess.getErrorStream();
+      break;
+    default:
+      LOG.log(Level.WARNING, "Invalid stream type value");
+    }
+
+    LOG.log(Level.INFO, "capturing driver process " + name);
+    try {
+      int nextChar;
+      buffer.append("\n==============================================\n");
+      while ((nextChar = stream.read()) != -1) {
+        buffer.append((char) nextChar);
+      }
+      buffer.append("\n==============================================\n");
+    } catch (IOException e) {
+      LOG.log(Level.WARNING, "Error while capturing output stream: " + e.getMessage());
+    }
+    LOG.log(Level.INFO, buffer.toString());
+  }
+
+  /**
+   * Determines if the driver process is still alive by
+   * testing for its exit value, which throws {@link IllegalThreadStateException}
+   * if process is still running.
+   * @return true if driver process is alive, false otherwise
+   */
+  private boolean driverProcessIsAlive() {
+    if (this.driverProcess != null) {
+      try {
+        this.driverProcess.exitValue();
+      } catch (IllegalThreadStateException e) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private EvaluatorDescriptorInfo toEvaluatorDescriptorInfo(final EvaluatorDescriptor descriptor) {
+    if (descriptor == null) {
+      return null;
+    } else {
+      return EvaluatorDescriptorInfo.newBuilder()
+          .setCores(descriptor.getNumberOfCores())
+          .setMemory(descriptor.getMemory())
+          .setRuntimeName(descriptor.getRuntimeName())
+          .build();
+    }
+  }
+
+  @Override
+  public IdleMessage getIdleStatus() {
+    final String componentName = "Java Bridge DriverService";
+    if (this.clientStub != null) {
+      try {
+        final IdleStatus idleStatus = this.clientStub.idlenessCheckHandler(VOID).get();
+        LOG.log(Level.INFO, "is idle: " + idleStatus.getIsIdle());
+        return new IdleMessage(
+            componentName,
+            idleStatus.getReason(),
+            idleStatus.getIsIdle());
+      } catch (ExecutionException | InterruptedException e) {
+        stop(e);
+      }
+    }
+    return new IdleMessage(
+        componentName,
+        "stub not initialized",
+        true);
+  }
+
+  @Override
+  public void startHandler(final StartTime startTime) {
+    try {
+      start();
+      synchronized (this) {
+        if (this.clientStub != null) {
+          this.clientStub.startHandler(
+              StartTimeInfo.newBuilder().setStartTime(startTime.getTimestamp()).build());
+        } else {
+          stop(new IllegalStateException("Unable to start driver client"));
+        }
+      }
+    } catch (IOException | InterruptedException e) {
+      stop(e);
+    }
+  }
+
+  @Override
+  public void stopHandler(final StopTime stopTime) {
+    synchronized (this) {
+      if (clientStub != null) {
+        final Future<ExceptionInfo> callCompletion = this.clientStub.stopHandler(
+            StopTimeInfo.newBuilder().setStopTime(stopTime.getTimestamp()).build());
+        try {
+          try {
+            final ExceptionInfo error = callCompletion.get(5L, TimeUnit.MINUTES);
+            if (!error.getNoError()) {
+              final Optional<Throwable> t = parseException(error);
+              if (t.isPresent()) {
+                throw new RuntimeException("driver stop exception",
+                    t.get().getCause() != null ? t.get().getCause() : t.get());
+              } else {
+                throw new RuntimeException(error.getMessage() != null ? error.getMessage() : error.getName());
+              }
+            }
+          } catch (TimeoutException e) {
+            throw new RuntimeException("stop handler timed out", e);
+          }
+        } catch (InterruptedException | ExecutionException e) {
+          throw new RuntimeException(e);
+        } finally {
+          stop();
+        }
+      }
+    }
+  }
+
+  @Override
+  public void allocatedEvaluatorHandler(final AllocatedEvaluator eval) {
+    synchronized (this) {
+      this.allocatedEvaluatorMap.put(eval.getId(), eval);
+      this.clientStub.allocatedEvaluatorHandler(
+          EvaluatorInfo.newBuilder()
+              .setEvaluatorId(eval.getId())
+              .setDescriptorInfo(toEvaluatorDescriptorInfo(eval.getEvaluatorDescriptor()))
+              .build());
+    }
+  }
+
+  @Override
+  public void completedEvaluatorHandler(final CompletedEvaluator eval) {
+    synchronized (this) {
+      this.allocatedEvaluatorMap.remove(eval.getId());
+      this.clientStub.completedEvaluatorHandler(
+          EvaluatorInfo.newBuilder().setEvaluatorId(eval.getId()).build());
+    }
+  }
+
+  @Override
+  public void failedEvaluatorHandler(final FailedEvaluator eval) {
+    synchronized (this) {
+      this.allocatedEvaluatorMap.remove(eval.getId());
+      this.clientStub.failedEvaluatorHandler(
+          EvaluatorInfo.newBuilder().setEvaluatorId(eval.getId()).build());
+    }
+  }
+
+  @Override
+  public void activeContextHandler(final ActiveContext context) {
+    synchronized (this) {
+      this.activeContextMap.put(context.getId(), context);
+      this.clientStub.activeContextHandler(
+          ContextInfo.newBuilder()
+              .setContextId(context.getId())
+              .setEvaluatorId(context.getEvaluatorId())
+              .setParentId(
+                  context.getParentId().isPresent() ?
+                      context.getParentId().get() : "")
+              .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
+                  context.getEvaluatorDescriptor()))
+              .build());
+    }
+  }
+
+  @Override
+  public void closedContextHandler(final ClosedContext context) {
+    synchronized (this) {
+      this.activeContextMap.remove(context.getId());
+      this.clientStub.closedContextHandler(
+          ContextInfo.newBuilder()
+              .setContextId(context.getId())
+              .setEvaluatorId(context.getEvaluatorId())
+              .setParentId(context.getParentContext().getId())
+              .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
+                  context.getEvaluatorDescriptor()))
+              .build());
+    }
+  }
+
+  @Override
+  public void failedContextHandler(final FailedContext context) {
+    synchronized (this) {
+      final ContextInfo.Builder contextInfoBuilder =
+          ContextInfo.newBuilder()
+              .setContextId(context.getId())
+              .setEvaluatorId(context.getEvaluatorId())
+              .setParentId(
+                  context.getParentContext().isPresent() ?
+                      context.getParentContext().get().getId() : "")
+              .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
+                  context.getEvaluatorDescriptor()));
+      if (context.getReason().isPresent()) {
+        final Throwable reason = context.getReason().get();
+        contextInfoBuilder.setException(ExceptionInfo.newBuilder()
+            .setName(reason.toString())
+            .setMessage(context.getMessage() != null ? context.getMessage() : "")
+            .setData(ByteString.copyFrom(exceptionCodec.toBytes(reason)))
+            .build());
+      } else if (context.getData().isPresent()) {
+        contextInfoBuilder.setException(ExceptionInfo.newBuilder()
+            .setName(context.toString())
+            .setMessage(context.getDescription().isPresent() ?
+                context.getDescription().get() :
+                context.getMessage() != null ? context.getMessage() : "")
+            .setData(ByteString.copyFrom(context.getData().get()))
+            .build());
+      } else {
+        final Throwable reason = context.asError();
+        contextInfoBuilder.setException(ExceptionInfo.newBuilder()
+            .setName(reason.toString())
+            .setMessage(context.getMessage() != null ? context.getMessage() : "")
+            .setData(ByteString.copyFrom(exceptionCodec.toBytes(reason)))
+            .build());
+
+      }
+      this.activeContextMap.remove(context.getId());
+      this.clientStub.failedContextHandler(contextInfoBuilder.build());
+    }
+  }
+
+  @Override
+  public void contextMessageHandler(final ContextMessage message) {
+    synchronized (this) {
+      this.clientStub.contextMessageHandler(
+          ContextMessageInfo.newBuilder()
+              .setContextId(message.getId())
+              .setMessageSourceId(message.getMessageSourceID())
+              .setSequenceNumber(message.getSequenceNumber())
+              .setPayload(ByteString.copyFrom(message.get()))
+              .build());
+    }
+  }
+
+  @Override
+  public void runningTaskHandler(final RunningTask task) {
+    synchronized (this) {
+      final ActiveContext context = task.getActiveContext();
+      if (!this.activeContextMap.containsKey(context.getId())) {
+        this.activeContextMap.put(context.getId(), context);
+      }
+      this.runningTaskMap.put(task.getId(), task);
+      this.clientStub.runningTaskHandler(
+          TaskInfo.newBuilder()
+              .setTaskId(task.getId())
+              .setContext(ContextInfo.newBuilder()
+                  .setContextId(context.getId())
+                  .setEvaluatorId(context.getEvaluatorId())
+                  .setParentId(context.getParentId().isPresent() ? context.getParentId().get() : "")
+                  .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
+                      task.getActiveContext().getEvaluatorDescriptor()))
+                  .build())
+              .build());
+    }
+  }
+
+  @Override
+  public void failedTaskHandler(final FailedTask task) {
+    synchronized (this) {
+      if (task.getActiveContext().isPresent() &&
+          !this.activeContextMap.containsKey(task.getActiveContext().get().getId())) {
+        this.activeContextMap.put(task.getActiveContext().get().getId(), task.getActiveContext().get());
+      }
+      final TaskInfo.Builder taskInfoBuilder = TaskInfo.newBuilder()
+          .setTaskId(task.getId());
+      if (task.getActiveContext().isPresent()) {
+        taskInfoBuilder.setContext(ContextInfo.newBuilder()
+            .setContextId(task.getActiveContext().get().getId())
+            .setEvaluatorId(task.getActiveContext().get().getEvaluatorId())
+            .setParentId(task.getActiveContext().get().getParentId().isPresent() ?
+                task.getActiveContext().get().getParentId().get() : "")
+            .build());
+      }
+      if (task.getReason().isPresent()) {
+        final Throwable reason = task.getReason().get();
+        taskInfoBuilder.setException(ExceptionInfo.newBuilder()
+            .setName(reason.toString())
+            .setMessage(task.getMessage() != null ? task.getMessage() : "")
+            .setData(ByteString.copyFrom(exceptionCodec.toBytes(reason)))
+            .build());
+      } else if (task.getData().isPresent()) {
+        final Throwable reason = task.asError();
+        taskInfoBuilder.setException(ExceptionInfo.newBuilder()
+            .setName(reason.toString())
+            .setMessage(task.getMessage() != null ? task.getMessage() : "")
+            .setData(ByteString.copyFrom(task.getData().get()))
+            .build());
+      } else {
+        final Throwable reason = task.asError();
+        taskInfoBuilder.setException(ExceptionInfo.newBuilder()
+            .setName(reason.toString())
+            .setMessage(task.getMessage() != null ? task.getMessage() : "")
+            .setData(ByteString.copyFrom(exceptionCodec.toBytes(reason)))
+            .build());
+      }
+      this.runningTaskMap.remove(task.getId());
+      this.clientStub.failedTaskHandler(taskInfoBuilder.build());
+    }
+  }
+
+  @Override
+  public void completedTaskHandler(final CompletedTask task) {
+    synchronized (this) {
+      if (!this.activeContextMap.containsKey(task.getActiveContext().getId())) {
+        this.activeContextMap.put(task.getActiveContext().getId(), task.getActiveContext());
+      }
+      this.runningTaskMap.remove(task.getId());
+      this.clientStub.completedTaskHandler(
+          TaskInfo.newBuilder()
+              .setTaskId(task.getId())
+              .setContext(ContextInfo.newBuilder()
+                  .setContextId(task.getActiveContext().getId())
+                  .setEvaluatorId(task.getActiveContext().getEvaluatorId())
+                  .setParentId(task.getActiveContext().getParentId().isPresent() ?
+                      task.getActiveContext().getParentId().get() : "")

 Review comment:
   use `.orElse("")`


> A new module containing the new Java bridge
> -------------------------------------------
>
>                 Key: REEF-2025
>                 URL: https://issues.apache.org/jira/browse/REEF-2025
>             Project: REEF
>          Issue Type: Sub-task
>          Components: REEF Bridge
>    Affects Versions: 0.17
>            Reporter: Tyson Condie
>            Assignee: Tyson Condie
>            Priority: Major
>             Fix For: 0.17
>
>
> This Jira introduces the module containing the new bridge.
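
Editor's sketch of the review comment's suggestion: the diff repeatedly writes `x.isPresent() ? x.get() : ""` for the parent ID, and the reviewer asks for `.orElse("")` instead. The example below is a minimal illustration, not code from the pull request. It uses `java.util.Optional` so that it compiles on its own; the diff imports `org.apache.reef.util.Optional`, and the assumption here (implied by the review comment, not verified against that class) is that it offers an equivalent `orElse`. The class and method names `OrElseSketch`, `parentIdTernary`, and `parentIdOrElse` are hypothetical.

```java
import java.util.Optional;

// Hypothetical stand-alone illustration; not part of GRPCDriverService.
final class OrElseSketch {

  private OrElseSketch() {
  }

  // Pattern used throughout the diff: test for presence, then unwrap.
  static String parentIdTernary(final Optional<String> parentId) {
    return parentId.isPresent() ? parentId.get() : "";
  }

  // Pattern the reviewer suggests: fall back to "" when no value is present.
  static String parentIdOrElse(final Optional<String> parentId) {
    return parentId.orElse("");
  }

  public static void main(final String[] args) {
    final Optional<String> noParent = Optional.empty();
    final Optional<String> withParent = Optional.of("root-context");

    // Both forms should agree for an absent and for a present value.
    System.out.println(parentIdTernary(noParent).equals(parentIdOrElse(noParent)));
    System.out.println(parentIdTernary(withParent).equals(parentIdOrElse(withParent)));
  }
}
```

Under that assumption, a call such as `.setParentId(context.getParentId().orElse(""))` would replace the two-line ternary at each site where the parent ID is an `Optional` of `String`.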