Date: Fri, 1 Apr 2016 14:55:25 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: dev@quarks.incubator.apache.org
Subject: [jira] [Commented] (QUARKS-66) Job monitoring application which restarts failed jobs

[ https://issues.apache.org/jira/browse/QUARKS-66?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15221794#comment-15221794 ]

ASF GitHub Bot commented on QUARKS-66:
--------------------------------------

Github user ddebrunner commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/59#discussion_r58217230

    --- Diff: apps/runtime/src/main/java/quarks/apps/runtime/MonitorApp.java ---
    @@ -0,0 +1,204 @@
    +/*
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements. See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership. The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License. You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied. See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +*/
    +package quarks.apps.runtime;
    +
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.gson.JsonObject;
    +
    +import quarks.execution.DirectSubmitter;
    +import quarks.execution.Job;
    +import quarks.execution.services.ControlService;
    +import quarks.execution.services.RuntimeServices;
    +import quarks.execution.services.job.JobRegistryService;
    +import quarks.function.Consumer;
    +import quarks.function.Supplier;
    +import quarks.runtime.jobregistry.JobEvents;
    +import quarks.topology.TStream;
    +import quarks.topology.Topology;
    +import quarks.topology.TopologyProvider;
    +import quarks.topology.mbeans.ApplicationServiceMXBean;
    +import quarks.topology.plumbing.PlumbingStreams;
    +import quarks.topology.services.ApplicationService;
    +
    +/**
    + * Job monitoring application.
    + * <p>
    + * The application listens on JobRegistry events and resubmits jobs for which
    + * an event has been emitted because the job is unhealthy. The monitored
    + * applications must be registered with an {@code ApplicationService}
    + * prior to submission, otherwise the monitor application cannot restart
    + * them.
    + * <p>
    + * The monitoring application must be submitted within a context which
    + * provides the following services:
    + * <ul>
    + * <li>ApplicationService - an {@code ApplicationServiceMXBean} control
    + * registered by this service is used to resubmit failed applications.</li>
    + * <li>ControlService - the application queries this service for an
    + * {@code ApplicationServiceMXBean} control, which is then used for
    + * restarting failed applications.</li>
    + * <li>JobRegistryService - generates job monitoring events.</li>
    + * </ul>
    + */
    +public class MonitorApp {
    +    private final TopologyProvider provider;
    +    private final DirectSubmitter<Topology, Job> submitter;
    +    private final Topology topology;
    +    private static final Logger logger = LoggerFactory.getLogger(MonitorApp.class);
    +
    +    /**
    +     * Constructs a {@code MonitorApp} with the specified name in the
    +     * context of the specified provider.
    +     *
    +     * @param provider the topology provider
    +     * @param submitter a {@code DirectSubmitter} which provides required
    +     *        services and submits the application
    +     * @param name the application name
    +     *
    +     * @throws IllegalArgumentException if the submitter does not provide
    +     *         access to the required services
    +     */
    +    public MonitorApp(TopologyProvider provider,
    +            DirectSubmitter<Topology, Job> submitter, String name) {
    +
    +        this.provider = provider;
    +        this.submitter = submitter;
    +        validateSubmitter();
    +        this.topology = declareTopology(name);
    +    }
    +
    +    /**
    +     * Submits the application topology.
    +     *
    +     * @return the job.
    +     * @throws InterruptedException
    +     * @throws ExecutionException
    +     */
    +    public Job submit() throws InterruptedException, ExecutionException {
    +        Future<Job> f = submitter.submit(topology);
    +        return f.get();
    +    }
    +
    +    /**
    +     * Submits an application using an {@code ApplicationServiceMXBean} control
    +     * registered with the specified {@code ControlService}.
    +     *
    +     * @param applicationName the name of the application to submit
    +     * @param controlService the control service
    +     */
    +    public static void submitApplication(String applicationName, ControlService controlService) {
    +        try {
    +            Set<ApplicationServiceMXBean> controls =
    +                    controlService.getControls(ApplicationServiceMXBean.class);
    +            if (controls.isEmpty()) {
    +                throw new IllegalStateException(
    +                        "Could not find a registered control with the following interface: " +
    +                        ApplicationServiceMXBean.class.getName());
    +            }
    +            for (ApplicationServiceMXBean control : controls)
    +// TODO add ability to submit with the initial application configuration
    +                control.submit(applicationName, null);
    +        }
    +        catch (Exception e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    /**
    +     * Declares the following topology:
    +     * <pre>
    +     * JobEvents source --> Filter (health == unhealthy) --> Restart application
    +     * </pre>
    +     *
    +     * @param name the topology name
    +     * @return the application topology
    +     */
    +    protected Topology declareTopology(String name) {
    +        Topology t = provider.newTopology(name);
    +        TStream<JsonObject> jobEvents = JobEvents.source(
    +                t,
    +                (evType, job) -> { return MonitorAppEvent.toJsonObject(evType, job); }
    +                );
    +        jobEvents = PlumbingStreams.isolate(jobEvents, true);
    --- End diff --

    > Topology.source() and that defines the stream is isolated

    Not sure what that means? How does it define it as isolated?


> Job monitoring application which restarts failed jobs
> -----------------------------------------------------
>
> Key: QUARKS-66
> URL: https://issues.apache.org/jira/browse/QUARKS-66
> Project: Quarks
> Issue Type: Task
> Reporter: Victor Dogaru
> Assignee: Victor Dogaru
> Labels: failure-recovery
>
> An application which filters job events indicating jobs which closed with an unhealthy state and resubmits applications associated with those jobs.
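The restart logic described in the issue (filter job events for unhealthy jobs, then resubmit the associated application) can be sketched in plain Java, independent of the Quarks runtime. This is a minimal sketch under stated assumptions: `JobEventSketch`, its `health` field and the `"UNHEALTHY"` value are hypothetical stand-ins for the JSON events produced by `MonitorAppEvent.toJsonObject`, and the `resubmit` callback stands in for the role `MonitorApp.submitApplication(name, controlService)` plays in the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch of "filter unhealthy job events, then resubmit".
// JobEventSketch and the "UNHEALTHY" value are illustrative stand-ins,
// not the Quarks JobEvents / MonitorAppEvent types.
public class FilterRestartSketch {

    // Minimal stand-in for one job registry event (assumed shape).
    static final class JobEventSketch {
        final String applicationName;
        final String health; // assumed values: "HEALTHY" or "UNHEALTHY"

        JobEventSketch(String applicationName, String health) {
            this.applicationName = applicationName;
            this.health = health;
        }
    }

    // Filter step: keep only events that report an unhealthy job.
    static final Predicate<JobEventSketch> UNHEALTHY =
            ev -> "UNHEALTHY".equals(ev.health);

    // Restart step: hand each unhealthy job's application name to resubmit.
    static void process(List<JobEventSketch> events, Consumer<String> resubmit) {
        events.stream()
              .filter(UNHEALTHY)
              .map(ev -> ev.applicationName)
              .forEach(resubmit);
    }

    public static void main(String[] args) {
        List<String> restarted = new ArrayList<>();
        process(Arrays.asList(
                        new JobEventSketch("sensorApp", "HEALTHY"),
                        new JobEventSketch("alertApp", "UNHEALTHY")),
                restarted::add);
        System.out.println(restarted); // prints [alertApp]
    }
}
```

Passing the resubmit action in as a `Consumer` keeps the filter logic testable without a running `ControlService`; in the application under review that action would instead look up the registered `ApplicationServiceMXBean` controls and call `submit` on them.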
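On the reviewer's question of what "isolated" means for `jobEvents`: the sketch below assumes that `PlumbingStreams.isolate(stream, true)` decouples upstream from downstream processing, handing tuples across a FIFO queue to be processed on a separate thread, so order is kept and a slow restart does not block the thread that delivers job registry events. `IsolateSketch` is a hypothetical illustration of that pattern, not the Quarks implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch of an isolated, order-preserving stream hand-off.
// Not the Quarks PlumbingStreams implementation.
public class IsolateSketch<T> implements AutoCloseable {
    // FIFO queue between the upstream (producer) and downstream threads;
    // an empty Optional marks end-of-stream.
    private final BlockingQueue<Optional<T>> queue = new LinkedBlockingQueue<>();
    private final Thread worker;

    public IsolateSketch(Consumer<T> downstream) {
        worker = new Thread(() -> {
            try {
                while (true) {
                    Optional<T> item = queue.take(); // blocks until a tuple arrives
                    if (!item.isPresent()) {
                        return;                      // end-of-stream marker
                    }
                    downstream.accept(item.get());   // runs on the worker thread
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "isolate-worker");
        worker.setDaemon(true);
        worker.start();
    }

    // Called on the upstream thread; never waits for downstream processing.
    public void accept(T tuple) {
        queue.add(Optional.of(tuple));
    }

    // Signals end-of-stream and waits until every queued tuple is processed.
    @Override
    public void close() throws InterruptedException {
        queue.add(Optional.empty());
        worker.join();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        IsolateSketch<String> iso = new IsolateSketch<>(seen::add);
        for (String ev : new String[] {"e1", "e2", "e3"}) {
            iso.accept(ev);
        }
        iso.close();
        System.out.println(seen); // prints [e1, e2, e3]
    }
}
```

The empty `Optional` acts as an end-of-stream marker, so `close()` returns only after every tuple enqueued before it has been handed downstream, in arrival order.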