gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [1/9] incubator-gobblin git commit: Azkaban Orchestrator for GaaS
Date Wed, 30 Aug 2017 15:55:27 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 90be15f47 -> 9b9fec817


Azkaban Orchestrator for GaaS


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/08e60efd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/08e60efd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/08e60efd

Branch: refs/heads/master
Commit: 08e60efd328485554344b179da2a54466d84628b
Parents: f96379e
Author: Abhishek Tiwari <abhishektiwari.btech@gmail.com>
Authored: Tue Aug 8 12:41:02 2017 -0700
Committer: Abhishek Tiwari <abhishektiwari.btech@gmail.com>
Committed: Tue Aug 8 12:41:02 2017 -0700

----------------------------------------------------------------------
 .../service-azkaban-hello-world.template        |  24 +
 .../orchestration/AzkabanAjaxAPIClient.java     | 435 +++++++++++++++++++
 .../modules/orchestration/AzkabanJobHelper.java | 272 ++++++++++++
 .../orchestration/AzkabanProjectConfig.java     | 123 ++++++
 .../AzkabanSpecExecutorInstance.java            | 106 +++++
 .../AzkabanSpecExecutorInstanceProducer.java    | 158 +++++++
 .../orchestration/ServiceAzkabanConfigKeys.java |  38 ++
 .../main/resources/default-service-azkaban.conf |  33 ++
 .../service-azkaban-hello-world.template        |   0
 9 files changed, 1189 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-example/src/main/resources/service-azkaban-hello-world.template
