reef-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (REEF-2025) A new module containing the new Java bridge
Date Tue, 05 Jun 2018 02:12:01 GMT

    [ https://issues.apache.org/jira/browse/REEF-2025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16501173#comment-16501173
] 

ASF GitHub Bot commented on REEF-2025:
--------------------------------------

motus commented on a change in pull request #1466: [REEF-2025] A new module containing the
new Java bridge
URL: https://github.com/apache/reef/pull/1466#discussion_r192536047
 
 

 ##########
 File path: lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientDispatcher.java
 ##########
 @@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.driver.client;
+
+import com.google.common.collect.Sets;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.bridge.driver.client.parameters.DriverClientDispatchThreadCount;
+import org.apache.reef.bridge.driver.client.parameters.ClientDriverStopHandler;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.parameters.*;
+import org.apache.reef.driver.restart.DriverRestartCompleted;
+import org.apache.reef.driver.restart.DriverRestarted;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.runtime.common.utils.DispatchingEStage;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.Set;
+
+/**
+ * Async dispatch of client driver events.
+ */
+@Private
+public final class DriverClientDispatcher {
+
+  /**
+   * Exception handler handed to the application dispatcher stage
+   * when that stage is constructed.
+   */
+  private final DriverClientExceptionHandler exceptionHandler;
+
+  /**
+   * Dispatcher used for application provided event handlers
+   * (start, context, task and evaluator events). Every other
+   * dispatcher in this class is constructed from this one.
+   */
+  private final DispatchingEStage applicationDispatcher;
+
+  /**
+   * Dispatcher for client close events (registered under Void).
+   */
+  private final DispatchingEStage clientCloseDispatcher;
+
+  /**
+   * Dispatcher for client close with message events (registered under byte[]).
+   */
+  private final DispatchingEStage clientCloseWithMessageDispatcher;
+
+  /**
+   * Dispatcher for client messages (registered under byte[]).
+   */
+  private final DispatchingEStage clientMessageDispatcher;
+
+  /**
+   * The alarm dispatcher; delivers String events to the injected
+   * alarm dispatch handler.
+   */
+  private final DispatchingEStage alarmDispatcher;
+
+  /**
+   * Driver restart dispatcher. Handlers are registered on it only by
+   * the constructor that accepts driver restart handlers.
+   */
+  private final DispatchingEStage driverRestartDispatcher;
+
+
+  /**
+   * Stop handlers. These are not registered with any dispatcher stage;
+   * they are invoked synchronously, in a loop, when a StopTime is dispatched.
+   */
+  private final Set<EventHandler<StopTime>> stopHandlers;
+
+  @Inject
+  private DriverClientDispatcher(
+      final DriverClientExceptionHandler driverExceptionHandler,
+      final IAlarmDispatchHandler alarmDispatchHandler,
+      @Parameter(DriverClientDispatchThreadCount.class)
+      final Integer numberOfThreads,
+      // Application-provided start and stop handlers
+      @Parameter(DriverStartHandler.class)
+      final Set<EventHandler<StartTime>> startHandlers,
+      @Parameter(ClientDriverStopHandler.class)
+      final Set<EventHandler<StopTime>> stopHandlers,
+      // Application-provided Context event handlers
+      @Parameter(ContextActiveHandlers.class)
+      final Set<EventHandler<ActiveContext>> contextActiveHandlers,
+      @Parameter(ContextClosedHandlers.class)
+      final Set<EventHandler<ClosedContext>> contextClosedHandlers,
+      @Parameter(ContextFailedHandlers.class)
+      final Set<EventHandler<FailedContext>> contextFailedHandlers,
+      @Parameter(ContextMessageHandlers.class)
+      final Set<EventHandler<ContextMessage>> contextMessageHandlers,
+      // Application-provided Task event handlers
+      @Parameter(TaskRunningHandlers.class)
+      final Set<EventHandler<RunningTask>> taskRunningHandlers,
+      @Parameter(TaskCompletedHandlers.class)
+      final Set<EventHandler<CompletedTask>> taskCompletedHandlers,
+      @Parameter(TaskSuspendedHandlers.class)
+      final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers,
+      @Parameter(TaskMessageHandlers.class)
+      final Set<EventHandler<TaskMessage>> taskMessageEventHandlers,
+      @Parameter(TaskFailedHandlers.class)
+      final Set<EventHandler<FailedTask>> taskExceptionEventHandlers,
+      // Application-provided Evaluator event handlers
+      @Parameter(EvaluatorAllocatedHandlers.class)
+      final Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers,
+      @Parameter(EvaluatorFailedHandlers.class)
+      final Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers,
+      @Parameter(EvaluatorCompletedHandlers.class)
+      final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers,
+      // Client handlers
+      @Parameter(ClientCloseHandlers.class)
+      final Set<EventHandler<Void>> clientCloseHandlers,
+      @Parameter(ClientCloseWithMessageHandlers.class)
+      final Set<EventHandler<byte[]>> clientCloseWithMessageHandlers,
+      @Parameter(ClientMessageHandlers.class)
+      final Set<EventHandler<byte[]>> clientMessageHandlers) {
+    this.exceptionHandler = driverExceptionHandler;
+    this.applicationDispatcher = new DispatchingEStage(
+        driverExceptionHandler, numberOfThreads, "ClientDriverDispatcher");
+    // Application start and stop handlers
+    this.applicationDispatcher.register(StartTime.class, startHandlers);
+    this.stopHandlers = stopHandlers; // must be called synchronously
+    // Application Context event handlers
+    this.applicationDispatcher.register(ActiveContext.class, contextActiveHandlers);
+    this.applicationDispatcher.register(ClosedContext.class, contextClosedHandlers);
+    this.applicationDispatcher.register(FailedContext.class, contextFailedHandlers);
+    this.applicationDispatcher.register(ContextMessage.class, contextMessageHandlers);
+
+    // Application Task event handlers.
+    this.applicationDispatcher.register(RunningTask.class, taskRunningHandlers);
+    this.applicationDispatcher.register(CompletedTask.class, taskCompletedHandlers);
+    this.applicationDispatcher.register(SuspendedTask.class, taskSuspendedHandlers);
+    this.applicationDispatcher.register(TaskMessage.class, taskMessageEventHandlers);
+    this.applicationDispatcher.register(FailedTask.class, taskExceptionEventHandlers);
+
+    // Application Evaluator event handlers
+    this.applicationDispatcher.register(AllocatedEvaluator.class, evaluatorAllocatedHandlers);
+    this.applicationDispatcher.register(CompletedEvaluator.class, evaluatorCompletedHandlers);
+    this.applicationDispatcher.register(FailedEvaluator.class, evaluatorFailedHandlers);
+
+    // Client event handlers;
+    this.clientCloseDispatcher = new DispatchingEStage(this.applicationDispatcher);
+    this.clientCloseDispatcher.register(Void.class, clientCloseHandlers);
+
+    this.clientCloseWithMessageDispatcher = new DispatchingEStage(this.applicationDispatcher);
+    this.clientCloseWithMessageDispatcher.register(byte[].class, clientCloseWithMessageHandlers);
+
+    this.clientMessageDispatcher = new DispatchingEStage(this.applicationDispatcher);
+    this.clientMessageDispatcher.register(byte[].class, clientMessageHandlers);
+
+    // Alarm event handlers
+    this.alarmDispatcher = new DispatchingEStage(this.applicationDispatcher);
+    this.alarmDispatcher.register(String.class,
+        Sets.newHashSet((EventHandler<String>)alarmDispatchHandler));
+
+    // Driver restart dispatcher
+    this.driverRestartDispatcher = new DispatchingEStage(this.applicationDispatcher);
+  }
+
+  @Inject
+  private DriverClientDispatcher(
+      final DriverClientExceptionHandler driverExceptionHandler,
+      final IAlarmDispatchHandler alarmDispatchHandler,
+      @Parameter(DriverClientDispatchThreadCount.class)
+      final Integer numberOfThreads,
+      // Application-provided start and stop handlers
+      @Parameter(DriverStartHandler.class)
+      final Set<EventHandler<StartTime>> startHandlers,
+      @Parameter(ClientDriverStopHandler.class)
+      final Set<EventHandler<StopTime>> stopHandlers,
+      // Application-provided Context event handlers
+      @Parameter(ContextActiveHandlers.class)
+      final Set<EventHandler<ActiveContext>> contextActiveHandlers,
+      @Parameter(ContextClosedHandlers.class)
+      final Set<EventHandler<ClosedContext>> contextClosedHandlers,
+      @Parameter(ContextFailedHandlers.class)
+      final Set<EventHandler<FailedContext>> contextFailedHandlers,
+      @Parameter(ContextMessageHandlers.class)
+      final Set<EventHandler<ContextMessage>> contextMessageHandlers,
+      // Application-provided Task event handlers
+      @Parameter(TaskRunningHandlers.class)
+      final Set<EventHandler<RunningTask>> taskRunningHandlers,
+      @Parameter(TaskCompletedHandlers.class)
+      final Set<EventHandler<CompletedTask>> taskCompletedHandlers,
+      @Parameter(TaskSuspendedHandlers.class)
+      final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers,
+      @Parameter(TaskMessageHandlers.class)
+      final Set<EventHandler<TaskMessage>> taskMessageEventHandlers,
+      @Parameter(TaskFailedHandlers.class)
+      final Set<EventHandler<FailedTask>> taskExceptionEventHandlers,
+      // Application-provided Evaluator event handlers
+      @Parameter(EvaluatorAllocatedHandlers.class)
+      final Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers,
+      @Parameter(EvaluatorFailedHandlers.class)
+      final Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers,
+      @Parameter(EvaluatorCompletedHandlers.class)
+      final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers,
+      // Client handlers
+      @Parameter(ClientCloseHandlers.class)
+      final Set<EventHandler<Void>> clientCloseHandlers,
+      @Parameter(ClientCloseWithMessageHandlers.class)
+      final Set<EventHandler<byte[]>> clientCloseWithMessageHandlers,
+      @Parameter(ClientMessageHandlers.class)
+      final Set<EventHandler<byte[]>> clientMessageHandlers,
+      // Driver restart handlers
+      @Parameter(DriverRestartHandler.class)
+      final Set<EventHandler<DriverRestarted>> driverRestartHandlers,
+      @Parameter(DriverRestartTaskRunningHandlers.class)
+      final Set<EventHandler<RunningTask>> driverRestartTaskRunningHandlers,
+      @Parameter(DriverRestartContextActiveHandlers.class)
+      final Set<EventHandler<ActiveContext>> driverRestartActiveContextHandlers,
+      @Parameter(DriverRestartCompletedHandlers.class)
+      final Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers,
+      @Parameter(DriverRestartFailedEvaluatorHandlers.class)
+      final Set<EventHandler<FailedEvaluator>> driverRestartFailedEvaluatorHandlers)
{
+    this(
+        driverExceptionHandler,
+        alarmDispatchHandler,
+        numberOfThreads,
+        startHandlers,
+        stopHandlers,
+        contextActiveHandlers,
+        contextClosedHandlers,
+        contextFailedHandlers,
+        contextMessageHandlers,
+        taskRunningHandlers,
+        taskCompletedHandlers,
+        taskSuspendedHandlers,
+        taskMessageEventHandlers,
+        taskExceptionEventHandlers,
+        evaluatorAllocatedHandlers,
+        evaluatorFailedHandlers,
+        evaluatorCompletedHandlers,
+        clientCloseHandlers,
+        clientCloseWithMessageHandlers,
+        clientMessageHandlers);
+    // Register driver restart handlers.
+    this.driverRestartDispatcher.register(DriverRestarted.class, driverRestartHandlers);
+    this.driverRestartDispatcher.register(RunningTask.class, driverRestartTaskRunningHandlers);
+    this.driverRestartDispatcher.register(ActiveContext.class, driverRestartActiveContextHandlers);
+    this.driverRestartDispatcher.register(DriverRestartCompleted.class, driverRestartCompletedHandlers);
+    this.driverRestartDispatcher.register(FailedEvaluator.class, driverRestartFailedEvaluatorHandlers);
+  }
+
+  /**
+   * Hands a driver restarted event to the driver restart dispatcher.
+   * @param driverRestarted the driver restarted event
+   */
+  public void dispatchRestart(final DriverRestarted driverRestarted) {
+    driverRestartDispatcher.onNext(DriverRestarted.class, driverRestarted);
+  }
+
+  /**
+   * Hands a running task to the driver restart dispatcher, i.e. to the
+   * handlers registered for running tasks on restart rather than the
+   * regular application task handlers.
+   * @param task the running task
+   */
+  public void dispatchRestart(final RunningTask task) {
+    driverRestartDispatcher.onNext(RunningTask.class, task);
+  }
+
+  /**
+   * Hands an active context to the driver restart dispatcher, i.e. to the
+   * handlers registered for active contexts on restart rather than the
+   * regular application context handlers.
+   * @param context the active context
+   */
+  public void dispatchRestart(final ActiveContext context) {
+    driverRestartDispatcher.onNext(ActiveContext.class, context);
+  }
+
+  /**
+   * Hands a driver restart completed event to the driver restart dispatcher.
+   * @param completed the restart completed event
+   */
+  public void dispatchRestart(final DriverRestartCompleted completed) {
+    driverRestartDispatcher.onNext(DriverRestartCompleted.class, completed);
+  }
+
+  /**
+   * Hands a failed evaluator to the driver restart dispatcher, i.e. to the
+   * handlers registered for failed evaluators on restart rather than the
+   * regular application evaluator handlers.
+   * @param evaluator the failed evaluator
+   */
+  public void dispatchRestart(final FailedEvaluator evaluator) {
+    driverRestartDispatcher.onNext(FailedEvaluator.class, evaluator);
+  }
+
+  /**
+   * Hands the driver start event to the application dispatcher.
+   * @param startTime start time
+   */
+  public void dispatch(final StartTime startTime) {
+    applicationDispatcher.onNext(StartTime.class, startTime);
+  }
+
+  /**
+   * We must implement this synchronously in order to catch exceptions and
+   * forward them back via the bridge before the server shuts down, after
+   * this method returns.
+   * @param stopTime stop time
+   */
+  @SuppressWarnings("checkstyle:illegalCatch")
+  public Throwable dispatch(final StopTime stopTime) {
+    try {
+      for (final EventHandler<StopTime> handler : stopHandlers) {
+        handler.onNext(stopTime);
+      }
+      return null;
+    } catch (Throwable t) {
+      return t;
+    }
+  }
 
 Review comment:
   makes sense

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> A new module containing the new Java bridge
> -------------------------------------------
>
>                 Key: REEF-2025
>                 URL: https://issues.apache.org/jira/browse/REEF-2025
>             Project: REEF
>          Issue Type: Sub-task
>          Components: REEF Bridge
>    Affects Versions: 0.17
>            Reporter: Tyson Condie
>            Assignee: Tyson Condie
>            Priority: Major
>             Fix For: 0.17
>
>
> This Jira introduces the module containing the new bridge. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message