Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A5B81200D50 for ; Fri, 27 Oct 2017 01:38:05 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A435D160BF3; Thu, 26 Oct 2017 23:38:05 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9D8C51609E5 for ; Fri, 27 Oct 2017 01:38:04 +0200 (CEST) Received: (qmail 75078 invoked by uid 500); 26 Oct 2017 23:38:03 -0000 Mailing-List: contact commits-help@gobblin.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@gobblin.incubator.apache.org Delivered-To: mailing list commits@gobblin.incubator.apache.org Received: (qmail 75060 invoked by uid 99); 26 Oct 2017 23:38:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Oct 2017 23:38:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B5B13DF9E6; Thu, 26 Oct 2017 23:38:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: abti@apache.org To: commits@gobblin.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: incubator-gobblin git commit: [GOBBLIN-299] Add deletion support to Azkaban Orchestrator Date: Thu, 26 Oct 2017 23:38:03 +0000 (UTC) archived-at: Thu, 26 Oct 2017 23:38:05 -0000 Repository: incubator-gobblin Updated Branches: refs/heads/master 2d05b03d5 -> 278b48d41 [GOBBLIN-299] Add deletion support to Azkaban Orchestrator Closes #2154 from abti/service_azkaban_orchestrator Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/278b48d4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/278b48d4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/278b48d4 Branch: refs/heads/master Commit: 278b48d41464c1959b03c479025dac1543b9a0a7 Parents: 2d05b03 Author: Abhishek Tiwari Authored: Fri Oct 27 05:07:54 2017 +0530 Committer: Abhishek Tiwari Committed: Fri Oct 27 05:07:54 2017 +0530 ---------------------------------------------------------------------- .../orchestration/AzkabanAjaxAPIClient.java | 16 +++++++++ .../modules/orchestration/AzkabanJobHelper.java | 14 ++++++++ .../orchestration/AzkabanProjectConfig.java | 2 +- .../orchestration/AzkabanSpecProducer.java | 8 +++++ .../modules/orchestration/Orchestrator.java | 34 +++++++++++++++++++- .../scheduler/GobblinServiceJobScheduler.java | 11 +++++-- 6 files changed, 81 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/278b48d4/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java index 9c3cee4..f54d6a5 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java @@ -157,6 +157,22 @@ public class AzkabanAjaxAPIClient { } /*** + * Deletes an Azkaban project. + * @param sessionId Session Id. + * @param azkabanProjectConfig Azkaban Project Config. + * @throws IOException + */ + public static void deleteAzkabanProject(String sessionId, AzkabanProjectConfig azkabanProjectConfig) + throws IOException { + Map params = Maps.newHashMap(); + params.put("delete", "true"); + params.put("project", azkabanProjectConfig.getAzkabanProjectName()); + + executeGetRequest(prepareGetRequest(azkabanProjectConfig.getAzkabanServerUrl() + "/manager", + sessionId, params)); + } + + /*** * Replace an existing Azkaban Project. If proxy user and group permissions are specified in * Azkaban Project Config, then this method also adds it to the project configuration. * @param sessionId Session Id. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/278b48d4/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java index 4fbe32b..09b6dde 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java @@ -121,6 +121,20 @@ public class AzkabanJobHelper { } /*** + * Delete project on Azkaban based on Azkaban config. + * @param sessionId Session Id. + * @param azkabanProjectConfig Azkaban Project Config. + * @throws IOException + */ + public static void deleteAzkabanJob(String sessionId, AzkabanProjectConfig azkabanProjectConfig) + throws IOException { + log.info("Deleting Azkaban project for: " + azkabanProjectConfig.getAzkabanProjectName()); + + // Delete project + AzkabanAjaxAPIClient.deleteAzkabanProject(sessionId, azkabanProjectConfig); + } + + /*** * Replace project on Azkaban based on Azkaban config. This includes preparing the zip file and uploading it to * Azkaban, setting permissions and schedule. * @param sessionId Session Id. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/278b48d4/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java index 583988b..4b8bb9c 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java @@ -86,7 +86,7 @@ public class AzkabanProjectConfig { this.failIfJarNotFound = ConfigUtils.getBoolean(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_FAIL_IF_JARNOTFOUND_KEY, false); } - private String constructProjectName(JobSpec jobSpec, Config config) { + public static String constructProjectName(JobSpec jobSpec, Config config) { String projectNamePrefix = ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_NAME_PREFIX_KEY, ""); String projectNamePostfix = null == jobSpec.getUri() ? "" : jobSpec.getUri().toString().replaceAll("_", "-").replaceAll("[^A-Za-z0-9\\-]", "_"); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/278b48d4/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java index 5a491ab..7b11cef 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java @@ -141,6 +141,14 @@ public class AzkabanSpecProducer implements SpecProducer, Closeable { @Override public Future deleteSpec(URI deletedSpecURI) { // Delete project + JobSpec jobSpec = new JobSpec.Builder(deletedSpecURI).build(); + + try { + AzkabanJobHelper.deleteAzkabanJob(_sessionId, new AzkabanProjectConfig(jobSpec)); + } catch (IOException e) { + throw new RuntimeException("Issue in deleting Azkaban project.", e); + } + throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/278b48d4/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index 286911b..e2d36aa 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -196,7 +196,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { producer = specsToExecute.getValue().getProducer().get(); Spec jobSpec = specsToExecute.getKey(); - _log.info(String.format("Going to orchestrate JobSpc: %s on Executor: %s", jobSpec, producer)); + _log.info(String.format("Going to orchestrate JobSpec: %s on Executor: %s", jobSpec, producer)); producer.addSpec(jobSpec); } catch(Exception e) { _log.error("Cannot successfully setup spec: " + specsToExecute.getKey() + " on executor: " + producer + @@ -211,6 +211,38 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS); } + public void remove(Spec spec) { + // TODO: Evolve logic to cache and reuse previously compiled JobSpecs + // .. this will work for Identity compiler but not always for multi-hop. + // Note: Current logic assumes compilation is consistent between all executions + if (spec instanceof FlowSpec) { + Map specExecutorInstanceMap = specCompiler.compileFlow(spec); + + if (specExecutorInstanceMap.isEmpty()) { + _log.warn("Cannot determine an executor to delete Spec: " + spec); + return; + } + + // Delete all compiled JobSpecs on their respective Executor + for (Map.Entry specsToDelete : specExecutorInstanceMap.entrySet()) { + // Delete this spec on selected executor + SpecProducer producer = null; + try { + producer = specsToDelete.getValue().getProducer().get(); + Spec jobSpec = specsToDelete.getKey(); + + _log.info(String.format("Going to delete JobSpec: %s on Executor: %s", jobSpec, producer)); + producer.deleteSpec(jobSpec.getUri()); + } catch(Exception e) { + _log.error("Cannot successfully delete spec: " + specsToDelete.getKey() + " on executor: " + producer + + " for flow: " + spec, e); + } + } + } else { + throw new RuntimeException("Spec not of type FlowSpec, cannot delete: " + spec); + } + } + @Nonnull @Override public MetricContext getMetricContext() { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/278b48d4/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index 0eabf2c..0c45daf 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -229,8 +229,15 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata } try { - this.scheduledFlowSpecs.remove(deletedSpecURI.toString()); - unscheduleJob(deletedSpecURI.toString()); + Spec deletedSpec = this.scheduledFlowSpecs.get(deletedSpecURI.toString()); + if (null != deletedSpec) { + this.orchestrator.remove(deletedSpec); + this.scheduledFlowSpecs.remove(deletedSpecURI.toString()); + unscheduleJob(deletedSpecURI.toString()); + } else { + _log.warn(String.format("Spec with URI: %s was not found in cache. May be it was cleaned, if not please " + + "clean it manually", deletedSpecURI)); + } } catch (JobException e) { _log.warn(String.format("Spec with URI: %s was not unscheduled cleaning", deletedSpecURI), e);