From dev-return-16557-archive-asf-public=cust-asf.ponee.io@reef.apache.org Tue Jun 5 22:42:09 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id BEC1A18077F for ; Tue, 5 Jun 2018 22:42:07 +0200 (CEST) Received: (qmail 33248 invoked by uid 500); 5 Jun 2018 20:42:06 -0000 Mailing-List: contact dev-help@reef.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@reef.apache.org Delivered-To: mailing list dev@reef.apache.org Received: (qmail 33237 invoked by uid 99); 5 Jun 2018 20:42:06 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 Jun 2018 20:42:06 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 66E071806B9 for ; Tue, 5 Jun 2018 20:42:06 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -109.511 X-Spam-Level: X-Spam-Status: No, score=-109.511 tagged_above=-999 required=6.31 tests=[ENV_AND_HDR_SPF_MATCH=-0.5, KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_MED=-2.3, SPF_PASS=-0.001, T_RP_MATCHES_RCVD=-0.01, USER_IN_DEF_SPF_WL=-7.5, USER_IN_WHITELIST=-100] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id qPFVLWEnzDgX for ; Tue, 5 Jun 2018 20:42:03 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTP id AEF5E5F4A9 for ; Tue, 5 Jun 2018 20:42:02 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF 
Mail Server at mailrelay1-us-west.apache.org) with ESMTP id BB37EE0E87 for ; Tue, 5 Jun 2018 20:42:01 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id 30B4E2109E for ; Tue, 5 Jun 2018 20:42:01 +0000 (UTC) Date: Tue, 5 Jun 2018 20:42:01 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: dev@reef.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (REEF-2025) A new module containing the new Java bridge MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: quoted-printable X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/REEF-2025?page=3Dcom.atlassian.= jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=3D16502= 460#comment-16502460 ]=20 ASF GitHub Bot commented on REEF-2025: -------------------------------------- motus commented on a change in pull request #1466: [REEF-2025] A new module= containing the new Java bridge URL: https://github.com/apache/reef/pull/1466#discussion_r193214382 =20 =20 ########## File path: lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/= bridge/driver/client/grpc/DriverClientService.java ########## @@ -0,0 +1,663 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.driver.client.grpc; + +import com.google.common.collect.Lists; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import org.apache.commons.lang.StringUtils; +import org.apache.reef.bridge.driver.client.DriverClientDispatcher; +import org.apache.reef.bridge.driver.client.IDriverClientService; +import org.apache.reef.bridge.driver.client.JVMClientProcess; +import org.apache.reef.bridge.driver.client.events.*; +import org.apache.reef.bridge.proto.*; +import org.apache.reef.bridge.proto.Void; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.driver.restart.DriverRestartCompleted; +import org.apache.reef.driver.restart.DriverRestarted; +import org.apache.reef.driver.task.FailedTask; +import org.apache.reef.exception.EvaluatorException; +import org.apache.reef.runtime.common.driver.evaluator.EvaluatorDescriptor= Impl; +import org.apache.reef.runtime.common.utils.ExceptionCodec; +import org.apache.reef.tang.InjectionFuture; +import org.apache.reef.util.Optional; +import org.apache.reef.wake.remote.ports.TcpPortProvider; +import org.apache.reef.wake.time.Clock; +import org.apache.reef.wake.time.Time; +import org.apache.reef.wake.time.event.StartTime; +import org.apache.reef.wake.time.event.StopTime; + +import javax.inject.Inject; +import java.io.IOException; +import java.util.*; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * The driver client service that accepts incoming messages driver service= and + * dispatches appropriate objects to the application. 
+ */ +public final class DriverClientService extends DriverClientGrpc.DriverClie= ntImplBase + implements IDriverClientService { + + private static final Logger LOG =3D Logger.getLogger(DriverClientService= .class.getName()); + + private Server server; + + private final Object lock =3D new Object(); + + private final InjectionFuture clock; + + private final ExceptionCodec exceptionCodec; + + private final DriverServiceClient driverServiceClient; + + private final TcpPortProvider tcpPortProvider; + + private final InjectionFuture clientDriverDispat= cher; + + private final Map evaluatorBridgeMap = =3D new HashMap<>(); + + private final Map activeContextBridgeMap = =3D new HashMap<>(); + + private int outstandingEvaluatorCount =3D 0; + + @Inject + private DriverClientService( + final ExceptionCodec exceptionCodec, + final DriverServiceClient driverServiceClient, + final TcpPortProvider tcpPortProvider, + final InjectionFuture clock, + final InjectionFuture clientDriverDispatcher= ) { + this.exceptionCodec =3D exceptionCodec; + this.driverServiceClient =3D driverServiceClient; + this.tcpPortProvider =3D tcpPortProvider; + this.clock =3D clock; + this.clientDriverDispatcher =3D clientDriverDispatcher; + } + + @Override + public void notifyEvaluatorRequest(final int count) { + synchronized (this.lock) { + this.outstandingEvaluatorCount +=3D count; + this.lock.notify(); + } + } + + @Override + public void start() throws IOException { + for (final Integer port : this.tcpPortProvider) { + try { + this.server =3D ServerBuilder.forPort(port) + .addService(this) + .build() + .start(); + LOG.info("Driver Client Server started, listening on " + port); + break; + } catch (IOException e) { + LOG.log(Level.WARNING, "Unable to bind to port [{0}]", port); + } + } + if (this.server =3D=3D null || this.server.isTerminated()) { + throw new IOException("Unable to start gRPC server"); + } + this.driverServiceClient.registerDriverClientService("localhost", this= .server.getPort()); + } + 
+ @Override + public void awaitTermination() throws InterruptedException { + if (this.server !=3D null) { + this.server.awaitTermination(); + } + } + + @Override + public void idlenessCheckHandler(final Void request, final StreamObserve= r responseObserver) { + if (isIdle()) { + LOG.log(Level.INFO, "possibly idle. waiting for some action."); + try { + synchronized (this.lock) { + this.lock.wait(1000); // wait a second + } + } catch (InterruptedException e) { + LOG.log(Level.WARNING, e.getMessage()); + } + } + responseObserver.onNext(IdleStatus.newBuilder() + .setReason("DriverClient checking idleness") + .setIsIdle(this.isIdle()) + .build()); + responseObserver.onCompleted(); + } + + @Override + public void startHandler(final StartTimeInfo request, final StreamObserv= er responseObserver) { + try { + LOG.log(Level.INFO, "StartHandler at time {0}", request.getStartTime= ()); + final StartTime startTime =3D new StartTime(request.getStartTime()); + this.clientDriverDispatcher.get().dispatch(startTime); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void stopHandler(final StopTimeInfo request, final StreamObserver= responseObserver) { + try { + LOG.log(Level.INFO, "StopHandler at time {0}", request.getStopTime()= ); + final StopTime stopTime =3D new StopTime(request.getStopTime()); + final Throwable error =3D this.clientDriverDispatcher.get().dispatch= (stopTime); + if (error !=3D null) { + responseObserver.onNext(GRPCUtils.createExceptionInfo(this.excepti= onCodec, error)); + } else { + responseObserver.onNext(ExceptionInfo.newBuilder().setNoError(true= ).build()); + } + } finally { + responseObserver.onCompleted(); + this.server.shutdown(); + } + } + + @Override + public void alarmTrigger(final AlarmTriggerInfo request, final StreamObs= erver responseObserver) { + try { + LOG.log(Level.INFO, "Alarm Trigger id {0}", request.getAlarmId()); + 
this.clientDriverDispatcher.get().dispatchAlarm(request.getAlarmId()= ); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void allocatedEvaluatorHandler(final EvaluatorInfo request, final= StreamObserver responseObserver) { + try { + synchronized (this.lock) { + this.outstandingEvaluatorCount--; + } + LOG.log(Level.INFO, "Allocated evaluator id {0}", request.getEvaluat= orId()); + final AllocatedEvaluatorBridge eval =3D new AllocatedEvaluatorBridge= ( + request.getEvaluatorId(), + toEvaluatorDescriptor(request.getDescriptorInfo()), + this.driverServiceClient); + this.evaluatorBridgeMap.put(eval.getId(), eval); + this.clientDriverDispatcher.get().dispatch(eval); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void completedEvaluatorHandler(final EvaluatorInfo request, final= StreamObserver responseObserver) { + try { + LOG.log(Level.INFO, "Completed Evaluator id {0}", request.getEvaluat= orId()); + this.evaluatorBridgeMap.remove(request.getEvaluatorId()); + this.clientDriverDispatcher.get().dispatch(new CompletedEvaluatorBri= dge(request.getEvaluatorId())); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void failedEvaluatorHandler(final EvaluatorInfo request, final St= reamObserver responseObserver) { + try { + if (!this.evaluatorBridgeMap.containsKey(request.getEvaluatorId())) = { + LOG.log(Level.INFO, "Failed evalautor that we were not allocated")= ; + synchronized (this.lock) { + if (this.outstandingEvaluatorCount > 0) { + this.outstandingEvaluatorCount--; + } + } + return; + } + LOG.log(Level.INFO, "Failed Evaluator id {0}", request.getEvaluatorI= d()); + final AllocatedEvaluatorBridge eval =3D this.evaluatorBridgeMap.remo= ve(request.getEvaluatorId()); + List failedContextList =3D new ArrayList<>(); + if (request.getFailure().getFailedContextsList() !=3D null) { + for 
(final String failedContextId : request.getFailure().getFailed= ContextsList()) { + final ActiveContextBridge context =3D this.activeContextBridgeMa= p.get(failedContextId); + failedContextList.add(new FailedContextBridge( + context.getId(), + eval.getId(), + request.getFailure().getMessage(), + eval.getEvaluatorDescriptor(), + context.getParentId().isPresent() ? + Optional.of(this.activeContextBridgeMap.g= et(context.getParentId().get())) : + Optional.empty(), + Optional.empty())); + } + for (final String failedContextId : request.getFailure().getFailed= ContextsList()) { + this.activeContextBridgeMap.remove(failedContextId); + } + } + this.clientDriverDispatcher.get().dispatch( + new FailedEvaluatorBridge( + eval.getId(), + new EvaluatorException(request.getEvaluatorId(), request.get= Failure().getMessage()), + failedContextList, + request.getFailure().getFailedTaskId() !=3D null ? + Optional.of(new FailedTask( + request.getFailure().getFailedTaskId(), + request.getFailure().getMessage(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty())) : + Optional.empty())); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void activeContextHandler(final ContextInfo request, final Stream= Observer responseObserver) { + try { + LOG.log(Level.INFO, "Active context id {0}", request.getContextId())= ; + final AllocatedEvaluatorBridge eval =3D this.evaluatorBridgeMap.get(= request.getEvaluatorId()); + final ActiveContextBridge context =3D new ActiveContextBridge( + this.driverServiceClient, + request.getContextId(), + request.getParentId() !=3D null ? 
Optional.of(request.getParentI= d()) : Optional.empty(), + eval.getId(), + eval.getEvaluatorDescriptor()); + this.activeContextBridgeMap.put(context.getId(), context); + this.clientDriverDispatcher.get().dispatch(context); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void closedContextHandler(final ContextInfo request, final Stream= Observer responseObserver) { + if (this.activeContextBridgeMap.containsKey(request.getContextId())) { + LOG.log(Level.INFO, "Closed context id {0}", request.getContextId())= ; + try { + final ActiveContextBridge context =3D this.activeContextBridgeMap.= remove(request.getContextId()); + this.clientDriverDispatcher.get().dispatch( + new ClosedContextBridge( + context.getId(), + context.getEvaluatorId(), + this.activeContextBridgeMap.get(request.getParentId()), + context.getEvaluatorDescriptor())); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } else { + responseObserver.onError(Status.INTERNAL + .withDescription("Unknown context id " + request.getContextId() = + " in close") + .asRuntimeException()); + } + } + + @Override + public void failedContextHandler(final ContextInfo request, final Stream= Observer responseObserver) { + if (this.activeContextBridgeMap.containsKey(request.getContextId())) { + LOG.log(Level.INFO, "Failed context id {0}", request.getContextId())= ; + try { + final ActiveContextBridge context =3D this.activeContextBridgeMap.= remove(request.getContextId()); + final Optional parent =3D context.getParentId().isP= resent() ? + Optional.of(this.activeContextBridgeMap.get(con= text.getParentId().get())) : + Optional.empty(); + final Optional reason =3D !request.getException().getDa= ta().isEmpty() ? 
+ this.exceptionCodec.fromBytes(request.getException().getData()= .toByteArray()) : + Optional.empty(); + this.clientDriverDispatcher.get().dispatch( + new FailedContextBridge( + context.getId(), + context.getEvaluatorId(), + request.getException().getMessage(), + context.getEvaluatorDescriptor(), + parent, + reason)); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } else { + responseObserver.onError(Status.INTERNAL + .withDescription("Unknown context id " + request.getContextId() = + " in close") + .asRuntimeException()); + } + } + + @Override + public void contextMessageHandler(final ContextMessageInfo request, fina= l StreamObserver responseObserver) { + if (this.activeContextBridgeMap.containsKey(request.getContextId())) { + LOG.log(Level.INFO, "Message context id {0}", request.getContextId()= ); + try { + this.clientDriverDispatcher.get().dispatch( + new ContextMessageBridge( + request.getContextId(), + request.getMessageSourceId(), + request.getSequenceNumber(), + request.getPayload().toByteArray())); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } else { + responseObserver.onError(Status.INTERNAL + .withDescription("Unknown context id " + request.getContextId() = + " in close") + .asRuntimeException()); + } + } + + @Override + public void runningTaskHandler(final TaskInfo request, final StreamObser= ver responseObserver) { + final ContextInfo contextInfo =3D request.getContext(); + if (!this.activeContextBridgeMap.containsKey(contextInfo.getContextId(= ))) { + this.activeContextBridgeMap.put(contextInfo.getContextId(), toActive= Context(contextInfo)); + } + + LOG.log(Level.INFO, "Running task id {0}", request.getTaskId()); + try { + final ActiveContextBridge context =3D this.activeContextBridgeMap.ge= t(contextInfo.getContextId()); + this.clientDriverDispatcher.get().dispatch( + new RunningTaskBridge(this.driverServiceClient, request.getTaskI= d(), context)); + } finally { + 
responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void failedTaskHandler(final TaskInfo request, final StreamObserv= er responseObserver) { + if (request.hasContext() && !this.activeContextBridgeMap.containsKey(r= equest.getContext().getContextId())) { + this.activeContextBridgeMap.put(request.getContext().getContextId(),= toActiveContext(request.getContext())); + } + try { + LOG.log(Level.INFO, "Failed task id {0}", request.getTaskId()); + final Optional context =3D + this.activeContextBridgeMap.containsKey(request.getContext().get= ContextId()) ? + Optional.of(this.activeContextBridgeMap.get(r= equest.getContext().getContextId())) : + Optional.empty(); + this.clientDriverDispatcher.get().dispatch( + new FailedTask( + request.getTaskId(), + request.getException().getMessage(), + Optional.of(request.getException().getName()), + request.getException().getData().isEmpty() ? + Optional.of(new EvaluatorException(request.ge= tException().getMessage())) : + this.exceptionCodec.fromBytes(request.getException().get= Data().toByteArray()), + Optional.empty(), + context)); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void completedTaskHandler(final TaskInfo request, final StreamObs= erver responseObserver) { + final ContextInfo contextInfo =3D request.getContext(); + if (!this.activeContextBridgeMap.containsKey(contextInfo.getContextId(= ))) { + this.activeContextBridgeMap.put(contextInfo.getContextId(), toActive= Context(contextInfo)); + } + LOG.log(Level.INFO, "Completed task id {0}", request.getTaskId()); + try { + final ActiveContextBridge context =3D this.activeContextBridgeMap.ge= t(request.getContext().getContextId()); + this.clientDriverDispatcher.get().dispatch( + new CompletedTaskBridge( + request.getTaskId(), + context, + request.getResult() !=3D null && !request.getResult().isEmpt= y() ? 
+ request.getResult().toByteArray() : null)); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void suspendedTaskHandler(final TaskInfo request, final StreamObs= erver responseObserver) { + final ContextInfo contextInfo =3D request.getContext(); + if (!this.activeContextBridgeMap.containsKey(contextInfo.getContextId(= ))) { + this.activeContextBridgeMap.put(contextInfo.getContextId(), toActive= Context(contextInfo)); + } + LOG.log(Level.INFO, "Suspended task id {0}", request.getTaskId()); + try { + final ActiveContextBridge context =3D this.activeContextBridgeMap.ge= t(request.getContext().getContextId()); + this.clientDriverDispatcher.get().dispatch( + new SuspendedTaskBridge( + request.getTaskId(), + context, + request.getResult() !=3D null && !request.getResult().isEmpt= y() ? + request.getResult().toByteArray() : null)); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void taskMessageHandler(final TaskMessageInfo request, final Stre= amObserver responseObserver) { + if (this.activeContextBridgeMap.containsKey(request.getContextId())) { + LOG.log(Level.INFO, "Message task id {0}", request.getTaskId()); + try { + this.clientDriverDispatcher.get().dispatch( + new TaskMessageBridge( + request.getTaskId(), + request.getContextId(), + request.getMessageSourceId(), + request.getSequenceNumber(), + request.getPayload().toByteArray())); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } else { + responseObserver.onError(Status.INTERNAL + .withDescription("Unknown context id: " + request.getContextId()= ) + .asRuntimeException()); + } + } + + @Override + public void clientMessageHandler(final ClientMessageInfo request, final = StreamObserver responseObserver) { + LOG.log(Level.INFO, "Client message"); + try { + this.clientDriverDispatcher.get().clientMessageDispatch(request.getP= ayload().toByteArray()); + } 
finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void clientCloseHandler(final Void request, final StreamObserver<= Void> responseObserver) { + LOG.log(Level.INFO, "Client close"); + try { + this.clientDriverDispatcher.get().clientCloseDispatch(); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void clientCloseWithMessageHandler( + final ClientMessageInfo request, + final StreamObserver responseObserver) { + LOG.log(Level.INFO, "Client close with message"); + try { + this.clientDriverDispatcher.get().clientCloseWithMessageDispatch(req= uest.getPayload().toByteArray()); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void driverRestartHandler(final DriverRestartInfo request, final = StreamObserver responseObserver) { + LOG.log(Level.INFO, "Driver restarted"); + try { + final DriverRestarted driverRestarted =3D new DriverRestarted() { + @Override + public int getResubmissionAttempts() { + return request.getResubmissionAttempts(); + } + + @Override + public StartTime getStartTime() { + return new StartTime(request.getStartTime().getStartTime()); + } + + @Override + public Set getExpectedEvaluatorIds() { + return new HashSet<>(request.getExpectedEvaluatorIdsList()); + } + }; + this.clientDriverDispatcher.get().dispatchRestart(driverRestarted); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void driverRestartActiveContextHandler( + final ContextInfo request, + final StreamObserver responseObserver) { + try { + LOG.log(Level.INFO, "Driver restarted active context " + request.get= ContextId()); + if (!this.evaluatorBridgeMap.containsKey(request.getEvaluatorId())) = { + final AllocatedEvaluatorBridge eval =3D new AllocatedEvaluatorBrid= ge( + request.getEvaluatorId(), + 
toEvaluatorDescriptor(request.getEvaluatorDescriptorInfo()), + this.driverServiceClient); + this.evaluatorBridgeMap.put(eval.getId(), eval); + } + final ActiveContextBridge context =3D toActiveContext(request); + this.activeContextBridgeMap.put(context.getId(), context); + this.clientDriverDispatcher.get().dispatchRestart(context); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void driverRestartRunningTaskHandler( + final TaskInfo request, + final StreamObserver responseObserver) { + try { + LOG.log(Level.INFO, "Driver restarted running task " + request.getTa= skId()); + if (!this.evaluatorBridgeMap.containsKey(request.getContext().getEva= luatorId())) { + final AllocatedEvaluatorBridge eval =3D new AllocatedEvaluatorBrid= ge( + request.getContext().getEvaluatorId(), + toEvaluatorDescriptor(request.getContext().getEvaluatorDescrip= torInfo()), + this.driverServiceClient); + this.evaluatorBridgeMap.put(eval.getId(), eval); + } + if (!this.activeContextBridgeMap.containsKey(request.getContext().ge= tContextId())) { + final ActiveContextBridge context =3D toActiveContext(request.getC= ontext()); + this.activeContextBridgeMap.put(context.getId(), context); + } + final ActiveContextBridge context =3D this.activeContextBridgeMap.ge= t(request.getContext().getContextId()); + this.clientDriverDispatcher.get().dispatchRestart( + new RunningTaskBridge(this.driverServiceClient, request.getTaskI= d(), context)); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void driverRestartCompletedHandler( + final DriverRestartCompletedInfo request, + final StreamObserver responseObserver) { + try { + this.clientDriverDispatcher.get().dispatchRestart(new DriverRestartC= ompleted() { + @Override + public Time getCompletedTime() { + return new StopTime(request.getCompletionTime().getStopTime()); + } + + @Override + public boolean isTimedOut() { + return 
request.getIsTimedOut(); + } + }); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void driverRestartFailedEvaluatorHandler( + final EvaluatorInfo request, + final StreamObserver responseObserver) { + try { + this.clientDriverDispatcher.get().dispatchRestart(new FailedEvaluato= rBridge( + request.getEvaluatorId(), + request.getFailure() !=3D null ? + new EvaluatorException(request.getFailure().getMessage()) : + new EvaluatorException("restart failed"), + Lists.newArrayList(), + Optional.empty())); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + // Helper methods + private boolean isIdle() { + LOG.log(Level.INFO, "Clock idle {0}, outstanding evaluators {1}, curre= nt evaluators {2}", + new Object[] { + this.clock.get().isIdle(), + this.outstandingEvaluatorCount, + this.evaluatorBridgeMap.isEmpty()}); + return clock.get().isIdle() && + this.outstandingEvaluatorCount =3D=3D 0 && + this.evaluatorBridgeMap.isEmpty(); + } + + private EvaluatorDescriptor toEvaluatorDescriptor(final EvaluatorDescrip= torInfo info) { + return new EvaluatorDescriptorImpl( + null, + info.getMemory(), + info.getCores(), + new JVMClientProcess(), + info.getRuntimeName()); + } + + private ActiveContextBridge toActiveContext(final ContextInfo contextInf= o) { + final AllocatedEvaluatorBridge eval =3D this.evaluatorBridgeMap.get(co= ntextInfo.getEvaluatorId()); + return new ActiveContextBridge( + this.driverServiceClient, + contextInfo.getContextId(), + StringUtils.isNotEmpty(contextInfo.getParentId()) ? + Optional.of(contextInfo.getParentId()) : Optional.empt= y(), =20 Review comment: We can probably have a special method for strings, say `Optional.ofStrin= g()` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: users@infra.apache.org > A new module containing the new Java bridge > ------------------------------------------- > > Key: REEF-2025 > URL: https://issues.apache.org/jira/browse/REEF-2025 > Project: REEF > Issue Type: Sub-task > Components: REEF Bridge > Affects Versions: 0.17 > Reporter: Tyson Condie > Assignee: Tyson Condie > Priority: Major > Fix For: 0.17 > > > This Jira introduces the module containing the new bridge. -- This message was sent by Atlassian JIRA (v7.6.3#76005)