Date: Fri, 1 Apr 2016 14:55:25 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: dev@quarks.incubator.apache.org
Subject: [jira] [Commented] (QUARKS-66) Job monitoring application which
restarts failed jobs
[ https://issues.apache.org/jira/browse/QUARKS-66?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15221794#comment-15221794 ]
ASF GitHub Bot commented on QUARKS-66:
--------------------------------------
Github user ddebrunner commented on a diff in the pull request:
https://github.com/apache/incubator-quarks/pull/59#discussion_r58217230
--- Diff: apps/runtime/src/main/java/quarks/apps/runtime/MonitorApp.java ---
@@ -0,0 +1,204 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.apps.runtime;
+
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonObject;
+
+import quarks.execution.DirectSubmitter;
+import quarks.execution.Job;
+import quarks.execution.services.ControlService;
+import quarks.execution.services.RuntimeServices;
+import quarks.execution.services.job.JobRegistryService;
+import quarks.function.Consumer;
+import quarks.function.Supplier;
+import quarks.runtime.jobregistry.JobEvents;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+import quarks.topology.TopologyProvider;
+import quarks.topology.mbeans.ApplicationServiceMXBean;
+import quarks.topology.plumbing.PlumbingStreams;
+import quarks.topology.services.ApplicationService;
+
+/**
+ * Job monitoring application.
+ *
+ * The application listens on JobRegistry events and resubmits jobs for which
+ * an event has been emitted because the job is unhealthy. The monitored
+ * applications must be registered with an {@code ApplicationService}
+ * prior to submission, otherwise the monitor application cannot restart
+ * them.
+ *
+ * The monitoring application must be submitted within a context which
+ * provides the following services:
+ * <ul>
+ * <li>ApplicationService - an {@code ApplicationServiceMXBean} control
+ * registered by this service is used to resubmit failed applications.</li>
+ * <li>ControlService - the application queries this service for an
+ * {@code ApplicationServiceMXBean} control, which is then used for
+ * restarting failed applications.</li>
+ * <li>JobRegistryService - generates job monitoring events.</li>
+ * </ul>
+ *
+ */
+public class MonitorApp {
+ private final TopologyProvider provider;
+ private final DirectSubmitter<Topology, Job> submitter;
+ private final Topology topology;
+ private static final Logger logger = LoggerFactory.getLogger(MonitorApp.class);
+
+ /**
+ * Constructs a {@code MonitorApp} with the specified name in the
+ * context of the specified provider.
+ *
+ * @param provider the topology provider
+ * @param submitter a {@code DirectSubmitter} which provides required
+ * services and submits the application
+ * @param name the application name
+ *
+ * @throws IllegalArgumentException if the submitter does not provide
+ * access to the required services
+ */
+ public MonitorApp(TopologyProvider provider,
+ DirectSubmitter<Topology, Job> submitter, String name) {
+
+ this.provider = provider;
+ this.submitter = submitter;
+ validateSubmitter();
+ this.topology = declareTopology(name);
+ }
+
+ /**
+ * Submits the application topology.
+ *
+ * @return the job.
+ * @throws InterruptedException
+ * @throws ExecutionException
+ */
+ public Job submit() throws InterruptedException, ExecutionException {
+ Future<Job> f = submitter.submit(topology);
+ return f.get();
+ }
+
+ /**
+ * Submits an application using an {@code ApplicationServiceMXBean} control
+ * registered with the specified {@code ControlService}.
+ *
+ * @param applicationName the name of the application to submit
+ * @param controlService the control service
+ */
+ public static void submitApplication(String applicationName, ControlService controlService) {
+ try {
+ Set<ApplicationServiceMXBean> controls =
+ controlService.getControls(ApplicationServiceMXBean.class);
+ if (controls.isEmpty()) {
+ throw new IllegalStateException(
+ "Could not find a registered control with the following interface: " +
+ ApplicationServiceMXBean.class.getName());
+ }
+ for (ApplicationServiceMXBean control : controls)
+// TODO add ability to submit with the initial application configuration
+ control.submit(applicationName, null);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Declares the following topology:
+ *
+ * JobEvents source --> Filter (health == unhealthy) --> Restart application
+ *
+ *
+ * @param name the topology name
+ * @return the application topology
+ */
+ protected Topology declareTopology(String name) {
+ Topology t = provider.newTopology(name);
+ TStream<JsonObject> jobEvents = JobEvents.source(
+ t,
+ (evType, job) -> { return MonitorAppEvent.toJsonObject(evType, job); }
+ );
+ jobEvents = PlumbingStreams.isolate(jobEvents, true);
--- End diff --
> Topology.source() and that defines the stream is isolated
I'm not sure what that means. How does Topology.source() define the stream as isolated?
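For readers following the thread, the topology described in the Javadoc of the diff (JobEvents source --> filter on unhealthy --> restart application) can be sketched in plain Java without the Quarks runtime. This is only an illustration under stated assumptions: MonitorSketch, JobEvent, Health and restartUnhealthy are hypothetical names that do not appear in the pull request, and a finite list stands in for the live job event stream. It does not model stream isolation, which is the open question in this comment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for illustration; none of these types are part of Quarks.
class MonitorSketch {
    enum Health { HEALTHY, UNHEALTHY }

    // Minimal job event: the application name and the health reported with the event.
    static final class JobEvent {
        final String applicationName;
        final Health health;

        JobEvent(String applicationName, Health health) {
            this.applicationName = applicationName;
            this.health = health;
        }
    }

    // Source --> filter (health == UNHEALTHY) --> restart, applied to a finite
    // list instead of a live stream. Returns the names handed to the restart action.
    static List<String> restartUnhealthy(List<JobEvent> events, Consumer<String> restart) {
        List<String> restarted = new ArrayList<>();
        for (JobEvent ev : events) {
            if (ev.health == Health.UNHEALTHY) {
                restart.accept(ev.applicationName);
                restarted.add(ev.applicationName);
            }
        }
        return restarted;
    }

    public static void main(String[] args) {
        List<JobEvent> events = Arrays.asList(
            new JobEvent("sensorApp", Health.HEALTHY),
            new JobEvent("filterApp", Health.UNHEALTHY));
        // The restart action here only logs; in the pull request the equivalent
        // step is a call to submitApplication(applicationName, controlService).
        List<String> restarted = restartUnhealthy(events,
            name -> System.out.println("resubmitting " + name));
        System.out.println(restarted);
    }
}
```

Passing the restart action as a Consumer keeps the filtering logic testable without a control service; only events reporting UNHEALTHY reach the action.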
> Job monitoring application which restarts failed jobs
> -----------------------------------------------------
>
> Key: QUARKS-66
> URL: https://issues.apache.org/jira/browse/QUARKS-66
> Project: Quarks
> Issue Type: Task
> Reporter: Victor Dogaru
> Assignee: Victor Dogaru
> Labels: failure-recovery
>
> An application which filters job events indicating jobs which closed with an unhealthy state and resubmits applications associated with those jobs.