Date: Sat, 9 Jun 2018 01:46:02 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: dev@reef.apache.org
Subject: [jira] [Commented] (REEF-2025) A new module containing the new Java bridge

    [ https://issues.apache.org/jira/browse/REEF-2025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16506781#comment-16506781 ]

ASF GitHub Bot commented on REEF-2025:
--------------------------------------

motus commented on a change in pull request #1466: [REEF-2025] A new module containing the new Java bridge
URL: https://github.com/apache/reef/pull/1466#discussion_r194210239

##########
File path: lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java
##########
@@ -0,0 +1,1060 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.driver.service.grpc;
+
+import com.google.protobuf.ByteString;
+import io.grpc.*;
+import io.grpc.stub.StreamObserver;
+import org.apache.commons.lang.StringUtils;
+import org.apache.reef.bridge.driver.client.grpc.GRPCUtils;
+import org.apache.reef.bridge.driver.service.DriverClientException;
+import org.apache.reef.bridge.driver.service.IDriverService;
+import org.apache.reef.bridge.service.parameters.DriverClientCommand;
+import org.apache.reef.bridge.proto.*;
+import org.apache.reef.bridge.proto.Void;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.*;
+import org.apache.reef.driver.restart.DriverRestartCompleted;
+import org.apache.reef.driver.restart.DriverRestarted;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.runtime.common.driver.context.EvaluatorContext;
+import org.apache.reef.runtime.common.driver.evaluator.AllocatedEvaluatorImpl;
+import org.apache.reef.runtime.common.driver.idle.IdleMessage;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.OSUtils;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.io.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * GRPC DriverBridgeService that interacts with higher-level languages.
+ */
+public final class GRPCDriverService implements IDriverService {
+  private static final Logger LOG = Logger.getLogger(GRPCDriverService.class.getName());
+
+  private static final Void VOID = Void.newBuilder().build();
+
+  private Process driverProcess;
+
+  private enum StreamType { STDOUT, STDERR }
+
+  private Server server;
+
+  private DriverClientGrpc.DriverClientFutureStub clientStub;
+
+  private final Clock clock;
+
+  private final REEFFileNames reefFileNames;
+
+  private final ExceptionCodec exceptionCodec;
+
+  private final ConfigurationSerializer configurationSerializer;
+
+  private final EvaluatorRequestor evaluatorRequestor;
+
+  private final JVMProcessFactory jvmProcessFactory;
+
+  private final CLRProcessFactory clrProcessFactory;
+
+  private final TcpPortProvider tcpPortProvider;
+
+  private final String driverClientCommand;
+
+  private final Map allocatedEvaluatorMap = new HashMap<>();
+
+  private final Map activeContextMap = new HashMap<>();
+
+  private final Map runningTaskMap = new HashMap<>();
+
+  private boolean stopped = false;
+
+  @Inject
+  private GRPCDriverService(
+      final Clock clock,
+      final REEFFileNames reefFileNames,
+      final EvaluatorRequestor evaluatorRequestor,
+      final ConfigurationSerializer configurationSerializer,
+      final JVMProcessFactory jvmProcessFactory,
+      final CLRProcessFactory clrProcessFactory,
+      final TcpPortProvider tcpPortProvider,
+      final ExceptionCodec exceptionCodec,
+      @Parameter(DriverClientCommand.class) final String driverClientCommand) {
+    this.clock = clock;
+    this.reefFileNames = reefFileNames;
+    this.exceptionCodec = exceptionCodec;
+    this.configurationSerializer = configurationSerializer;
+    this.jvmProcessFactory = jvmProcessFactory;
+    this.clrProcessFactory = clrProcessFactory;
+    this.evaluatorRequestor = evaluatorRequestor;
+    this.driverClientCommand = driverClientCommand;
+    this.tcpPortProvider = tcpPortProvider;
+  }
+
+  private void start() throws IOException, InterruptedException {
+    for (final int port : this.tcpPortProvider) {
+      try {
+        this.server = ServerBuilder.forPort(port)
+            .addService(new DriverBridgeServiceImpl())
+            .build()
+            .start();
+        LOG.log(Level.INFO, "Server started, listening on port [{0}]", port);
+        break;
+      } catch (final IOException e) {
+        LOG.log(Level.WARNING, "Unable to bind to port [{0}]", port);
+      }
+    }
+    if (this.server == null || this.server.isTerminated()) {
+      throw new IOException("Unable to start gRPC server");
+    }
+    final String cmd = this.driverClientCommand + " " + this.server.getPort();
+    final List cmdOs = OSUtils.isWindows() ?
+        Arrays.asList("cmd.exe", "/c", cmd) : Arrays.asList("/bin/sh", "-c", cmd);
+    LOG.log(Level.INFO, "CMD: {0}", cmdOs);
+    this.driverProcess = new ProcessBuilder()
+        .command(cmdOs)
+        .redirectError(new File(this.reefFileNames.getDriverClientStderrFileName()))
+        .redirectOutput(new File(this.reefFileNames.getDriverClientStdoutFileName()))
+        .start();
+    synchronized (this) {
+      int attempts = 10; // give some time
+      /* wait for driver client process to register
+       * Note: attempts and wait time have been given reasonable hardcoded values for a driver
+       * client to register with the driver service (us). Making these values configurable would
+       * require additions to the ClientProtocol buffer such that they can be passed to the
+       * GRPCDriverServiceConfigurationProvider and bound to the appropriate NamedParameters. It
+       * is the opinion at the time of this writing that a driver client should be able to register
+       * within 10 seconds.
+       */
+      while (attempts-- > 0 && this.clientStub == null && driverProcessIsAlive()) {
+        LOG.log(Level.INFO, "waiting for driver process to register");
+        this.wait(1000); // a second
+      }
+    }
+    if (driverProcessIsAlive()) {
+      final Thread closeChildThread = new Thread() {
+        public void run() {
+          synchronized (GRPCDriverService.this) {
+            if (GRPCDriverService.this.driverProcess != null) {
+              GRPCDriverService.this.driverProcess.destroy();
+              GRPCDriverService.this.driverProcess = null;
+            }
+          }
+        }
+      };
+      // This is probably overkill since shutdown should be called in the stop handler.
+      Runtime.getRuntime().addShutdownHook(closeChildThread);
+    }
+  }
+
+  private void stop() {
+    stop(null);
+  }
+
+  private void stop(final Throwable t) {
+    LOG.log(Level.INFO, "STOP: gRPC Driver Service", t);
+    if (!stopped) {
+      try {
+        if (!clock.isClosed()) {
+          if (t != null) {
+            clock.stop(t);
+          } else {
+            clock.stop();
+          }
+        }
+        if (server != null) {
+          LOG.log(Level.INFO, "Shutdown gRPC");
+          this.server.shutdown();
+          this.server = null;
+        }
+        if (this.driverProcess != null) {
+          LOG.log(Level.INFO, "shutdown driver process");
+          dump();
+          this.driverProcess.destroy();
+          this.driverProcess = null;
+        }
+      } finally {
+        LOG.log(Level.INFO, "COMPLETED STOP: gRPC Driver Service");
+        stopped = true;
+      }
+    }
+  }
+
+  private void dump() {
+    if (!driverProcessIsAlive()) {
+      LOG.log(Level.INFO, "Exit code: {0}", this.driverProcess.exitValue());
+    }
+    dumpStream(StreamType.STDOUT);
+    dumpStream(StreamType.STDERR);
+  }
+
+  private void dumpStream(final StreamType type) {
+    final StringBuilder stringBuilder = new StringBuilder();
+
+    final String name;
+    final InputStream stream;
+    switch(type) {
+    case STDOUT:
+      name = "stdout";
+      stream = this.driverProcess.getInputStream();
+      break;
+    case STDERR:
+      name = "stderr";
+      stream = this.driverProcess.getErrorStream();
+      break;
+    default:
+      throw new RuntimeException("Invalid stream type value");
+    }
+
+    LOG.log(Level.INFO, "capturing driver process {0}", name);
+    try {
+      stringBuilder.append("\n==============================================\n");
+      try (final BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
+        while (reader.ready()) {
+          stringBuilder.append(reader.readLine()).append('\n');
+        }
+      }
+      stringBuilder.append("\n==============================================\n");
+    } catch (final IOException e) {
+      LOG.log(Level.WARNING, "Error while capturing output stream", e);
+    }
+    LOG.log(Level.INFO, "{0}", stringBuilder);
+  }
+
+  /**
+   * Determines if the driver process is still alive by
+   * testing for its exit value, which throws {@link IllegalThreadStateException}
+   * if process is still running.
+   * @return true if driver process is alive, false otherwise
+   */
+  private boolean driverProcessIsAlive() {
+    if (this.driverProcess != null) {
+      try {
+        this.driverProcess.exitValue();
+      } catch (final IllegalThreadStateException e) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private EvaluatorDescriptorInfo toEvaluatorDescriptorInfo(final EvaluatorDescriptor descriptor) {
+    return descriptor == null ? null : EvaluatorDescriptorInfo.newBuilder()
+        .setCores(descriptor.getNumberOfCores())
+        .setMemory(descriptor.getMemory())
+        .setRuntimeName(descriptor.getRuntimeName())
+        .build();
+  }
+
+  @Override
+  public IdleMessage getIdleStatus() {
+    final String componentName = "Java Bridge DriverService";
+    if (this.clientStub != null) {
+      try {
+        final IdleStatus idleStatus = this.clientStub.idlenessCheckHandler(VOID).get();
+        LOG.log(Level.INFO, "is idle: {0}", idleStatus.getIsIdle());
+        return new IdleMessage(
+            componentName,
+            idleStatus.getReason(),
+            idleStatus.getIsIdle());
+      } catch (final ExecutionException | InterruptedException e) {
+        stop(e);
+      }
+    }
+    return new IdleMessage(
+        componentName,
+        "stub not initialized",
+        true);
+  }
+
+  @Override
+  public void startHandler(final StartTime startTime) {
+    try {
+      start();
+      synchronized (this) {
+        if (this.clientStub != null) {
+          this.clientStub.startHandler(
+              StartTimeInfo.newBuilder().setStartTime(startTime.getTimestamp()).build());
+        } else {
+          stop(new IllegalStateException("Unable to start driver client"));
+        }
+      }
+    } catch (final IOException | InterruptedException e) {
+      stop(e);
+    }
+  }
+
+  @Override
+  public void stopHandler(final StopTime stopTime) {
+    synchronized (this) {
+      if (clientStub != null) {
+        final Future callCompletion = this.clientStub.stopHandler(
+            StopTimeInfo.newBuilder().setStopTime(stopTime.getTimestamp()).build());
+        try {
+          final ExceptionInfo error = callCompletion.get(5L, TimeUnit.MINUTES);
+          if (!error.getNoError()) {
+            final Optional t = parseException(error);
+            if (t.isPresent()) {
+              throw new RuntimeException("driver stop exception",
+                  t.get().getCause() != null ? t.get().getCause() : t.get());
+            } else {
+              throw new RuntimeException(error.getMessage() != null ? error.getMessage() : error.getName());
+            }
+          }
+        } catch (final TimeoutException e) {
+          throw new RuntimeException("stop handler timed out", e);
+        } catch (final InterruptedException | ExecutionException e) {
+          throw new RuntimeException("error in stop handler", e);
+        } finally {
+          stop();
+        }
+      }
+    }
+  }
+
+  @Override
+  public void allocatedEvaluatorHandler(final AllocatedEvaluator eval) {
+    synchronized (this) {
+      this.allocatedEvaluatorMap.put(eval.getId(), eval);
+      this.clientStub.allocatedEvaluatorHandler(
+          EvaluatorInfo.newBuilder()
+              .setEvaluatorId(eval.getId())
+              .setDescriptorInfo(toEvaluatorDescriptorInfo(eval.getEvaluatorDescriptor()))
+              .build());
+    }
+  }
+
+  @Override
+  public void completedEvaluatorHandler(final CompletedEvaluator eval) {
+    synchronized (this) {
+      this.allocatedEvaluatorMap.remove(eval.getId());
+      this.clientStub.completedEvaluatorHandler(
+          EvaluatorInfo.newBuilder().setEvaluatorId(eval.getId()).build());
+    }
+  }
+
+  @Override
+  public void failedEvaluatorHandler(final FailedEvaluator eval) {
+    synchronized (this) {
+      this.allocatedEvaluatorMap.remove(eval.getId());
+      this.clientStub.failedEvaluatorHandler(
+          EvaluatorInfo.newBuilder().setEvaluatorId(eval.getId()).build());
+    }
+  }
+
+  @Override
+  public void activeContextHandler(final ActiveContext context) {
+    synchronized (this) {
+      this.activeContextMap.put(context.getId(), context);
+      this.clientStub.activeContextHandler(
+          ContextInfo.newBuilder()
+              .setContextId(context.getId())
+              .setEvaluatorId(context.getEvaluatorId())
+              .setParentId(context.getParentId().orElse(""))
+              .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
+                  context.getEvaluatorDescriptor()))
+              .build());
+    }
+  }
+
+  @Override
+  public void closedContextHandler(final ClosedContext context) {
+    synchronized (this) {
+      this.activeContextMap.remove(context.getId());
+      this.clientStub.closedContextHandler(
+          ContextInfo.newBuilder()
+              .setContextId(context.getId())
+              .setEvaluatorId(context.getEvaluatorId())
+              .setParentId(context.getParentContext().getId())
+              .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
+                  context.getEvaluatorDescriptor()))
+              .build());
+    }
+  }
+
+  @Override
+  public void failedContextHandler(final FailedContext context) {
+    synchronized (this) {
+      final ContextInfo.Builder contextInfoBuilder =
+          ContextInfo.newBuilder()
+              .setContextId(context.getId())
+              .setEvaluatorId(context.getEvaluatorId())
+              .setParentId(context.getParentContext().isPresent() ?
+                  context.getParentContext().get().getId() : "")
+              .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
+                  context.getEvaluatorDescriptor()));
+      if (context.getReason().isPresent()) {
+        final Throwable reason = context.getReason().get();
+        contextInfoBuilder.setException(GRPCUtils.createExceptionInfo(this.exceptionCodec, reason));
+      } else if (context.getData().isPresent()) {
+        contextInfoBuilder.setException(ExceptionInfo.newBuilder()
+            .setName(context.toString())
+            .setMessage(context.getDescription().orElse(
+                context.getMessage() != null ? context.getMessage() : ""))
+            .setData(ByteString.copyFrom(context.getData().get()))
+            .build());
+      } else {
+        contextInfoBuilder.setException(GRPCUtils.createExceptionInfo(this.exceptionCodec, context.asError()));
+      }
+      this.activeContextMap.remove(context.getId());
+      this.clientStub.failedContextHandler(contextInfoBuilder.build());
+    }
+  }
+
+  @Override
+  public void contextMessageHandler(final ContextMessage message) {
+    synchronized (this) {
+      this.clientStub.contextMessageHandler(
+          ContextMessageInfo.newBuilder()
+              .setContextId(message.getId())
+              .setMessageSourceId(message.getMessageSourceID())
+              .setSequenceNumber(message.getSequenceNumber())
+              .setPayload(ByteString.copyFrom(message.get()))
+              .build());
+    }
+  }
+
+  @Override
+  public void runningTaskHandler(final RunningTask task) {
+    synchronized (this) {
+      final ActiveContext context = task.getActiveContext();
+      if (!this.activeContextMap.containsKey(context.getId())) {
+        this.activeContextMap.put(context.getId(), context);
+      }
+      this.runningTaskMap.put(task.getId(), task);
+      this.clientStub.runningTaskHandler(
+          TaskInfo.newBuilder()
+              .setTaskId(task.getId())
+              .setContext(ContextInfo.newBuilder()
+                  .setContextId(context.getId())
+                  .setEvaluatorId(context.getEvaluatorId())
+                  .setParentId(context.getParentId().orElse(""))
+                  .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
+                      task.getActiveContext().getEvaluatorDescriptor()))
+                  .build())
+              .build());
+    }
+  }
+
+  @Override
+  public void failedTaskHandler(final FailedTask task) {
+    synchronized (this) {
+      if (task.getActiveContext().isPresent() &&
+          !this.activeContextMap.containsKey(task.getActiveContext().get().getId())) {
+        this.activeContextMap.put(task.getActiveContext().get().getId(), task.getActiveContext().get());
+      }
+      final TaskInfo.Builder taskInfoBuilder = TaskInfo.newBuilder()
+          .setTaskId(task.getId());
+      if (task.getActiveContext().isPresent()) {
+        taskInfoBuilder.setContext(ContextInfo.newBuilder()
+            .setContextId(task.getActiveContext().get().getId())
+            .setEvaluatorId(task.getActiveContext().get().getEvaluatorId())
+            .setParentId(task.getActiveContext().get().getParentId().orElse(""))
+            .build());
+      }
+      if (task.getReason().isPresent()) {
+        taskInfoBuilder.setException(GRPCUtils.createExceptionInfo(this.exceptionCodec, task.getReason().get()));
+      } else if (task.getData().isPresent()) {
+        final Throwable reason = task.asError();
+        taskInfoBuilder.setException(ExceptionInfo.newBuilder()
+            .setName(reason.toString())
+            .setMessage(task.getMessage() != null ? task.getMessage() : "")
+            .setData(ByteString.copyFrom(task.getData().get()))
+            .build());
+      } else {
+        taskInfoBuilder.setException(GRPCUtils.createExceptionInfo(this.exceptionCodec, task.asError()));
+      }
+      this.runningTaskMap.remove(task.getId());
+      this.clientStub.failedTaskHandler(taskInfoBuilder.build());
+    }
+  }
+
+  @Override
+  public void completedTaskHandler(final CompletedTask task) {
+    synchronized (this) {
+      if (!this.activeContextMap.containsKey(task.getActiveContext().getId())) {
+        this.activeContextMap.put(task.getActiveContext().getId(), task.getActiveContext());
+      }
+      this.runningTaskMap.remove(task.getId());
+      this.clientStub.completedTaskHandler(
+          TaskInfo.newBuilder()
+              .setTaskId(task.getId())
+              .setContext(ContextInfo.newBuilder()
+                  .setContextId(task.getActiveContext().getId())
+                  .setEvaluatorId(task.getActiveContext().getEvaluatorId())
+                  .setParentId(task.getActiveContext().getParentId().orElse(""))
+                  .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
+                      task.getActiveContext().getEvaluatorDescriptor()))
+                  .build())
+              .build());
+    }
+  }
+
+  @Override
+  public void suspendedTaskHandler(final SuspendedTask task) {
+    synchronized (this) {
+      if (!this.activeContextMap.containsKey(task.getActiveContext().getId())) {
+        this.activeContextMap.put(task.getActiveContext().getId(), task.getActiveContext());
+      }
+      this.runningTaskMap.remove(task.getId());
+      this.clientStub.suspendedTaskHandler(
+          TaskInfo.newBuilder()
+              .setTaskId(task.getId())
+              .setContext(ContextInfo.newBuilder()
+                  .setContextId(task.getActiveContext().getId())
+                  .setEvaluatorId(task.getActiveContext().getEvaluatorId())
+                  .setParentId(task.getActiveContext().getParentId().orElse(""))
+                  .build())
+              .setResult(task.get() == null || task.get().length == 0 ?
+                  null : ByteString.copyFrom(task.get()))
+              .build());
+    }
+  }
+
+  @Override
+  public void taskMessageHandler(final TaskMessage message) {
+    synchronized (this) {
+      this.clientStub.taskMessageHandler(
+          TaskMessageInfo.newBuilder()
+              .setTaskId(message.getId())
+              .setContextId(message.getContextId())
+              .setMessageSourceId(message.getMessageSourceID())
+              .setSequenceNumber(message.getSequenceNumber())
+              .setPayload(ByteString.copyFrom(message.get()))
+              .build());
+    }
+  }
+
+  @Override
+  public void clientMessageHandler(final byte[] message) {
+    synchronized (this) {
+      this.clientStub.clientMessageHandler(
+          ClientMessageInfo.newBuilder()
+              .setPayload(ByteString.copyFrom(message))
+              .build());
+    }
+  }
+
+  @Override
+  public void clientCloseHandler() {
+    synchronized (this) {
+      this.clientStub.clientCloseHandler(VOID);
+    }
+  }
+
+  @Override
+  public void clientCloseWithMessageHandler(final byte[] message) {
+    synchronized (this) {
+      this.clientStub.clientCloseWithMessageHandler(
+          ClientMessageInfo.newBuilder()
+              .setPayload(ByteString.copyFrom(message))
+              .build());
+    }
+  }
+
+  @Override
+  public void driverRestarted(final DriverRestarted restart) {
+    try {
+      start();
+      synchronized (this) {
+        if (this.clientStub != null) {
+          this.clientStub.driverRestartHandler(DriverRestartInfo.newBuilder()
+              .setResubmissionAttempts(restart.getResubmissionAttempts())
+              .setStartTime(StartTimeInfo.newBuilder()
+                  .setStartTime(restart.getStartTime().getTimestamp()).build())
+              .addAllExpectedEvaluatorIds(restart.getExpectedEvaluatorIds())
+              .build());
+        } else {
+          stop(new DriverClientException("Failed to restart driver client"));
+        }
+      }
+    } catch (final InterruptedException | IOException e) {
+      stop(e);
+    }
+  }
+
+  @Override
+  public void restartRunningTask(final RunningTask task) {
+    synchronized (this) {
+      final ActiveContext context = task.getActiveContext();
+      if (!this.activeContextMap.containsKey(context.getId())) {
+        this.activeContextMap.put(context.getId(), context);
+      }
+      this.runningTaskMap.put(task.getId(), task);
+      this.clientStub.driverRestartRunningTaskHandler(
+          TaskInfo.newBuilder()
+              .setTaskId(task.getId())
+              .setContext(ContextInfo.newBuilder()
+                  .setContextId(context.getId())
+                  .setEvaluatorId(context.getEvaluatorId())
+                  .setParentId(context.getParentId().orElse(""))
+                  .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(context.getEvaluatorDescriptor()))
+                  .build())
+              .build());
+    }
+  }
+
+  @Override
+  public void restartActiveContext(final ActiveContext context) {
+    synchronized (this) {
+      this.activeContextMap.put(context.getId(), context);
+      this.clientStub.driverRestartActiveContextHandler(
+          ContextInfo.newBuilder()
+              .setContextId(context.getId())
+              .setEvaluatorId(context.getEvaluatorId())
+              .setParentId(context.getParentId().orElse(""))
+              .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
+                  context.getEvaluatorDescriptor()))
+              .build());
+    }
+  }
+
+  @Override
+  public void driverRestartCompleted(final DriverRestartCompleted restartCompleted) {
+    synchronized (this) {
+      this.clientStub.driverRestartCompletedHandler(DriverRestartCompletedInfo.newBuilder()
+          .setCompletionTime(StopTimeInfo.newBuilder()
+              .setStopTime(restartCompleted.getCompletedTime().getTimestamp()).build())
+          .setIsTimedOut(restartCompleted.isTimedOut())
+          .build());
+    }
+  }
+
+  @Override
+  public void restartFailedEvalautor(final FailedEvaluator evaluator) {
+    synchronized (this) {
+      this.clientStub.driverRestartFailedEvaluatorHandler(EvaluatorInfo.newBuilder()
+          .setEvaluatorId(evaluator.getId())
+          .setFailure(EvaluatorInfo.FailureInfo.newBuilder()
+              .setMessage(evaluator.getEvaluatorException() != null ?
+                  evaluator.getEvaluatorException().getMessage() : "unknown failure during restart")
+              .build())
+          .build());
+    }
+  }
+
+  private Optional parseException(final ExceptionInfo info) {
+    if (info.getData() == null || info.getData().isEmpty()) {
+      return Optional.empty();
+    } else {
+      return exceptionCodec.fromBytes(info.getData().toByteArray());
+    }
+  }
+
+  private final class DriverBridgeServiceImpl
+      extends DriverServiceGrpc.DriverServiceImplBase {
+
+    @Override
+    public void registerDriverClient(
+        final DriverClientRegistration request,
+        final StreamObserver responseObserver) {
+      LOG.log(Level.INFO, "driver client register");
+      try {
+        if (request.hasException()) {
+          LOG.log(Level.SEVERE, "Driver client initialization exception");
+          final Optional ex = parseException(request.getException());
+          if (ex.isPresent()) {
+            GRPCDriverService.this.clock.stop(ex.get());
+          } else {
+            GRPCDriverService.this.clock.stop(new RuntimeException(
+                request.getException().getMessage() == null ?
+                    request.getException().getName() :
+                    request.getException().getMessage()
+            ));
+          }
+        } else {
+          final ManagedChannel channel = ManagedChannelBuilder
+              .forAddress(request.getHost(), request.getPort())
+              .usePlaintext(true)
+              .build();
+          synchronized (GRPCDriverService.this) {
+            GRPCDriverService.this.clientStub = DriverClientGrpc.newFutureStub(channel);
+            GRPCDriverService.this.notifyAll();
+          }
+          LOG.log(Level.INFO, "Driver has registered on port " + request.getPort());
+        }
+      } finally {
+        responseObserver.onNext(null);
+        responseObserver.onCompleted();
+      }
+    }
+
+    @Override
+    public void requestResources(
+        final ResourceRequest request,
+        final StreamObserver responseObserver) {
+      try {
+        synchronized (GRPCDriverService.this) {
+          EvaluatorRequest.Builder requestBuilder = GRPCDriverService.this.evaluatorRequestor.newRequest();
+          requestBuilder.setNumber(request.getResourceCount());
+          requestBuilder.setNumberOfCores(request.getCores());
+          requestBuilder.setMemory(request.getMemorySize());
+          requestBuilder.setRelaxLocality(request.getRelaxLocality());
+          requestBuilder.setRuntimeName(request.getRuntimeName());
+          if (request.getNodeNameListCount() > 0) {
+            requestBuilder.addNodeNames(request.getNodeNameListList());
+          }
+          if (request.getRackNameListCount() > 0) {
+            for (final String rackName : request.getRackNameListList()) {
+              requestBuilder.addRackName(rackName);
+            }
+          }
+          GRPCDriverService.this.evaluatorRequestor.submit(requestBuilder.build());
+        }
+      } finally {
+        responseObserver.onNext(null);
+        responseObserver.onCompleted();
+      }
+    }
+
+    @Override
+    public void shutdown(
+        final ShutdownRequest request,
+        final StreamObserver responseObserver) {
+      try {
+        LOG.log(Level.INFO, "driver shutdown");
+        if (request.hasException()) {
+          final Optional exception = parseException(request.getException());
+          if (exception.isPresent()) {
+            LOG.log(Level.INFO, "driver exception: " + exception.get().toString());
+            GRPCDriverService.this.clock.stop(exception.get());
+          } else {
+            // exception that cannot be parsed in java
+            GRPCDriverService.this.clock.stop(
+                new DriverClientException(request.getException().getMessage()));
+          }
+        } else {
+          LOG.log(Level.INFO, "clean shutdown");
+          GRPCDriverService.this.clock.stop();
+        }
+      } finally {
+        responseObserver.onNext(null);
+        responseObserver.onCompleted();
+      }
+    }
+
+    @Override
+    public void setAlarm(
+        final AlarmRequest request,
+        final StreamObserver responseObserver) {
+      try {
+        // do not synchronize when scheduling an alarm (or deadlock)
+        LOG.log(Level.INFO, "Set alarm {0} offset {1}",
+            new Object[] {request.getAlarmId(), request.getTimeoutMs()});
+        LOG.log(Level.INFO, "Alarm class " + GRPCDriverService.this.clock.getClass());
+        GRPCDriverService.this.clock.scheduleAlarm(request.getTimeoutMs(), new EventHandler() {
+          @Override
+          public void onNext(final Alarm value) {
+            LOG.log(Level.INFO, "Trigger alarm {0}", request.getAlarmId());
+            synchronized (GRPCDriverService.this) {
+              GRPCDriverService.this.clientStub.alarmTrigger(
+                  AlarmTriggerInfo.newBuilder().setAlarmId(request.getAlarmId()).build());
+              LOG.log(Level.INFO, "DONE: trigger alarm {0}", request.getAlarmId());
+            }
+          }
+        });
+        LOG.log(Level.INFO, "Alarm {0} scheduled is idle? {1}",
+            new Object[] {request.getAlarmId(), clock.isIdle()});
+      } finally {
+        responseObserver.onNext(null);
+        responseObserver.onCompleted();
+      }
+    }
+
+    @Override
+    public void allocatedEvaluatorOp(
+        final AllocatedEvaluatorRequest request,
+        final StreamObserver responseObserver) {
+      try {
+        if (request.getEvaluatorConfiguration() == null) {
+          responseObserver.onError(Status.INTERNAL
+              .withDescription("Evaluator configuration required")
+              .asRuntimeException());
+        } else if (request.getContextConfiguration() == null && request.getTaskConfiguration() == null) {
+          responseObserver.onError(Status.INTERNAL
+              .withDescription("Context and/or Task configuration required")
+              .asRuntimeException());
+        } else {
+          synchronized (GRPCDriverService.this) {
+            if (!GRPCDriverService.this.allocatedEvaluatorMap.containsKey(request.getEvaluatorId())) {
+              responseObserver.onError(Status.INTERNAL
+                  .withDescription("Unknown allocated evaluator " + request.getEvaluatorId())
+                  .asRuntimeException());
+            }
+            final AllocatedEvaluator evaluator =
+                GRPCDriverService.this.allocatedEvaluatorMap.get(request.getEvaluatorId());
+            if (request.getCloseEvaluator()) {
+              evaluator.close();
+            } else {
+              if (request.getAddFilesCount() > 0) {
+                for (final String file : request.getAddFilesList()) {
+                  evaluator.addFile(new File(file));
+                }
+              }
+              if (request.getAddLibrariesCount() > 0) {
+                for (final String library : request.getAddLibrariesList()) {
+                  evaluator.addLibrary(new File(library));
+                }
+              }
+              if (request.getSetProcess() != null) {
+                final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest =
+                    request.getSetProcess();
+                switch (evaluator.getEvaluatorDescriptor().getProcess().getType()) {
+                case JVM:
+                  setJVMProcess(evaluator, processRequest);
+                  break;
+                case CLR:
+                  setCLRProcess(evaluator, processRequest);
+                  break;
+                default:
+                  throw new RuntimeException("Unknown evaluator process type");
+                }
+              }
+              if (StringUtils.isEmpty(request.getEvaluatorConfiguration())) {
+                // Assume that we are running Java driver client, but this assumption could be a bug so log a warning
+                LOG.log(Level.WARNING, "No evaluator configuration detected. Assuming a Java driver client.");
+                if (StringUtils.isNotEmpty(request.getContextConfiguration()) &&
+                    StringUtils.isNotEmpty(request.getTaskConfiguration())) {
+                  // submit context and task
+                  try {
+                    evaluator.submitContextAndTask(
+                        configurationSerializer.fromString(request.getContextConfiguration()),
+                        configurationSerializer.fromString(request.getTaskConfiguration()));
+                  } catch (final IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                } else if (StringUtils.isNotEmpty(request.getContextConfiguration())) {
+                  // submit context
+                  try {
+                    evaluator.submitContext(configurationSerializer.fromString(request.getContextConfiguration()));
+                  } catch (final IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                } else if (StringUtils.isNotEmpty(request.getTaskConfiguration())) {
+                  // submit task
+                  try {
+                    evaluator.submitTask(configurationSerializer.fromString(request.getTaskConfiguration()));
+                  } catch (final IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                } else {
+                  throw new RuntimeException("Missing check for required evaluator configurations");
+                }
+              } else {
+                if (StringUtils.isNotEmpty(request.getContextConfiguration()) &&
+                    StringUtils.isNotEmpty(request.getTaskConfiguration())) {
+                  // submit context and task
+                  ((AllocatedEvaluatorImpl) evaluator).submitContextAndTask(
+                      request.getEvaluatorConfiguration(),
+                      request.getContextConfiguration(),
+                      request.getTaskConfiguration());
+                } else if (StringUtils.isNotEmpty(request.getContextConfiguration())) {
+                  // submit context
+                  ((AllocatedEvaluatorImpl) evaluator).submitContext(
+                      request.getEvaluatorConfiguration(),
+                      request.getContextConfiguration());
+                } else if (StringUtils.isNotEmpty(request.getTaskConfiguration())) {
+                  // submit task
+                  ((AllocatedEvaluatorImpl) evaluator).submitTask(
+                      request.getEvaluatorConfiguration(),
+                      request.getTaskConfiguration());
+                } else {
+                  throw new RuntimeException("Missing check for required evaluator configurations");
+                }
+              }
+            }
+          }
+        }
+      } finally {
+        responseObserver.onNext(null);
+        responseObserver.onCompleted();
+      }
+    }
+
+    @Override
+    public void activeContextOp(
+        final ActiveContextRequest request,
+        final StreamObserver responseObserver) {
+      synchronized (GRPCDriverService.this) {
+        if (!GRPCDriverService.this.activeContextMap.containsKey(request.getContextId())) {
+          LOG.log(Level.SEVERE, "Context does not exist with id " + request.getContextId());

Review comment:
   use string interpolation (here and everywhere)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

> A new module containing the new Java bridge
> -------------------------------------------
>
>                 Key: REEF-2025
>                 URL: https://issues.apache.org/jira/browse/REEF-2025
>             Project: REEF
>          Issue Type: Sub-task
>          Components: REEF Bridge
>    Affects Versions: 0.17
>            Reporter: Tyson Condie
>            Assignee: Tyson Condie
>            Priority: Major
>             Fix For: 0.17
>
>
> This Jira introduces the module containing the new bridge.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
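For readers of the archive, the review comment above ("use string interpolation") most likely refers to the {0}-style placeholders that java.util.logging already supports and that the quoted file uses in several other log calls. The following is a minimal, self-contained sketch of the difference, not code from the pull request: the class name LogInterpolationSketch and its methods are hypothetical, and only the log message text is taken from the flagged line.

```java
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;

// Hypothetical illustration class; not part of the REEF code base.
final class LogInterpolationSketch {
  private static final Logger LOG = Logger.getLogger(LogInterpolationSketch.class.getName());

  private LogInterpolationSketch() {
  }

  // Style flagged in the review: the message string is concatenated
  // before the logger decides whether the record is loggable at all.
  static void reportMissingContextConcatenated(final String contextId) {
    LOG.log(Level.SEVERE, "Context does not exist with id " + contextId);
  }

  // Placeholder style: the pattern and its parameter are passed separately,
  // and the substitution happens only when a handler formats the record.
  static void reportMissingContext(final String contextId) {
    LOG.log(Level.SEVERE, "Context does not exist with id {0}", contextId);
  }

  // Renders a pattern the way a java.util.logging Formatter would,
  // so the substituted text can be inspected without a log handler.
  static String render(final String pattern, final Object... params) {
    final LogRecord record = new LogRecord(Level.SEVERE, pattern);
    record.setParameters(params);
    return new SimpleFormatter().formatMessage(record);
  }

  public static void main(final String[] args) {
    reportMissingContext("ctx-42");
    System.out.println(render("Context does not exist with id {0}", "ctx-42"));
  }
}
```

One caveat with this style: the placeholders are expanded by java.text.MessageFormat, so numeric parameters are rendered with locale-specific grouping and a literal single quote in the pattern has to be doubled.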