----------------------------------------------------------------------
diff --git a/gobblin-example/src/main/resources/service-azkaban-hello-world.template b/gobblin-example/src/main/resources/service-azkaban-hello-world.template
new file mode 100644
index 0000000..36a7418
--- /dev/null
+++ b/gobblin-example/src/main/resources/service-azkaban-hello-world.template
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+gobblin.service.azkaban.username=<CHANGE ME>
+gobblin.service.azkaban.password=<CHANGE ME>
+gobblin.service.azkaban.server.url=<CHANGE ME>
+gobblin.service.azkaban.project.namePrefix=GobblinService_
+gobblin.service.azkaban.project.description="Gobblin Service has setup this project"
+gobblin.service.azkaban.project.flowName="GobblinServiceFlow"
+gobblin.service.azkaban.project.groupAdmins=""
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
new file mode 100644
index 0000000..31fc753
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanAjaxAPIClient.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.codec.EncoderException;
+import org.apache.commons.codec.net.URLCodec;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.entity.mime.MultipartEntityBuilder;
+import org.apache.http.impl.client.BasicCookieStore;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.TrustStrategy;
+
+import com.google.common.base.Splitter;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+
+@Slf4j
+public class AzkabanAjaxAPIClient {
+
+  private static Splitter SPLIT_ON_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
+
+  // TODO: Ensure GET call urls do not grow too big
+  private static final int LOW_NETWORK_TRAFFIC_BEGIN_HOUR = 17;
+  private static final int LOW_NETWORK_TRAFFIC_END_HOUR = 22;
+  private static final int JOB_START_DELAY_MINUTES = 5;
+  private static final long MILLISECONDS_IN_HOUR = 60 * 60 * 1000;
+  private static final URLCodec codec = new URLCodec();
+
+  public static String authenticateAndGetSessionId(String username, String password, String azkabanServerUrl)
+      throws IOException, EncoderException {
+    // Create post request
+    HttpPost postRequest = new HttpPost(azkabanServerUrl);
+    StringEntity input = new StringEntity(String.format("action=%s&username=%s&password=%s", "login",
+        username, codec.encode(password)));
+    input.setContentType("application/x-www-form-urlencoded");
+    postRequest.setEntity(input);
+    postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
+
+    // Make the call, get response
+    @Cleanup CloseableHttpClient httpClient = getHttpClient();
+    HttpResponse response = httpClient.execute(postRequest);
+
+    return handleResponse(response, "session.id").get("session.id");
+  }
+
+  public static String getProjectId(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException {
+    // Note: Every get call to Azkaban provides a projectId in response, so we have are using fetchProjectFlows call
+    // .. because it does not need any additional params other than project name
+    // Create get request
+    HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=fetchprojectflows&session.id=%s&"
+            + "project=%s", azkabanProjectConfig.getAzkabanServerUrl(), sessionId,
+        azkabanProjectConfig.getAzkabanProjectName()));
+
+    // Make the call, get response
+    @Cleanup CloseableHttpClient httpClient = getHttpClient();
+    HttpResponse response = httpClient.execute(getRequest);
+    return handleResponse(response, "projectId").get("projectId");
+  }
+
+  public static String createAzkabanProject(String sessionId, String zipFilePath,
+      AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+
+    String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl();
+    String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName();
+    String azkabanProjectDescription = azkabanProjectConfig.getAzkabanProjectDescription();
+    String groupAdminUsers = azkabanProjectConfig.getAzkabanGroupAdminUsers();
+
+    // Create post request
+    HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager?action=create");
+    StringEntity input = new StringEntity(String.format("session.id=%s&name=%s&description=%s", sessionId,
+        azkabanProjectName, azkabanProjectDescription));
+    input.setContentType("application/x-www-form-urlencoded");
+    postRequest.setEntity(input);
+    postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
+
+    // Make the call, get response
+    @Cleanup CloseableHttpClient httpClient = getHttpClient();
+    HttpResponse response = httpClient.execute(postRequest);
+    handleResponse(response);
+
+    // Add proxy user if any
+    if (azkabanProjectConfig.getAzkabanUserToProxy().isPresent()) {
+      Iterable<String> proxyUsers = SPLIT_ON_COMMA.split(azkabanProjectConfig.getAzkabanUserToProxy().get());
+      for (String user : proxyUsers) {
+        addProxyUser(sessionId, azkabanServerUrl, azkabanProjectName, user);
+      }
+    }
+
+    // Add group permissions if any
+    // TODO: Support users (not just groups), and different permission types
+    // (though we can add users, we only support groups at the moment and award them with admin permissions)
+    if (StringUtils.isNotBlank(groupAdminUsers)) {
+      String [] groups = StringUtils.split(groupAdminUsers, ",");
+      for (String group : groups) {
+        addUserPermission(sessionId, azkabanServerUrl, azkabanProjectName, group, true, true, false, false,
+            false, false);
+      }
+    }
+
+    // Upload zip file to azkaban and return projectId
+    return uploadZipFileToAzkaban(sessionId, azkabanServerUrl, azkabanProjectName, zipFilePath);
+  }
+
+  public static String replaceAzkabanProject(String sessionId, String zipFilePath,
+      AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+
+    String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl();
+    String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName();
+    String azkabanProjectDescription = azkabanProjectConfig.getAzkabanProjectDescription();
+    String groupAdminUsers = azkabanProjectConfig.getAzkabanGroupAdminUsers();
+
+    // Change project description
+    changeProjectDescription(sessionId, azkabanServerUrl, azkabanProjectName, azkabanProjectDescription);
+
+    // Add proxy user if any
+    // Note: 1. We cannot remove previous proxy-user because there is no way to read it from Azkaban
+    //       2. Adding same proxy user multiple times is a non-issue
+    // Add proxy user if any
+    if (azkabanProjectConfig.getAzkabanUserToProxy().isPresent()) {
+      Iterable<String> proxyUsers = SPLIT_ON_COMMA.split(azkabanProjectConfig.getAzkabanUserToProxy().get());
+      for (String user : proxyUsers) {
+        addProxyUser(sessionId, azkabanServerUrl, azkabanProjectName, user);
+      }
+    }
+
+    // Add group permissions if any
+    // TODO: Support users (not just groups), and different permission types
+    // Note: 1. We cannot remove previous group-user because there is no way to read it from Azkaban
+    //       2. Adding same group-user will return an error message, but we will ignore it
+    // (though we can add users, we only support groups at the moment and award them with admin permissions)
+    if (StringUtils.isNotBlank(groupAdminUsers)) {
+      String [] groups = StringUtils.split(groupAdminUsers, ",");
+      for (String group : groups) {
+        try {
+          addUserPermission(sessionId, azkabanServerUrl, azkabanProjectName, group, true, true, false, false, false,
+              false);
+        } catch (IOException e) {
+          // Ignore if group already exists, we cannot list existing groups; so its okay to attempt adding exiting
+          // .. groups
+          if (!"Group permission already exists.".equalsIgnoreCase(e.getMessage())) {
+            throw e;
+          }
+        }
+      }
+    }
+
+    // Upload zip file to azkaban and return projectId
+    return uploadZipFileToAzkaban(sessionId, azkabanServerUrl, azkabanProjectName, zipFilePath);
+  }
+
+  private static void addProxyUser(String sessionId, String azkabanServerUrl, String azkabanProjectName,
+      String proxyUser)
+      throws IOException {
+
+    // Create get request (adding same proxy user multiple times is a non-issue, Azkaban handles it)
+    HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=addProxyUser&session.id=%s&"
+        + "project=%s&name=%s", azkabanServerUrl, sessionId, azkabanProjectName, proxyUser));
+
+    // Make the call, get response
+    @Cleanup CloseableHttpClient httpClient = getHttpClient();
+    HttpResponse response = httpClient.execute(getRequest);
+    handleResponse(response);
+  }
+
+  private static void addUserPermission(String sessionId, String azkabanServerUrl, String azkabanProjectName,
+      String name, boolean isGroup, boolean adminPermission, boolean readPermission, boolean writePermission,
+      boolean executePermission, boolean schedulePermission)
+      throws IOException {
+
+    // NOTE: We are not listing the permissions before adding them, because Azkaban in its current state only
+    // .. returns user permissions and not group permissions
+
+    // Create get request (adding same normal user permission multiple times will throw an error, but we cannot
+    // list whole list of permissions anyways)
+    HttpGet getRequest = new HttpGet(String.format("%s/manager?ajax=addPermission&session.id=%s&"
+            + "project=%s&name=%s&group=%s&permissions[admin]=%s&permissions[read]=%s&permissions[write]=%s"
+            + "&permissions[execute]=%s&permissions[schedule]=%s", azkabanServerUrl, sessionId, azkabanProjectName, name,
+        isGroup, adminPermission, readPermission, writePermission, executePermission, schedulePermission));
+
+    // Make the call, get response
+    @Cleanup CloseableHttpClient httpClient = getHttpClient();
+    HttpResponse response = httpClient.execute(getRequest);
+    handleResponse(response);
+  }
+
+  private static String uploadZipFileToAzkaban(String sessionId, String azkabanServerUrl, String azkabanProjectName,
+      String jobZipFile)
+      throws IOException {
+
+    // Create post request
+    HttpPost postRequest = new HttpPost(azkabanServerUrl + "/manager");
+    HttpEntity entity = MultipartEntityBuilder
+        .create()
+        .addTextBody("session.id", sessionId)
+        .addTextBody("ajax", "upload")
+        .addBinaryBody("file", new File(jobZipFile),
+            ContentType.create("application/zip"), azkabanProjectName + ".zip")
+        .addTextBody("project", azkabanProjectName)
+        .build();
+    postRequest.setEntity(entity);
+
+    // Make the call, get response
+    @Cleanup CloseableHttpClient httpClient = getHttpClient();
+    HttpResponse response = httpClient.execute(postRequest);
+
+    // Obtaining projectId is hard. Uploading zip file is one avenue to get it from Azkaban
+    return handleResponse(response, "projectId").get("projectId");
+  }
+
+  public static void scheduleAzkabanProject(String sessionId, String azkabanProjectId,
+      AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    String azkabanServerUrl = azkabanProjectConfig.getAzkabanServerUrl();
+    String azkabanProjectName = azkabanProjectConfig.getAzkabanProjectName();
+    String azkabanProjectFlowName = azkabanProjectConfig.getAzkabanProjectFlowName();
+
+    String scheduleString = "is_recurring=off"; // run only once
+    // TODO: Enable scheduling on Azkaban, when we are ready to push down the schedule
+//    if (azkabanProjectConfig.isScheduled()) {
+//      scheduleString = "is_recurring=on&period=1d"; // schedule once every day
+//    }
+
+    // Create post request
+    HttpPost postRequest = new HttpPost(azkabanServerUrl + "/schedule");
+    StringEntity input = new StringEntity(String.format("session.id=%s&ajax=scheduleFlow"
+            + "&projectName=%s&flow=%s&projectId=%s&scheduleTime=%s&scheduleDate=%s&%s",
+        sessionId, azkabanProjectName, azkabanProjectFlowName, azkabanProjectId,
+        getScheduledTimeInAzkabanFormat(LOW_NETWORK_TRAFFIC_BEGIN_HOUR, LOW_NETWORK_TRAFFIC_END_HOUR,
+            JOB_START_DELAY_MINUTES), getScheduledDateInAzkabanFormat(), scheduleString));
+    input.setContentType("application/x-www-form-urlencoded");
+    postRequest.setEntity(input);
+    postRequest.setHeader("X-Requested-With", "XMLHttpRequest");
+
+    // Make the call, get response
+    @Cleanup CloseableHttpClient httpClient = getHttpClient();
+    HttpResponse response = httpClient.execute(postRequest);
+    handleResponse(response);
+  }
+
+  private static void changeProjectDescription(String sessionId, String azkabanServerUrl, String azkabanProjectName,
+      String projectDescription)
+      throws IOException {
+
+    HttpGet getRequest;
+    try {
+      // Create get request (adding same proxy user multiple times is a non-issue, Azkaban handles it)
+      getRequest = new HttpGet(String
+          .format("%s/manager?ajax=changeDescription&session.id=%s&" + "project=%s&description=%s", azkabanServerUrl,
+              sessionId, azkabanProjectName, new URLCodec().encode(projectDescription)));
+    } catch (EncoderException e) {
+      throw new IOException("Could not encode Azkaban project description", e);
+    }
+
+    // Make the call, get response
+    @Cleanup CloseableHttpClient httpClient = getHttpClient();
+    HttpResponse response = httpClient.execute(getRequest);
+    handleResponse(response);
+  }
+
+  public static void notifyUberdistcp2ToolServer(String uberdistcp2ToolServer,
+      AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    boolean isGoUrl = false;
+    if (!StringUtils.isBlank(uberdistcp2ToolServer)) {
+      if (uberdistcp2ToolServer.startsWith("https://go") || uberdistcp2ToolServer.startsWith("http://go")) {
+        isGoUrl = true;
+      }
+    }
+  }
+
+  private static CloseableHttpClient getHttpClient()
+      throws IOException {
+    try {
+      // Self sign SSL
+      SSLContextBuilder builder = new SSLContextBuilder();
+      builder.loadTrustMaterial(null, (TrustStrategy) new TrustSelfSignedStrategy());
+      SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(builder.build());
+
+      // Create client
+      return HttpClients.custom().setSSLSocketFactory(sslsf).setDefaultCookieStore(new BasicCookieStore()).build();
+    } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
+      throw new IOException("Issue with creating http client", e);
+    }
+  }
+
+  private static Map<String, String> handleResponse(HttpResponse response, String... responseKeys)
+      throws IOException {
+    if (response.getStatusLine().getStatusCode() != 201 && response.getStatusLine().getStatusCode()!= 200) {
+      log.error("Failed : HTTP error code : " + response.getStatusLine().getStatusCode());
+      throw new RuntimeException("Failed : HTTP error code : " + response.getStatusLine().getStatusCode());
+    }
+
+    // Get response in string
+    InputStream in = response.getEntity().getContent();
+    String jsonResponseString = IOUtils.toString(in, "UTF-8");
+    log.info("Response string: " + jsonResponseString);
+
+    // Parse Json
+    Map<String, String> responseMap = new HashMap<>();
+    if (StringUtils.isNotBlank(jsonResponseString)) {
+      JsonObject jsonObject = new JsonParser().parse(jsonResponseString).getAsJsonObject();
+
+      // Handle error if any
+      handleResponseError(jsonObject);
+
+      // Get required responseKeys
+      if (ArrayUtils.isNotEmpty(responseKeys)) {
+        for (String responseKey : responseKeys) {
+          responseMap.put(responseKey, jsonObject.get(responseKey).toString().replaceAll("\"", ""));
+        }
+      }
+    }
+
+    return responseMap;
+  }
+
+  private static void handleResponseError(JsonObject jsonObject) throws IOException {
+    // Azkaban does not has a standard for error messages tag
+    if (null != jsonObject.get("status") && "error".equalsIgnoreCase(jsonObject.get("status").toString()
+        .replaceAll("\"", ""))) {
+      String message = (null != jsonObject.get("message")) ?
+          jsonObject.get("message").toString().replaceAll("\"", "") : "Issue in creating project";
+      throw new IOException(message);
+    }
+
+    if (null != jsonObject.get("error")) {
+      String error = jsonObject.get("error").toString().replaceAll("\"", "");
+      throw new IOException(error);
+    }
+  }
+
+  /***
+   * Generate a random scheduled time between specified execution time window in the Azkaban compatible format
+   * which is: hh,mm,a,z Eg. ScheduleTime=12,00,PM,PDT
+   *
+   * @param windowStartHour Window start hour in 24 hr (HH) format (inclusive)
+   * @param windowEndHour Window end hour in 24 hr (HH) format (exclusive)
+   * @param delayMinutes If current time is within window, then additional delay for bootstrapping if desired
+   * @return Scheduled time string of the format hh,mm,a,z
+   */
+  public static String getScheduledTimeInAzkabanFormat(int windowStartHour, int windowEndHour, int delayMinutes) {
+    // Validate
+    if (windowStartHour < 0 || windowEndHour > 23 || windowStartHour >= windowEndHour) {
+      throw new IllegalArgumentException("Window start should be less than window end, and both should be between "
+          + "0 and 23");
+    }
+    if (delayMinutes < 0 || delayMinutes > 59) {
+      throw new IllegalArgumentException("Delay in minutes should be between 0 and 59 (inclusive)");
+    }
+
+    // Setup window
+    Calendar windowStartTime = Calendar.getInstance();
+    windowStartTime.set(Calendar.HOUR_OF_DAY, windowStartHour);
+    windowStartTime.set(Calendar.MINUTE, 0);
+    windowStartTime.set(Calendar.SECOND, 0);
+
+    Calendar windowEndTime = Calendar.getInstance();
+    windowEndTime.set(Calendar.HOUR_OF_DAY, windowEndHour);
+    windowEndTime.set(Calendar.MINUTE, 0);
+    windowEndTime.set(Calendar.SECOND, 0);
+
+    // Check if current time is between windowStartTime and windowEndTime, then let the execution happen
+    // after delayMinutes minutes
+    Calendar now = Calendar.getInstance();
+    if (now.after(windowStartTime) && now.before(windowEndTime)) {
+      // Azkaban takes a few seconds / a minute to bootstrap,
+      // so extra few minutes get the first execution to run instantly
+      now.add(Calendar.MINUTE, delayMinutes);
+
+      return new SimpleDateFormat("hh,mm,a,z").format(now.getTime());
+    }
+
+    // Current time is not between windowStartTime and windowEndTime, so get random execution time for next day
+    int allowedSchedulingWindow = (int)((windowEndTime.getTimeInMillis() - windowStartTime.getTimeInMillis()) /
+        MILLISECONDS_IN_HOUR);
+    int randomHourInWindow = new Random(System.currentTimeMillis()).nextInt(allowedSchedulingWindow);
+    int randomMinute = new Random(System.currentTimeMillis()).nextInt(60);
+    windowStartTime.add(Calendar.HOUR, randomHourInWindow);
+    windowStartTime.set(Calendar.MINUTE, randomMinute);
+
+    return new SimpleDateFormat("hh,mm,a,z").format(windowStartTime.getTime());
+  }
+
+  private static String getScheduledDateInAzkabanFormat() {
+    // Eg. ScheduleDate=07/22/2014"
+    return new SimpleDateFormat("MM/dd/yyyy").format(new Date());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
new file mode 100644
index 0000000..627761e
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanJobHelper.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.compress.archivers.ArchiveException;
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
+import org.apache.commons.compress.archivers.ArchiveStreamFactory;
+import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.collect.Lists;
+
+
+@Slf4j
+public class AzkabanJobHelper {
+
+  public static boolean isAzkabanJobPresent(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    log.info("Checking if Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName() + " exists");
+    try {
+      // NOTE: hacky way to determine if project already exists because Azkaban does not provides a way to
+      // .. check if the project already exists or not
+      boolean isPresent = StringUtils.isNotBlank(AzkabanAjaxAPIClient.getProjectId(sessionId, azkabanProjectConfig));
+      log.info("Project exists: " + isPresent);
+
+      return isPresent;
+    } catch (IOException e) {
+      // Project doesn't exists
+      if (String.format("Project %s doesn't exist.", azkabanProjectConfig.getAzkabanProjectName())
+          .equalsIgnoreCase(e.getMessage())) {
+        log.info("Project does not exists.");
+        return false;
+      }
+      // Project exists but with no read access to current user
+      if ("Permission denied. Need READ access.".equalsIgnoreCase(e.getMessage())) {
+        log.info("Project exists, but current user does not has READ access.");
+        return true;
+      }
+      // Some other error
+      log.error("Issue in checking if project is present", e);
+      throw e;
+    }
+  }
+
+  public static String getProjectId(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    log.info("Getting project Id for project: " + azkabanProjectConfig.getAzkabanProjectName());
+    String projectId = AzkabanAjaxAPIClient.getProjectId(sessionId, azkabanProjectConfig);
+    log.info("Project id: " + projectId);
+
+    return projectId;
+  }
+
+  public static String createAzkabanJob(String sessionId, AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    log.info("Creating Azkaban project for: " + azkabanProjectConfig.getAzkabanProjectName());
+
+    // Create zip file
+    String zipFilePath = createAzkabanJobZip(azkabanProjectConfig);
+    log.info("Zip file path: " + zipFilePath);
+
+    // Upload zip file to Azkaban
+    String projectId = AzkabanAjaxAPIClient.createAzkabanProject(sessionId, zipFilePath, azkabanProjectConfig);
+    log.info("Project Id: " + projectId);
+
+    return projectId;
+  }
+
+  public static String replaceAzkabanJob(String sessionId, String azkabanProjectId,
+      AzkabanProjectConfig azkabanProjectConfig) throws IOException {
+    log.info("Replacing zip for Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName());
+
+    // Create zip file
+    String zipFilePath = createAzkabanJobZip(azkabanProjectConfig);
+    log.info("Zip file path: " + zipFilePath);
+
+    // Replace the zip file on Azkaban
+    String projectId = AzkabanAjaxAPIClient.replaceAzkabanProject(sessionId, zipFilePath, azkabanProjectConfig);
+    log.info("Project Id: " + projectId);
+
+    return projectId;
+  }
+
+  public static void scheduleJob(String sessionId, String azkabanProjectId,
+      AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    log.info("Scheduling Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName());
+    AzkabanAjaxAPIClient.scheduleAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig);
+  }
+
+  public static void changeJobSchedule(String sessionId, String azkabanProjectId,
+      AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    log.info("Changing schedule for Azkaban project: " + azkabanProjectConfig.getAzkabanProjectName());
+    AzkabanAjaxAPIClient.scheduleAzkabanProject(sessionId, azkabanProjectId, azkabanProjectConfig);
+  }
+
+  public static String createAzkabanJobZip(AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    log.info("Creating Azkaban job zip file for project: " + azkabanProjectConfig.getAzkabanProjectName());
+    String workDir = azkabanProjectConfig.getWorkDir();
+
+    Optional<String> jarUrlTemplate = azkabanProjectConfig.getAzkabanZipJarUrlTemplate();
+    Optional<List<String>> jarNames = azkabanProjectConfig.getAzkabanZipJarNames();
+    Optional<String> jarVersion = azkabanProjectConfig.getAzkabanZipJarVersion();
+    Optional<List<String>> additionalFiles = azkabanProjectConfig.getAzkabanZipAdditionalFiles();
+    boolean failIfJarNotFound = azkabanProjectConfig.getFailIfJarNotFound();
+    String jobFlowName = azkabanProjectConfig.getAzkabanProjectFlowName();
+    String zipFilename = azkabanProjectConfig.getAzkabanProjectZipFilename();
+
+    // Download the job jars
+    List<File> filesToAdd = Lists.newArrayList();
+    if (jarNames.isPresent() && jarUrlTemplate.isPresent() && jarVersion.isPresent()) {
+      String urlTemplate = jarUrlTemplate.get();
+      String version = jarVersion.get();
+      for (String jarName : jarNames.get()) {
+        String jobJarUrl = urlTemplate.replaceAll("<module-version>", version).replaceAll("<module-name>", jarName);
+        log.info("Downloading job jar from: " + jobJarUrl + " to: " + workDir);
+        File jobJarFile = null;
+        try {
+          jobJarFile = downloadAzkabanJobJar(workDir, jobJarUrl);
+          filesToAdd.add(jobJarFile);
+        } catch (IOException e) {
+          if(failIfJarNotFound) {
+            throw e;
+          }
+          log.warn("Could not download: " + jobJarFile);
+        }
+      }
+    }
+
+    // Download additional files
+    if (additionalFiles.isPresent()) {
+      List<String> files = additionalFiles.get();
+      for (String fileName : files) {
+        log.info("Downloading additional file from: " + fileName + " to: " + workDir);
+        File additionalFile = null;
+        try {
+          additionalFile = downloadAzkabanJobJar(workDir, fileName);
+          filesToAdd.add(additionalFile);
+        } catch (IOException e) {
+          if(failIfJarNotFound) {
+            throw e;
+          }
+          log.warn("Could not download: " + additionalFile);
+        }
+      }
+    }
+
+    // Write the config files
+    log.info("Writing Azkaban config files");
+    File [] jobConfigFile = writeAzkabanConfigFiles(workDir, jobFlowName, azkabanProjectConfig);
+    filesToAdd.add(jobConfigFile[0]);
+
+    // Create the zip file
+    log.info("Writing zip file");
+    String zipfile = createZipFile(workDir, zipFilename, filesToAdd);
+    log.info("Wrote zip file: " + zipfile);
+
+    return zipfile;
+  }
+
+  private static String createZipFile(String directory, String zipFilename, List<File> filesToAdd)
+      throws IOException {
+    // Determine final zip file path
+    String zipFilePath = String.format("%s/%s", directory, zipFilename);
+    File zipFile = new File(zipFilePath);
+    zipFile.delete();
+
+    // Create and add files to zip file
+    addFilesToZip(zipFile, filesToAdd);
+
+    return zipFilePath;
+  }
+
+  private static void addFilesToZip(File zipFile, List<File> filesToAdd) throws IOException {
+    try {
+      @Cleanup
+      OutputStream archiveStream = new FileOutputStream(zipFile);
+      @Cleanup
+      ArchiveOutputStream archive =
+          new ArchiveStreamFactory().createArchiveOutputStream(ArchiveStreamFactory.ZIP, archiveStream);
+
+      for (File fileToAdd : filesToAdd) {
+        ZipArchiveEntry entry = new ZipArchiveEntry(fileToAdd.getName());
+        archive.putArchiveEntry(entry);
+
+        @Cleanup
+        BufferedInputStream input = new BufferedInputStream(new FileInputStream(fileToAdd));
+        IOUtils.copy(input, archive);
+        archive.closeArchiveEntry();
+      }
+
+      archive.finish();
+    } catch (ArchiveException e) {
+      throw new IOException("Issue with creating archive", e);
+    }
+  }
+
+  private static File[] writeAzkabanConfigFiles(String workDir, String flowName, AzkabanProjectConfig azkabanProjectConfig)
+      throws IOException {
+    // Determine final config file path
+    String jobFilePath = String.format("%s/%s.job", workDir, flowName);
+    File jobFile = new File(jobFilePath);
+    jobFile.delete();
+
+    StringBuilder propertyFileContent = new StringBuilder();
+    for (Map.Entry entry : azkabanProjectConfig.getJobSpec().getConfigAsProperties().entrySet()) {
+      propertyFileContent.append(String.format("%s=%s", entry.getKey(), entry.getValue())).append("\n");
+    }
+
+    // Write the job file
+    FileUtils.writeStringToFile(jobFile, propertyFileContent.toString(), Charset.forName("UTF-8"),true);
+
+    return new File[] {jobFile};
+  }
+
+  private static File downloadAzkabanJobJar(String workDir, String jobJarUrl)
+      throws IOException {
+    // Determine final jar file path
+    String[] jobJarUrlParts = jobJarUrl.trim().split("/");
+    String jobJarName = jobJarUrlParts[jobJarUrlParts.length-1];
+    String jobJarFilePath = String.format("%s/%s", workDir, jobJarName);
+    File jobJarFile = new File(jobJarFilePath);
+    jobJarFile.delete();
+
+    // Create work directory if not already exists
+    FileUtils.forceMkdir(new File(workDir));
+
+    // Download jar file from artifactory
+    @Cleanup InputStream jobJarInputStream = new URL(jobJarUrl).openStream();
+    @Cleanup OutputStream jobJarOutputStream = new FileOutputStream(jobJarFile);
+    IOUtils.copy(jobJarInputStream, jobJarOutputStream);
+
+    // TODO: compare checksum
+
+    return jobJarFile;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
new file mode 100644
index 0000000..ddae3d9
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.List;
+import java.util.Optional;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.ToString;
+
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.util.ConfigUtils;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+@Getter
+@ToString
+@AllArgsConstructor
+@Builder(builderMethodName = "hiddenBuilder")
+/***
+ * Class to hold Azkaban project specific configs
+ */
+public class AzkabanProjectConfig {
+  private static final String DEFAULT_AZKABAN_PROJECT_CONFIG_FILE = "default-service-azkaban.conf";
+
+  private final String azkabanServerUrl;
+
+  private final String azkabanProjectName;
+  private final String azkabanProjectDescription;
+  private final String azkabanProjectFlowName;
+  private final String azkabanGroupAdminUsers;
+  private final Optional<String> azkabanUserToProxy;
+
+  private final Optional<List<String>> azkabanZipJarNames;
+  private final Optional<String> azkabanZipJarUrlTemplate;
+  private final Optional<String> azkabanZipJarVersion;
+  private final Optional<List<String>> azkabanZipAdditionalFiles;
+  private final Boolean failIfJarNotFound;
+
+  private final JobSpec jobSpec;
+
+  public AzkabanProjectConfig(JobSpec jobSpec) {
+    // Extract config objects
+    this.jobSpec = jobSpec;
+    Config defaultConfig = ConfigFactory.load(DEFAULT_AZKABAN_PROJECT_CONFIG_FILE);
+    Config config  = jobSpec.getConfig().withFallback(defaultConfig);
+
+    // Azkaban Infrastructure
+    this.azkabanServerUrl = config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
+
+    // Azkaban Project Metadata
+    this.azkabanProjectName = constructProjectName(jobSpec, config);
+    this.azkabanProjectDescription = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_DESCRIPTION_KEY);
+    this.azkabanProjectFlowName = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_FLOW_NAME_KEY);
+    this.azkabanGroupAdminUsers = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_GROUP_ADMINS_KEY);
+    this.azkabanUserToProxy = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_USER_TO_PROXY_KEY, null));
+
+    // Azkaban Project Zip
+    this.azkabanZipJarNames = Optional.ofNullable(ConfigUtils.getStringList(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_JAR_NAMES_KEY));
+    this.azkabanZipJarUrlTemplate = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_JAR_URL_TEMPLATE_KEY, null));
+    this.azkabanZipJarVersion = Optional.ofNullable(ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_JAR_VERSION_KEY, null));
+    this.azkabanZipAdditionalFiles = Optional.ofNullable(
+        ConfigUtils.getStringList(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_ADDITIONAL_FILE_URLS_KEY));
+    this.failIfJarNotFound = ConfigUtils.getBoolean(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_ZIP_FAIL_IF_JARNOTFOUND_KEY, false);
+  }
+
+  private String constructProjectName(JobSpec jobSpec, Config config) {
+    String projectNamePrefix = ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PROJECT_NAME_PREFIX_KEY, "");
+    String projectNamePostfix = null == jobSpec.getUri() ? "" :
+        jobSpec.getUri().toString().replaceAll("_", "-").replaceAll("[^A-Za-z0-9\\-]", "_");
+
+    return trimProjectName(String.format("%s_%s", projectNamePrefix, projectNamePostfix));
+  }
+
+  /***
+   * Get Azkaban project zip file name
+   * @return Azkaban project zip file name
+   */
+  public String getAzkabanProjectZipFilename() {
+    return String.format("%s.zip", azkabanProjectName);
+  }
+
+  /***
+   * Get Azkaban project working directory, generated by prefixing a temp name
+   * @return Azkaban project working directory
+   */
+  public String getWorkDir() {
+    return String.format("%s/%s/%s/%s", System.getProperty("user.dir"), "serviceAzkaban", azkabanProjectName, System.currentTimeMillis());
+  }
+
+  private static String trimProjectName(String projectName) {
+    // Azkaban does not support name greater than 64 chars, so limit it to 64 chars
+    if (projectName.length() > 64) {
+      // We are using string.hashcode() so that for same path the generated project name is same (and hence checking
+      // .. for path duplicates is deterministic. Using UUID or currentMillis will produce different shortened path
+      // .. for the same path every time)
+      int pathHash = projectName.hashCode();
+      if (pathHash < 0) {
+        pathHash *= -1;
+      }
+      projectName = String.format("%s_%s", projectName.substring(0, 53), pathHash);
+    }
+
+    return projectName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java
new file mode 100644
index 0000000..65209c3
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.SpecExecutorInstance;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+
+public class AzkabanSpecExecutorInstance extends AbstractIdleService implements SpecExecutorInstance {
+  protected static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
+  protected static final Splitter SPLIT_BY_COLON = Splitter.on(":").omitEmptyStrings().trimResults();
+
+  // Executor Instance
+  protected final Config _config;
+  protected final Logger _log;
+  protected final URI _specExecutorInstanceUri;
+  protected final Map<String, String> _capabilities;
+
+  public AzkabanSpecExecutorInstance(Config config, Optional<Logger> log) {
+    _config = config;
+    _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
+    try {
+      _specExecutorInstanceUri = new URI(ConfigUtils.getString(config, ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
+          "NA"));
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+    _capabilities = Maps.newHashMap();
+    if (config.hasPath(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY)) {
+      String capabilitiesStr = config.getString(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY);
+      List<String> capabilities = SPLIT_BY_COMMA.splitToList(capabilitiesStr);
+      for (String capability : capabilities) {
+        List<String> currentCapability = SPLIT_BY_COLON.splitToList(capability);
+        Preconditions.checkArgument(currentCapability.size() == 2, "Only one source:destination pair is supported "
+            + "per capability, found: " + currentCapability);
+        _capabilities.put(currentCapability.get(0), currentCapability.get(1));
+      }
+    }
+  }
+
+  @Override
+  public URI getUri() {
+    return _specExecutorInstanceUri;
+  }
+
+  @Override
+  public Future<String> getDescription() {
+    return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + _specExecutorInstanceUri, null);
+  }
+
+  @Override
+  public Future<Config> getConfig() {
+    return new CompletedFuture<>(_config, null);
+  }
+
+  @Override
+  public Future<String> getHealth() {
+    return new CompletedFuture<>("Healthy", null);
+  }
+
+  @Override
+  public Future<? extends Map<String, String>> getCapabilities() {
+    return new CompletedFuture<>(_capabilities, null);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    // nothing to do in default implementation
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    // nothing to do in default implementation
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
new file mode 100644
index 0000000..f093af2
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import org.apache.commons.codec.EncoderException;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer;
+import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
+
+public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInstance
+    implements SpecExecutorInstanceProducer<Spec>, Closeable {
+
+  // Session Id for GaaS User
+  private String sessionId;
+
+
+  public AzkabanSpecExecutorInstanceProducer(Config config, Optional<Logger> log) {
+    super(config, log);
+
+    try {
+      // Initialize Azkaban client / producer and cache credentials
+      String azkabanUsername = config.getString(ServiceAzkabanConfigKeys.AZKABAN_USERNAME_KEY);
+      String azkabanPassword = config.getString(ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_KEY);
+      String azkabanServerUrl = config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
+
+      sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl);
+    } catch (IOException | EncoderException e) {
+      throw new RuntimeException("Could not authenticate with Azkaban", e);
+    }
+  }
+
+  public AzkabanSpecExecutorInstanceProducer(Config config, Logger log) {
+    this(config, Optional.of(log));
+  }
+
+  /** Constructor with no logging */
+  public AzkabanSpecExecutorInstanceProducer(Config config) {
+    this(config, Optional.<Logger>absent());
+  }
+
+  @Override
+  public void close() throws IOException {
+
+  }
+
+  @Override
+  public Future<?> addSpec(Spec addedSpec) {
+    // If project already exists, execute it
+
+    // If project does not already exists, create and execute it
+    AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) addedSpec);
+    try {
+      _log.info("Setting up your Azkaban Project for: " + azkabanProjectConfig.getAzkabanProjectName());
+
+      // Deleted project also returns true if-project-exists check, so optimistically first create the project
+      // .. (it will create project if it was never created or deleted), if project exists it will fail with
+      // .. appropriate exception message, catch that and run in replace project mode if force overwrite is
+      // .. specified
+      try {
+        createNewAzkabanProject(sessionId, azkabanProjectConfig);
+      } catch (IOException e) {
+        if ("Project already exists.".equalsIgnoreCase(e.getMessage())) {
+          if (ConfigUtils.getBoolean(((JobSpec) addedSpec).getConfig(),
+              ServiceAzkabanConfigKeys.AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY, false)) {
+            _log.info("Project already exists for this Spec, but force overwrite specified");
+            updateExistingAzkabanProject(sessionId, azkabanProjectConfig);
+          } else {
+            _log.info(String.format("Azkaban project already exists: " + "%smanager?project=%s",
+                azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName()));
+          }
+        } else {
+          throw e;
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Issue in setting up Azkaban project.", e);
+    }
+
+    return null;
+  }
+
+  @Override
+  public Future<?> updateSpec(Spec updatedSpec) {
+    // Re-create project
+    AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) updatedSpec);
+
+    try {
+      updateExistingAzkabanProject(sessionId, azkabanProjectConfig);
+    } catch (IOException e) {
+      throw new RuntimeException("Issue in setting up Azkaban project.", e);
+    }
+
+    return new CompletedFuture<>(_config, null);
+  }
+
+  @Override
+  public Future<?> deleteSpec(URI deletedSpecURI) {
+    // Delete project
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Future<? extends List<Spec>> listSpecs() {
+    throw new UnsupportedOperationException();
+  }
+
+  private void createNewAzkabanProject(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException {
+    // Create Azkaban Job
+    String azkabanProjectId = AzkabanJobHelper.createAzkabanJob(sessionId, azkabanProjectConfig);
+
+    // Schedule Azkaban Job
+    AzkabanJobHelper.scheduleJob(sessionId, azkabanProjectId, azkabanProjectConfig);
+
+    _log.info(String.format("Azkaban project created: %smanager?project=%s",
+        azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName()));
+  }
+
+  private void updateExistingAzkabanProject(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException {
+    _log.info(String.format("Updating project: %smanager?project=%s", azkabanProjectConfig.getAzkabanServerUrl(),
+        azkabanProjectConfig.getAzkabanProjectName()));
+
+    // Get project Id
+    String azkabanProjectId = AzkabanJobHelper.getProjectId(sessionId, azkabanProjectConfig);
+
+    // Replace Azkaban Job
+    AzkabanJobHelper.replaceAzkabanJob(sessionId, azkabanProjectId, azkabanProjectConfig);
+
+    // Change schedule
+    AzkabanJobHelper.changeJobSchedule(sessionId, azkabanProjectId, azkabanProjectConfig);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
new file mode 100644
index 0000000..762561c
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+public class ServiceAzkabanConfigKeys {
+  public static final String GOBBLIN_SERVICE_AZKABAN_PREFIX = "gobblin.service.azkaban.";
+
+  // Azkaban Session Specifics
+  public static final String AZKABAN_USERNAME_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "username";
+  public static final String AZKABAN_PASSWORD_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "password";
+  public static final String AZKABAN_SERVER_URL_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "server.url";
+  public static final String AZKABAN_PROJECT_NAME_PREFIX_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.namePrefix";
+  public static final String AZKABAN_PROJECT_DESCRIPTION_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.description";
+  public static final String AZKABAN_PROJECT_USER_TO_PROXY_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.userToProxy";
+  public static final String AZKABAN_PROJECT_FLOW_NAME_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.flowName";
+  public static final String AZKABAN_PROJECT_GROUP_ADMINS_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.groupAdmins";
+  public static final String AZKABAN_PROJECT_ZIP_JAR_URL_TEMPLATE_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.zip.jarUrlTemplate";
+  public static final String AZKABAN_PROJECT_ZIP_JAR_NAMES_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.zip.jarNames";
+  public static final String AZKABAN_PROJECT_ZIP_JAR_VERSION_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.zip.jarVersion";
+  public static final String AZKABAN_PROJECT_ZIP_FAIL_IF_JARNOTFOUND_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.zip.failIfJarNotFound";
+  public static final String AZKABAN_PROJECT_ZIP_ADDITIONAL_FILE_URLS_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.zip.additionalFilesUrl";
+  public static final String AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY = GOBBLIN_SERVICE_AZKABAN_PREFIX + "project.overwriteIfExists";
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf b/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
new file mode 100644
index 0000000..01a26f1
--- /dev/null
+++ b/gobblin-modules/gobblin-azkaban/src/main/resources/default-service-azkaban.conf
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Default values
+# These values should generally come from template being used by the Service
+gobblin.service.azkaban.project.namePrefix=GobblinService_
+gobblin.service.azkaban.project.description="Gobblin Service has setup this project"
+gobblin.service.azkaban.project.flowName="GobblinServiceFlow"
+
+gobblin.service.azkaban.project.zip.jarUrlTemplate=${gobblin.service.azkaban.project.job.jar.mavenUrlTemplate}
+gobblin.service.azkaban.project.zip.jarNames=${gobblin.service.azkaban.project.job.jar.mavenUrlTemplateModules}
+gobblin.service.azkaban.project.zip.jarVersion=${gobblin.service.azkaban.project.job.jar.mavenGobblinVersion}
+
+gobblin.service.azkaban.project.zip.failIfJarNotFound=false
+gobblin.service.azkaban.project.zip.additionalFilesUrl=""
+
+gobblin.service.azkaban.project.job.jar.mavenUrlTemplate="https://repo.maven.apache.org/maven2/com/linkedin/gobblin/<module-name>/<module-version>/<module-name>-<module-version>.jar"
+gobblin.service.azkaban.project.job.jar.mavenUrlTemplateModules="gobblin-admin,gobblin-api,gobblin-compaction,gobblin-config-management,gobblin-core,gobblin-core-base,gobblin-distribution,gobblin-example,gobblin-hive-registration,gobblin-metrics-libs,gobblin-metastore,gobblin-modules,gobblin-rest-service,gobblin-runtime,gobblin-runtime-hadoop,gobblin-utility,gobblin-salesforce,gobblin-test-harness,gobblin-tunnel,gobblin-data-management,gobblin-config-management,gobblin-audit,gobblin-yarn,gobblin-cluster,gobblin-aws,gobblin-service,gobblin-test-utils"
+gobblin.service.azkaban.project.job.jar.mavenGobblinVersion="0.11.0"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/08e60efd/gobblin-runtime/src/main/resources/templates/service-azkaban-hello-world.template
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/resources/templates/service-azkaban-hello-world.template b/gobblin-runtime/src/main/resources/templates/service-azkaban-hello-world.template
new file mode 100644
index 0000000..e69de29


Mime
View raw message