From: qingwzhao@apache.org
To: commits@eagle.incubator.apache.org
Date: Tue, 09 Aug 2016 05:25:31 -0000
Message-Id: <9311b7610dc84c3d83cd29945d140721@git.apache.org>
Subject: [3/8] incubator-eagle git commit: [EAGLE-422] eagle support for mr & spark running job monitoring

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
new file mode 100644
index 0000000..bb76213
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.running.parser;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.jpm.spark.crawl.EventType;
+import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.entities.*;
+import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.HDFSUtil;
+import org.apache.eagle.jpm.util.SparkJobTagName;
+import org.apache.eagle.jpm.util.Utils;
+import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourceFetch.model.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.*;
+import java.util.function.Function;
+
+public class SparkApplicationParser implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(SparkApplicationParser.class);
+
+    public enum ParserStatus {
+        RUNNING,
+        FINISHED,
+        APP_FINISHED
+    }
+
+    private AppInfo app;
+    private static final int MAX_RETRY_TIMES = 2;
+    private SparkAppEntityCreationHandler sparkAppEntityCreationHandler;
+    //<sparkAppId, SparkAppEntity>
+    private Map<String, SparkAppEntity> sparkAppEntityMap;
+    private Map<String, JobConfig> sparkJobConfigs;
+    private Map<Integer, Pair<Integer, Pair<Long, Long>>> stagesTime;
+    private Set<Integer> completeStages;
+    private Configuration hdfsConf;
+    private SparkRunningConfigManager.EndpointConfig endpointConfig;
+    private final Object lock = new Object();
+    private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+    private Map<String, String> commonTags = new HashMap<>();
+    private SparkRunningJobManager sparkRunningJobManager;
+    private ParserStatus parserStatus;
+    private ResourceFetcher rmResourceFetcher;
+    private int currentAttempt;
+    private boolean first;
+
+    static {
+        OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+    }
+
+    public SparkApplicationParser(SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig,
+                                  SparkRunningConfigManager.EndpointConfig endpointConfig,
+                                  SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig,
+                                  AppInfo app, Map<String, SparkAppEntity> sparkApp,
+                                  SparkRunningJobManager sparkRunningJobManager,
+                                  ResourceFetcher rmResourceFetcher) {
+        this.sparkAppEntityCreationHandler = new SparkAppEntityCreationHandler(eagleServiceConfig);
+        this.endpointConfig = endpointConfig;
+        this.app = app;
+        this.sparkJobConfigs = new HashMap<>();
+        this.stagesTime = new HashMap<>();
+        this.completeStages = new HashSet<>();
+        this.sparkAppEntityMap = sparkApp;
+        if (this.sparkAppEntityMap == null) {
+            this.sparkAppEntityMap = new HashMap<>();
+        }
+        this.rmResourceFetcher = rmResourceFetcher;
+        this.currentAttempt = 1;
+        this.first = true;
+        this.hdfsConf = new Configuration();
+        this.hdfsConf.set("fs.defaultFS", endpointConfig.nnEndpoint);
+        this.hdfsConf.setBoolean("fs.hdfs.impl.disable.cache", true);
+        this.hdfsConf.set("hdfs.kerberos.principal", endpointConfig.principal);
+        this.hdfsConf.set("hdfs.keytab.file", endpointConfig.keyTab);
+
+        this.commonTags.put(SparkJobTagName.SITE.toString(), jobExtractorConfig.site);
+        this.commonTags.put(SparkJobTagName.SPARK_USER.toString(), app.getUser());
+        this.commonTags.put(SparkJobTagName.SPARK_QUEUE.toString(), app.getQueue());
+        this.parserStatus = ParserStatus.FINISHED;
+        this.sparkRunningJobManager = sparkRunningJobManager;
+    }
+
+    public ParserStatus status() {
+        return this.parserStatus;
+    }
+
+    public void setStatus(ParserStatus status) {
+        this.parserStatus = status;
+    }
+
+    private void finishSparkApp(String sparkAppId) {
+        SparkAppEntity attemptEntity = sparkAppEntityMap.get(sparkAppId);
+        attemptEntity.setYarnState(Constants.AppState.FINISHED.toString());
+        attemptEntity.setYarnStatus(Constants.AppStatus.FAILED.toString());
+        sparkJobConfigs.remove(sparkAppId);
+        if (sparkJobConfigs.isEmpty()) {
+            this.parserStatus = ParserStatus.APP_FINISHED;
+        }
+        stagesTime.clear();
+        LOG.info("spark application {} has been finished", sparkAppId);
+    }
+
+    private void fetchSparkRunningInfo() throws Exception {
+        for (int i = 0; i < MAX_RETRY_TIMES; i++) {
+            if (fetchSparkApps()) {
+                break;
+            } else if (i == MAX_RETRY_TIMES - 1) {
+                //We get here either because the RM is unreachable or because the app has finished.
+                //If we can still connect to the RM, we consider the app to have finished.
+                rmResourceFetcher.getResource(Constants.ResourceType.RUNNING_SPARK_JOB);
+                sparkAppEntityMap.keySet().forEach(this::finishSparkApp);
+                return;
+            }
+        }
+
+        List<Function<String, Boolean>> functions = new ArrayList<>();
+        functions.add(fetchSparkExecutors);
+        functions.add(fetchSparkJobs);
+        if (!first) {
+            functions.add(fetchSparkStagesAndTasks);
+        }
+
+        this.first = false;
+        for (String sparkAppId : sparkAppEntityMap.keySet()) {
+            for (Function<String, Boolean> function : functions) {
+                int i = 0;
+                for (; i < MAX_RETRY_TIMES; i++) {
+                    if (function.apply(sparkAppId)) {
+                        break;
+                    }
+                }
+                if (i >= MAX_RETRY_TIMES) {
+                    //may be caused by an unreachable RM
+                    rmResourceFetcher.getResource(Constants.ResourceType.RUNNING_SPARK_JOB);
+                    finishSparkApp(sparkAppId);
+                    break;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void run() {
+        synchronized (this.lock) {
+            if (this.parserStatus == ParserStatus.APP_FINISHED) {
+                return;
+            }
+
+            LOG.info("start to process yarn application " + app.getId());
+            try {
+                fetchSparkRunningInfo();
+            } catch (Exception e) {
+                LOG.warn("exception found when processing application {}", app.getId(), e);
+            } finally {
+                for (String jobId : sparkAppEntityMap.keySet()) {
+                    sparkAppEntityCreationHandler.add(sparkAppEntityMap.get(jobId));
+                }
+                if (sparkAppEntityCreationHandler.flush()) { //force flush
+                    //we must flush entities before deleting from zk, otherwise the finish state of jobs may be lost
+                    //delete from zk if needed
+                    sparkAppEntityMap.keySet()
+                        .stream()
+                        .filter(
+                            jobId -> sparkAppEntityMap.get(jobId).getYarnState().equals(Constants.AppState.FINISHED.toString()) ||
+                                sparkAppEntityMap.get(jobId).getYarnState().equals(Constants.AppState.FAILED.toString()))
+                        .forEach(
+                            jobId -> this.sparkRunningJobManager.delete(app.getId(), jobId));
+                }
+
+                LOG.info("finish processing yarn application " + app.getId());
+            }
+
+            if (this.parserStatus == ParserStatus.RUNNING) {
+                this.parserStatus = ParserStatus.FINISHED;
+            }
+        }
+    }
+
+    private JobConfig parseJobConfig(InputStream is) throws Exception {
+        JobConfig jobConfig = new JobConfig();
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
+            String line;
+            boolean stop = false;
+            while ((line = reader.readLine()) != null && !stop) {
+                try {
+                    JSONParser parser = new JSONParser();
+                    JSONObject eventObj = (JSONObject) parser.parse(line);
+
+                    if (eventObj != null) {
+                        String eventType = (String) eventObj.get("Event");
+                        LOG.info("Event type: " + eventType);
+                        if (eventType.equalsIgnoreCase(EventType.SparkListenerEnvironmentUpdate.toString())) {
+                            stop = true;
+                            JSONObject sparkProps = (JSONObject) eventObj.get("Spark Properties");
+                            for (Object key : sparkProps.keySet()) {
+                                jobConfig.put((String) key, (String) sparkProps.get(key));
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    LOG.error(String.format("Fail to parse %s.", line), e);
+                }
+            }
+
+            return jobConfig;
+        }
+    }
+
+    private JobConfig getJobConfig(String sparkAppId, int attemptId) {
+        //TODO: getResourceManagerVersion() and compare versions to build the attempt id.
+
+        LOG.info("Get job config for sparkAppId {}, attempt {}, appId {}", sparkAppId, attemptId, app.getId());
+        JobConfig jobConfig = null;
+
+        try (FileSystem hdfs = HDFSUtil.getFileSystem(this.hdfsConf)) {
+//            // For Yarn version >= 2.7,
+//            // log name: "application_1468625664674_0003_appattempt_1468625664674_0003_000001"
+//            String attemptIdFormatted = String.format("%06d", attemptId);
+//            // remove "application_" to get the number part of appID.
+//            String sparkAppIdNum = sparkAppId.substring(12);
+//            String attemptIdString = "appattempt_" + sparkAppIdNum + "_" + attemptIdFormatted;
+
+            // For Yarn version 2.4.x
+            // log name: application_1464382345557_269065_1
+            String attemptIdString = Integer.toString(attemptId);
+
+            //test appId_attemptId.inprogress/appId_attemptId/appId.inprogress/appId in turn
+            String eventLogDir = this.endpointConfig.eventLog;
+            Path attemptFile = new Path(eventLogDir + "/" + sparkAppId + "_" + attemptIdString + ".inprogress");
+            if (!hdfs.exists(attemptFile)) {
+                attemptFile = new Path(eventLogDir + "/" + sparkAppId + "_" + attemptIdString);
+                if (!hdfs.exists(attemptFile)) {
+                    attemptFile = new Path(eventLogDir + "/" + sparkAppId + ".inprogress");
+                    if (!hdfs.exists(attemptFile)) {
+                        attemptFile = new Path(eventLogDir + "/" + sparkAppId);
+                    }
+                }
+            }
+
+            LOG.info("Attempt File path: " + attemptFile.toString());
+            jobConfig = parseJobConfig(hdfs.open(attemptFile));
+        } catch (Exception e) {
+            LOG.error("Fail to process application {}", sparkAppId, e);
+        }
+
+        return jobConfig;
+    }
+
+    private boolean isClientMode(JobConfig jobConfig) {
+        return jobConfig.containsKey(Constants.SPARK_MASTER_KEY) &&
+            jobConfig.get(Constants.SPARK_MASTER_KEY).equalsIgnoreCase("yarn-client");
+    }
+
+    private boolean fetchSparkApps() {
+        String appURL = app.getTrackingUrl() + Constants.SPARK_APPS_URL + "?"
+            + Constants.ANONYMOUS_PARAMETER;
+        InputStream is = null;
+        SparkApplication[] sparkApplications = null;
+        try {
+            is = InputStreamUtils.getInputStream(appURL, null, Constants.CompressionType.NONE);
+            LOG.info("fetch spark application from {}", appURL);
+            sparkApplications = OBJ_MAPPER.readValue(is, SparkApplication[].class);
+        } catch (java.net.ConnectException e) {
+            LOG.warn("fetch spark application from {} failed", appURL, e);
+            return true;
+        } catch (Exception e) {
+            LOG.warn("fetch spark application from {} failed", appURL, e);
+            return false;
+        } finally {
+            Utils.closeInputStream(is);
+        }
+
+        for (SparkApplication sparkApplication : sparkApplications) {
+            String id = sparkApplication.getId();
+            if (id.contains(" ") || !id.startsWith("app")) {
+                //spark version < 1.6.0: the id may contain whitespace, needs more research later
+                LOG.warn("skip spark application {}", id);
+                continue;
+            }
+
+            currentAttempt = sparkApplication.getAttempts().size();
+            int lastSavedAttempt = 1;
+            if (sparkAppEntityMap.containsKey(id)) {
+                lastSavedAttempt = Integer.parseInt(sparkAppEntityMap.get(id).getTags().get(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString()));
+            }
+            for (int j = lastSavedAttempt; j <= currentAttempt; j++) {
+                SparkAppEntity attemptEntity = new SparkAppEntity();
+                commonTags.put(SparkJobTagName.SPARK_APP_NAME.toString(), sparkApplication.getName());
+                commonTags.put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), "" + j);
+                commonTags.put(SparkJobTagName.SPARK_APP_ID.toString(), id);
+                attemptEntity.setTags(new HashMap<>(commonTags));
+                attemptEntity.setAppInfo(app);
+
+                attemptEntity.setStartTime(Utils.dateTimeToLong(sparkApplication.getAttempts().get(j - 1).getStartTime()));
+                attemptEntity.setTimestamp(attemptEntity.getStartTime());
+
+                if (sparkJobConfigs.containsKey(id) && j == currentAttempt) {
+                    attemptEntity.setConfig(sparkJobConfigs.get(id));
+                }
+
+                if (attemptEntity.getConfig() == null) {
+                    attemptEntity.setConfig(getJobConfig(id, j));
+                    if (j == currentAttempt) {
+                        sparkJobConfigs.put(id, attemptEntity.getConfig());
+                    }
+                }
+
+                try {
+                    JobConfig jobConfig = attemptEntity.getConfig();
+                    attemptEntity.setExecMemoryBytes(Utils.parseMemory(jobConfig.get(Constants.SPARK_EXECUTOR_MEMORY_KEY)));
+
+                    attemptEntity.setDriveMemoryBytes(isClientMode(jobConfig) ?
+                        0 :
+                        Utils.parseMemory(jobConfig.get(Constants.SPARK_DRIVER_MEMORY_KEY)));
+                    attemptEntity.setExecutorCores(Integer.parseInt(jobConfig.get(Constants.SPARK_EXECUTOR_CORES_KEY)));
+                    // spark.driver.cores may not be set.
+                    String driverCoresStr = jobConfig.get(Constants.SPARK_DRIVER_CORES_KEY);
+                    int driverCores = 0;
+                    if (driverCoresStr != null && !isClientMode(jobConfig)) {
+                        driverCores = Integer.parseInt(driverCoresStr);
+                    }
+                    attemptEntity.setDriverCores(driverCores);
+                } catch (Exception e) {
+                    LOG.warn("add config failed", e);
+                }
+
+                if (j == currentAttempt) {
+                    //current attempt
+                    attemptEntity.setYarnState(app.getState());
+                    attemptEntity.setYarnStatus(app.getFinalStatus());
+                    sparkAppEntityMap.put(id, attemptEntity);
+                    this.sparkRunningJobManager.update(app.getId(), id, attemptEntity);
+                } else {
+                    attemptEntity.setYarnState(Constants.AppState.FINISHED.toString());
+                    attemptEntity.setYarnStatus(Constants.AppStatus.FAILED.toString());
+                }
+                sparkAppEntityCreationHandler.add(attemptEntity);
+            }
+        }
+
+        sparkAppEntityCreationHandler.flush();
+        return true;
+    }
+
+    private Function<String, Boolean> fetchSparkExecutors = sparkAppId -> {
+        //only fetch the current attempt
+        SparkAppEntity sparkAppEntity = sparkAppEntityMap.get(sparkAppId);
+        String executorURL = app.getTrackingUrl() + Constants.SPARK_APPS_URL + "/" + sparkAppId + "/" + Constants.SPARK_EXECUTORS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+        InputStream is = null;
+        SparkExecutor[] sparkExecutors = null;
+        try {
+            is = InputStreamUtils.getInputStream(executorURL, null, Constants.CompressionType.NONE);
+            LOG.info("fetch spark executor from {}", executorURL);
+            sparkExecutors = OBJ_MAPPER.readValue(is, SparkExecutor[].class);
+        } catch (java.net.ConnectException e) {
+            LOG.warn("fetch spark executor from {} failed", executorURL, e);
+            return true;
+        } catch (Exception e) {
+            LOG.warn("fetch spark executor from {} failed", executorURL, e);
+            return false;
+        } finally {
+            Utils.closeInputStream(is);
+        }
+        sparkAppEntity.setExecutors(sparkExecutors.length);
+
+        for (SparkExecutor executor : sparkExecutors) {
+            SparkExecutorEntity entity = new SparkExecutorEntity();
+            entity.setTags(new HashMap<>(sparkAppEntity.getTags()));
+            entity.getTags().put(SparkJobTagName.SPARK_EXECUTOR_ID.toString(), executor.getId());
+            entity.setHostPort(executor.getHostPort());
+            entity.setRddBlocks(executor.getRddBlocks());
+            entity.setMemoryUsed(executor.getMemoryUsed());
+            entity.setDiskUsed(executor.getDiskUsed());
+            entity.setActiveTasks(executor.getActiveTasks());
+            entity.setFailedTasks(executor.getFailedTasks());
+            entity.setCompletedTasks(executor.getCompletedTasks());
+            entity.setTotalTasks(executor.getTotalTasks());
+            entity.setTotalDuration(executor.getTotalDuration());
+            entity.setTotalInputBytes(executor.getTotalInputBytes());
+            entity.setTotalShuffleRead(executor.getTotalShuffleRead());
+            entity.setTotalShuffleWrite(executor.getTotalShuffleWrite());
+            entity.setMaxMemory(executor.getMaxMemory());
+
+            entity.setTimestamp(sparkAppEntity.getTimestamp());
+            entity.setStartTime(sparkAppEntity.getStartTime());
+            if (executor.getId().equalsIgnoreCase("driver")) {
+                entity.setExecMemoryBytes(sparkAppEntity.getDriveMemoryBytes());
+                entity.setCores(sparkAppEntity.getDriverCores());
+                entity.setMemoryOverhead(sparkAppEntity.getDriverMemoryOverhead());
+            } else {
+                entity.setExecMemoryBytes(sparkAppEntity.getExecMemoryBytes());
+                entity.setCores(sparkAppEntity.getExecutorCores());
+                entity.setMemoryOverhead(sparkAppEntity.getExecutorMemoryOverhead());
+            }
+            sparkAppEntityCreationHandler.add(entity);
+        }
+        return true;
+    };
+
+    private Function<String, Boolean> fetchSparkJobs = sparkAppId -> {
+        //only fetch the current attempt
+        SparkAppEntity sparkAppEntity = sparkAppEntityMap.get(sparkAppId);
+        String jobURL = app.getTrackingUrl() + Constants.SPARK_APPS_URL + "/" + sparkAppId + "/" + Constants.SPARK_JOBS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+        InputStream is = null;
+        SparkJob[] sparkJobs = null;
+        try {
+            is = InputStreamUtils.getInputStream(jobURL, null, Constants.CompressionType.NONE);
+            LOG.info("fetch spark job from {}", jobURL);
+            sparkJobs = OBJ_MAPPER.readValue(is, SparkJob[].class);
+        } catch (java.net.ConnectException e) {
+            LOG.warn("fetch spark job from {} failed", jobURL, e);
+            return true;
+        } catch (Exception e) {
+            LOG.warn("fetch spark job from {} failed", jobURL, e);
+            return false;
+        } finally {
+            Utils.closeInputStream(is);
+        }
+
+        sparkAppEntity.setNumJobs(sparkJobs.length);
+        for (SparkJob sparkJob : sparkJobs) {
+            SparkJobEntity entity = new SparkJobEntity();
+            entity.setTags(new HashMap<>(commonTags));
+            entity.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), sparkJob.getJobId() + "");
+            entity.setSubmissionTime(Utils.dateTimeToLong(sparkJob.getSubmissionTime()));
+            if (sparkJob.getCompletionTime() != null) {
+                entity.setCompletionTime(Utils.dateTimeToLong(sparkJob.getCompletionTime()));
+            }
+            entity.setNumStages(sparkJob.getStageIds().size());
+            entity.setStatus(sparkJob.getStatus());
+            entity.setNumTask(sparkJob.getNumTasks());
+            entity.setNumActiveTasks(sparkJob.getNumActiveTasks());
+            entity.setNumCompletedTasks(sparkJob.getNumCompletedTasks());
+            entity.setNumSkippedTasks(sparkJob.getNumSkippedTasks());
+            entity.setNumFailedTasks(sparkJob.getNumFailedTasks());
+            entity.setNumActiveStages(sparkJob.getNumActiveStages());
+            entity.setNumCompletedStages(sparkJob.getNumCompletedStages());
+            entity.setNumSkippedStages(sparkJob.getNumSkippedStages());
+            entity.setNumFailedStages(sparkJob.getNumFailedStages());
+            entity.setStages(sparkJob.getStageIds());
+            entity.setTimestamp(sparkAppEntity.getTimestamp());
+
+            sparkAppEntity.setTotalStages(sparkAppEntity.getTotalStages() + entity.getNumStages());
+            sparkAppEntity.setTotalTasks(sparkAppEntity.getTotalTasks() + entity.getNumTask());
+            sparkAppEntity.setActiveTasks(sparkAppEntity.getActiveTasks() + entity.getNumActiveTasks());
+            sparkAppEntity.setCompleteTasks(sparkAppEntity.getCompleteTasks() + entity.getNumCompletedTasks());
+            sparkAppEntity.setSkippedTasks(sparkAppEntity.getSkippedTasks() + entity.getNumSkippedTasks());
+            sparkAppEntity.setFailedTasks(sparkAppEntity.getFailedTasks() + entity.getNumFailedTasks());
+            sparkAppEntity.setActiveStages(sparkAppEntity.getActiveStages() + entity.getNumActiveStages());
+            sparkAppEntity.setCompleteStages(sparkAppEntity.getCompleteStages() + entity.getNumCompletedStages());
+            sparkAppEntity.setSkippedStages(sparkAppEntity.getSkippedStages() + entity.getNumSkippedStages());
+            sparkAppEntity.setFailedStages(sparkAppEntity.getFailedStages() + entity.getNumFailedStages());
+
+            for (Integer stageId : sparkJob.getStageIds()) {
+                stagesTime.put(stageId, Pair.of(sparkJob.getJobId(), Pair.of(entity.getSubmissionTime(), entity.getCompletionTime())));
+            }
+            sparkAppEntityCreationHandler.add(entity);
+        }
+        return true;
+    };
+
+    private Function<String, Boolean> fetchSparkStagesAndTasks = sparkAppId -> {
+        SparkAppEntity sparkAppEntity = sparkAppEntityMap.get(sparkAppId);
+        String stageURL = app.getTrackingUrl() + Constants.SPARK_APPS_URL + "/" + sparkAppId + "/" + Constants.SPARK_STAGES_URL + "?"
+            + Constants.ANONYMOUS_PARAMETER;
+        InputStream is = null;
+        SparkStage[] sparkStages;
+        try {
+            is = InputStreamUtils.getInputStream(stageURL, null, Constants.CompressionType.NONE);
+            LOG.info("fetch spark stage from {}", stageURL);
+            sparkStages = OBJ_MAPPER.readValue(is, SparkStage[].class);
+        } catch (java.net.ConnectException e) {
+            LOG.warn("fetch spark stage from {} failed", stageURL, e);
+            return true;
+        } catch (Exception e) {
+            LOG.warn("fetch spark stage from {} failed", stageURL, e);
+            return false;
+        } finally {
+            Utils.closeInputStream(is);
+        }
+
+        for (SparkStage sparkStage : sparkStages) {
+            //TODO
+            //we need a thread pool to handle this if there are many stages
+            SparkStage stage;
+            try {
+                stageURL = app.getTrackingUrl() + Constants.SPARK_APPS_URL + "/" + sparkAppId + "/" + Constants.SPARK_STAGES_URL + "/" + sparkStage.getStageId() + "?" + Constants.ANONYMOUS_PARAMETER;
+                is = InputStreamUtils.getInputStream(stageURL, null, Constants.CompressionType.NONE);
+                LOG.info("fetch spark stage from {}", stageURL);
+                stage = OBJ_MAPPER.readValue(is, SparkStage[].class)[0];
+            } catch (Exception e) {
+                LOG.warn("fetch spark stage from {} failed", stageURL, e);
+                return false;
+            } finally {
+                Utils.closeInputStream(is);
+            }
+
+            if (this.completeStages.contains(stage.getStageId())) {
+                continue; //this stage has already been recorded as complete
+            }
+            SparkStageEntity stageEntity = new SparkStageEntity();
+            stageEntity.setTags(new HashMap<>(commonTags));
+            stageEntity.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), stagesTime.get(stage.getStageId()).getLeft() + "");
+            stageEntity.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), stage.getStageId() + "");
+            stageEntity.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), stage.getAttemptId() + "");
+            stageEntity.setStatus(stage.getStatus());
+            stageEntity.setNumActiveTasks(stage.getNumActiveTasks());
+            stageEntity.setNumCompletedTasks(stage.getNumCompleteTasks());
+            stageEntity.setNumFailedTasks(stage.getNumFailedTasks());
+            stageEntity.setExecutorRunTime(stage.getExecutorRunTime());
+            stageEntity.setInputBytes(stage.getInputBytes());
+            stageEntity.setInputRecords(stage.getInputRecords());
+            stageEntity.setOutputBytes(stage.getOutputBytes());
+            stageEntity.setOutputRecords(stage.getOutputRecords());
+            stageEntity.setShuffleReadBytes(stage.getShuffleReadBytes());
+            stageEntity.setShuffleReadRecords(stage.getShuffleReadRecords());
+            stageEntity.setShuffleWriteBytes(stage.getShuffleWriteBytes());
+            stageEntity.setShuffleWriteRecords(stage.getShuffleWriteRecords());
+            stageEntity.setMemoryBytesSpilled(stage.getMemoryBytesSpilled());
+            stageEntity.setDiskBytesSpilled(stage.getDiskBytesSpilled());
+            stageEntity.setName(stage.getName());
+            stageEntity.setSchedulingPool(stage.getSchedulingPool());
+            stageEntity.setSubmitTime(stagesTime.get(stage.getStageId()).getRight().getLeft());
+            stageEntity.setTimestamp(stageEntity.getSubmitTime());
+            stageEntity.setCompleteTime(stagesTime.get(stage.getStageId()).getRight().getRight());
+            stageEntity.setNumTasks(stage.getTasks() == null ? 0 : stage.getTasks().size());
+            fetchTasksFromStage(stageEntity, stage);
+            sparkAppEntityCreationHandler.add(stageEntity);
+            if (stage.getStatus().equals(Constants.StageState.COMPLETE.toString())) {
+                this.completeStages.add(stage.getStageId());
+                LOG.info("stage {} of spark {} has finished", stage.getStageId(), sparkAppId);
+            }
+
+            sparkAppEntity.setInputBytes(sparkAppEntity.getInputBytes() + stageEntity.getInputBytes());
+            sparkAppEntity.setInputRecords(sparkAppEntity.getInputRecords() + stageEntity.getInputRecords());
+            sparkAppEntity.setOutputBytes(sparkAppEntity.getOutputBytes() + stageEntity.getOutputBytes());
+            sparkAppEntity.setOutputRecords(sparkAppEntity.getOutputRecords() + stageEntity.getOutputRecords());
+            sparkAppEntity.setShuffleReadBytes(sparkAppEntity.getShuffleReadBytes() + stageEntity.getShuffleReadBytes());
+            sparkAppEntity.setShuffleReadRecords(sparkAppEntity.getShuffleReadRecords() + stageEntity.getShuffleReadRecords());
+            sparkAppEntity.setShuffleWriteBytes(sparkAppEntity.getShuffleWriteBytes() + stageEntity.getShuffleWriteBytes());
+            sparkAppEntity.setShuffleWriteRecords(sparkAppEntity.getShuffleWriteRecords() + stageEntity.getShuffleWriteRecords());
+            sparkAppEntity.setExecutorRunTime(sparkAppEntity.getExecutorRunTime() + stageEntity.getExecutorRunTime());
+            sparkAppEntity.setExecutorDeserializeTime(sparkAppEntity.getExecutorDeserializeTime() + stageEntity.getExecutorDeserializeTime());
+            sparkAppEntity.setResultSize(sparkAppEntity.getResultSize() + stageEntity.getResultSize());
+            sparkAppEntity.setJvmGcTime(sparkAppEntity.getJvmGcTime() + stageEntity.getJvmGcTime());
+            sparkAppEntity.setResultSerializationTime(sparkAppEntity.getResultSerializationTime() + stageEntity.getResultSerializationTime());
+            sparkAppEntity.setMemoryBytesSpilled(sparkAppEntity.getMemoryBytesSpilled() + stageEntity.getMemoryBytesSpilled());
+            sparkAppEntity.setDiskBytesSpilled(sparkAppEntity.getDiskBytesSpilled() + stageEntity.getDiskBytesSpilled());
+            sparkAppEntity.setCompleteTasks(sparkAppEntity.getCompleteTasks() + stageEntity.getNumCompletedTasks());
+        }
+        return true;
+    };
+
+    private void fetchTasksFromStage(SparkStageEntity stageEntity, SparkStage stage) {
+        Map<String, SparkTask> tasks = stage.getTasks();
+        for (String key : tasks.keySet()) {
+            SparkTask task = tasks.get(key);
+            SparkTaskEntity taskEntity = new SparkTaskEntity();
+            taskEntity.setTags(new HashMap<>(stageEntity.getTags()));
+            taskEntity.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), task.getAttempt() + "");
+            taskEntity.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), task.getIndex() + "");
+            taskEntity.setTaskId(task.getTaskId());
+            taskEntity.setLaunchTime(Utils.dateTimeToLong(task.getLaunchTime()));
+            taskEntity.setHost(task.getHost());
+            taskEntity.setTaskLocality(task.getTaskLocality());
+            taskEntity.setSpeculative(task.isSpeculative());
+            taskEntity.setTimestamp(stageEntity.getTimestamp());
+
+            SparkTaskMetrics taskMetrics = task.getTaskMetrics();
+            taskEntity.setExecutorDeserializeTime(taskMetrics == null ? 0 : taskMetrics.getExecutorDeserializeTime());
+            taskEntity.setExecutorRunTime(taskMetrics == null ? 0 : taskMetrics.getExecutorRunTime());
+            taskEntity.setResultSize(taskMetrics == null ? 0 : taskMetrics.getResultSize());
+            taskEntity.setJvmGcTime(taskMetrics == null ? 0 : taskMetrics.getJvmGcTime());
+            taskEntity.setResultSerializationTime(taskMetrics == null ? 0 : taskMetrics.getResultSerializationTime());
+            taskEntity.setMemoryBytesSpilled(taskMetrics == null ? 0 : taskMetrics.getMemoryBytesSpilled());
+            taskEntity.setDiskBytesSpilled(taskMetrics == null ? 0 : taskMetrics.getDiskBytesSpilled());
+
+            SparkTaskInputMetrics inputMetrics = null;
+            if (taskMetrics != null && taskMetrics.getInputMetrics() != null) {
+                inputMetrics = taskMetrics.getInputMetrics();
+            }
+            taskEntity.setInputBytes(inputMetrics == null ? 0 : inputMetrics.getBytesRead());
+            taskEntity.setInputRecords(inputMetrics == null ? 0 : inputMetrics.getRecordsRead());
+
+            //need to figure out outputMetrics
+
+            SparkTaskShuffleReadMetrics shuffleReadMetrics = null;
+            if (taskMetrics != null && taskMetrics.getShuffleReadMetrics() != null) {
+                shuffleReadMetrics = taskMetrics.getShuffleReadMetrics();
+            }
+            taskEntity.setShuffleReadRemoteBytes(shuffleReadMetrics == null ? 0 : shuffleReadMetrics.getRemoteBytesRead());
+            taskEntity.setShuffleReadRecords(shuffleReadMetrics == null ? 0 : shuffleReadMetrics.getRecordsRead());
+
+            SparkTaskShuffleWriteMetrics shuffleWriteMetrics = null;
+            if (taskMetrics != null && taskMetrics.getShuffleWriteMetrics() != null) {
+                shuffleWriteMetrics = taskMetrics.getShuffleWriteMetrics();
+            }
+            taskEntity.setShuffleWriteBytes(shuffleWriteMetrics == null ? 0 : shuffleWriteMetrics.getBytesWritten());
+            taskEntity.setShuffleWriteRecords(shuffleWriteMetrics == null ? 0 : shuffleWriteMetrics.getRecordsWritten());
+
+            stageEntity.setExecutorDeserializeTime(stageEntity.getExecutorDeserializeTime() + taskEntity.getExecutorDeserializeTime());
+            stageEntity.setResultSize(stageEntity.getResultSize() + taskEntity.getResultSize());
+            stageEntity.setJvmGcTime(stageEntity.getJvmGcTime() + taskEntity.getJvmGcTime());
+            stageEntity.setResultSerializationTime(stageEntity.getResultSerializationTime() + taskEntity.getResultSerializationTime());
+
+            this.sparkAppEntityCreationHandler.add(taskEntity);
+        }
+    }
+}
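[Editor's note] getJobConfig() above probes four candidate event-log file names in a fixed fallback order. A minimal standalone sketch of just that ordering, using plain strings (the directory, app id, and attempt id below are illustrative values, not taken from a real cluster):

    import java.util.Arrays;
    import java.util.List;

    public class EventLogPathProbe {
        // Candidate paths in the order getJobConfig() tries them:
        // appId_attemptId.inprogress, appId_attemptId, appId.inprogress, appId
        static List<String> candidates(String eventLogDir, String appId, int attemptId) {
            String base = eventLogDir + "/" + appId;
            return Arrays.asList(
                base + "_" + attemptId + ".inprogress",
                base + "_" + attemptId,
                base + ".inprogress",
                base);
        }

        public static void main(String[] args) {
            candidates("/spark-history", "application_1464382345557_269065", 1)
                .forEach(System.out::println);
        }
    }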
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
new file mode 100644
index 0000000..2b6c62f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.running.recover;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
+import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+
+import java.io.Serializable;
+import java.util.*;
+
+public class SparkRunningJobManager implements Serializable {
+    private RunningJobManager runningJobManager;
+
+    public SparkRunningJobManager(SparkRunningConfigManager.ZKStateConfig config) {
+        this.runningJobManager = new RunningJobManager(config.zkQuorum,
+            config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot);
+    }
+
+    public Map<String, SparkAppEntity> recoverYarnApp(String appId) throws Exception {
+        Map<String, Pair<Map<String, String>, AppInfo>> result = this.runningJobManager.recoverYarnApp(appId);
+        Map<String, SparkAppEntity> apps = new HashMap<>();
+        for (String jobId : result.keySet()) {
+            Pair<Map<String, String>, AppInfo> job = result.get(jobId);
+            SparkAppEntity sparkAppEntity = new SparkAppEntity();
+            sparkAppEntity.setTags(job.getLeft());
+            sparkAppEntity.setAppInfo(job.getRight());
+            sparkAppEntity.setTimestamp(job.getRight().getStartedTime());
+            apps.put(jobId, sparkAppEntity);
+        }
+        return apps;
+    }
+
+    public Map<String, Map<String, SparkAppEntity>> recover() {
+        //we need to read from zookeeper; the path looks like /apps/spark/running/yarnAppId/jobId/
+        //<yarnAppId, <jobId, SparkAppEntity>>
+        Map<String, Map<String, SparkAppEntity>> result = new HashMap<>();
+        Map<String, Map<String, Pair<Map<String, String>, AppInfo>>> apps = this.runningJobManager.recover();
+        for (String appId : apps.keySet()) {
+            result.put(appId, new HashMap<>());
+            Map<String, Pair<Map<String, String>, AppInfo>> jobs = apps.get(appId);
+
+            for (String jobId : jobs.keySet()) {
+                Pair<Map<String, String>, AppInfo> job = jobs.get(jobId);
+                SparkAppEntity sparkAppEntity = new SparkAppEntity();
+                sparkAppEntity.setTags(job.getLeft());
+                sparkAppEntity.setAppInfo(job.getRight());
+                sparkAppEntity.setTimestamp(job.getRight().getStartedTime());
+                result.get(appId).put(jobId, sparkAppEntity);
+            }
+        }
+        return result;
+    }
+
+    public void update(String yarnAppId, String jobId, SparkAppEntity entity) {
+        this.runningJobManager.update(yarnAppId, jobId, entity.getTags(), entity.getAppInfo());
+    }
+
+    public void delete(String yarnAppId, String jobId) {
+        this.runningJobManager.delete(yarnAppId, jobId);
+    }
+}
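[Editor's note] The recovery state this class reads is the nested map <yarnAppId, <jobId, SparkAppEntity>> persisted under zkRoot. A small sketch of composing and splitting paths under that convention, based only on the path layout described in the comments above (the helper names are illustrative, not part of the Eagle API):

    public class ZkPathConvention {
        // e.g. root = "/apps/spark/running"
        static String jobPath(String root, String yarnAppId, String jobId) {
            return root + "/" + yarnAppId + "/" + jobId;
        }

        // Split ".../yarnAppId/jobId" back into its two ids.
        static String[] ids(String root, String fullPath) {
            String relative = fullPath.substring(root.length() + 1);
            return relative.split("/", 2);
        }

        public static void main(String[] args) {
            String p = jobPath("/apps/spark/running", "application_1464382345557_269065", "app-20160809052530-0001");
            System.out.println(p);
            System.out.println(String.join(", ", ids("/apps/spark/running", p)));
        }
    }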
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
new file mode 100644
index 0000000..6be0cfd
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.running.storm;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
+import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class SparkRunningJobFetchSpout extends BaseRichSpout {
+    private static final Logger LOG = LoggerFactory.getLogger(SparkRunningJobFetchSpout.class);
+
+    private SparkRunningConfigManager.ZKStateConfig zkStateConfig;
+    private SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig;
+    private SparkRunningConfigManager.EndpointConfig endpointConfig;
+    private ResourceFetcher resourceFetcher;
+    private SpoutOutputCollector collector;
+    private boolean init;
+    private transient SparkRunningJobManager sparkRunningJobManager;
+    private Set<String> runningYarnApps;
+
+    public SparkRunningJobFetchSpout(SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig,
+                                     SparkRunningConfigManager.EndpointConfig endpointConfig,
+                                     SparkRunningConfigManager.ZKStateConfig zkStateConfig) {
+        this.jobExtractorConfig = jobExtractorConfig;
+        this.endpointConfig = endpointConfig;
+        this.zkStateConfig = zkStateConfig;
+        this.init = !(zkStateConfig.recoverEnabled);
+        this.runningYarnApps = new HashSet<>();
+    }
+
+    @Override
+    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+        resourceFetcher = new RMResourceFetcher(endpointConfig.rmUrls);
+        collector = spoutOutputCollector;
+        this.sparkRunningJobManager = new SparkRunningJobManager(zkStateConfig);
+    }
+
+    @Override
+    public void nextTuple() {
+        LOG.info("Start to fetch spark running jobs");
+        try {
+            Map<String, Map<String, SparkAppEntity>> sparkApps = null;
+            List<AppInfo> apps;
+            if (!this.init) {
+                sparkApps = recoverRunningApps();
+
+                apps = new ArrayList<>();
+                for (String appId : sparkApps.keySet()) {
+                    Map<String, SparkAppEntity> jobs = sparkApps.get(appId);
+                    if (jobs.size() > 0) {
+                        Set<String> jobIds = jobs.keySet();
+                        apps.add(jobs.get(jobIds.iterator().next()).getAppInfo());
+                        this.runningYarnApps.add(appId);
+                    }
+                }
+                LOG.info("recovered {} spark yarn apps from zookeeper", apps.size());
+                this.init = true;
+            } else {
+                apps = resourceFetcher.getResource(Constants.ResourceType.RUNNING_SPARK_JOB);
+                LOG.info("got {} apps from resource manager", apps == null ? 0 : apps.size());
+                Set<String> running = new HashSet<>();
+                Iterator<String> appIdIterator = this.runningYarnApps.iterator();
+                while (appIdIterator.hasNext()) {
+                    String appId = appIdIterator.next();
+                    boolean hasFinished = true;
+                    if (apps != null) {
+                        for (AppInfo appInfo : apps) {
+                            if (appId.equals(appInfo.getId())) {
+                                hasFinished = false;
+                            }
+                            running.add(appInfo.getId());
+                        }
+
+                        if (hasFinished) {
+                            try {
+                                Map<String, SparkAppEntity> result = this.sparkRunningJobManager.recoverYarnApp(appId);
+                                if (result.size() > 0) {
+                                    if (sparkApps == null) {
+                                        sparkApps = new HashMap<>();
+                                    }
+                                    sparkApps.put(appId, result);
+                                    AppInfo appInfo = result.get(result.keySet().iterator().next()).getAppInfo();
+                                    appInfo.setState(Constants.AppState.FINISHED.toString());
+                                    apps.add(appInfo);
+                                }
+                            } catch (KeeperException.NoNodeException e) {
+                                LOG.warn("yarn app {} has finished", appId, e);
+                            }
+                        }
+                    }
+                }
+
+                this.runningYarnApps = running;
+                LOG.info("got {} total apps (finished included)", apps == null ? 0 : apps.size());
+            }
+
+            if (apps != null) {
+                for (AppInfo app : apps) {
+                    LOG.info("emit spark yarn application " + app.getId());
+                    if (sparkApps != null) {
+                        //emit (appId, AppInfo, Map<jobId, SparkAppEntity>)
+                        collector.emit(new Values(app.getId(), app, sparkApps.get(app.getId())));
+                    } else {
+                        collector.emit(new Values(app.getId(), app, null));
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("failed to fetch spark running jobs", e);
+        } finally {
+            try {
+                Thread.sleep(jobExtractorConfig.fetchRunningJobInterval * 1000);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    private Map<String, Map<String, SparkAppEntity>> recoverRunningApps() {
+        //we need to read from zookeeper; the path looks like /apps/spark/running/yarnAppId/appId/
+        //the content of /apps/spark/running/yarnAppId/appId is the SparkAppEntity of the current attempt
+        //a yarn application may contain many spark applications,
+        //so the returned result is a Map keyed by yarn appId
+        Map<String, Map<String, SparkAppEntity>> result = this.sparkRunningJobManager.recover();
+        return result;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        outputFieldsDeclarer.declare(new Fields("appId", "appInfo", "sparkAppEntity"));
+    }
+
+    @Override
+    public void fail(Object msgId) {
+    }
+
+    @Override
+    public void ack(Object msgId) {
+    }
+
+    @Override
+    public void close() {
+    }
+}
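[Editor's note] nextTuple() detects finished applications by comparing the set of previously seen running apps against the ids the RM currently reports. A minimal sketch of that set-difference step in isolation (class and method names are illustrative):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class FinishedAppDetector {
        // Apps we saw running before, but the RM no longer reports, are treated as finished.
        static Set<String> finished(Set<String> previouslyRunning, List<String> currentIds) {
            Set<String> gone = new HashSet<>(previouslyRunning);
            gone.removeAll(currentIds);
            return gone;
        }

        public static void main(String[] args) {
            Set<String> before = new HashSet<>(Arrays.asList("app_1", "app_2", "app_3"));
            List<String> now = Arrays.asList("app_2", "app_3", "app_4");
            System.out.println(finished(before, now)); // [app_1]
        }
    }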
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
new file mode 100644
index 0000000..6928240
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.running.storm;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
+import org.apache.eagle.jpm.spark.running.parser.SparkApplicationParser;
+import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+
+public class SparkRunningJobParseBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(SparkRunningJobParseBolt.class);
+
+    private SparkRunningConfigManager.ZKStateConfig zkStateConfig;
+    private SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig;
+    private SparkRunningConfigManager.EndpointConfig endpointConfig;
+    private SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig;
+    private ExecutorService executorService;
+    private Map<String, SparkApplicationParser> runningSparkParsers;
+    private ResourceFetcher resourceFetcher;
+
+    public SparkRunningJobParseBolt(SparkRunningConfigManager.ZKStateConfig zkStateConfig,
+                                    SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig,
+                                    SparkRunningConfigManager.EndpointConfig endpointConfig,
+                                    SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig) {
+        this.zkStateConfig = zkStateConfig;
+        this.eagleServiceConfig = eagleServiceConfig;
+        this.endpointConfig = endpointConfig;
+        this.jobExtractorConfig = jobExtractorConfig;
+        this.runningSparkParsers = new HashMap<>();
+    }
+
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+        this.executorService = Executors.newFixedThreadPool(jobExtractorConfig.parseThreadPoolSize);
+        this.resourceFetcher = new RMResourceFetcher(endpointConfig.rmUrls);
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        AppInfo appInfo = (AppInfo) tuple.getValue(1);
+        Map<String, SparkAppEntity> sparkApp = (Map<String, SparkAppEntity>) tuple.getValue(2);
+
+        LOG.info("get spark yarn application " + appInfo.getId());
+
+        SparkApplicationParser applicationParser;
+        if (!runningSparkParsers.containsKey(appInfo.getId())) {
+            applicationParser = new SparkApplicationParser(eagleServiceConfig, endpointConfig, jobExtractorConfig, appInfo, sparkApp, new SparkRunningJobManager(zkStateConfig), resourceFetcher);
+            runningSparkParsers.put(appInfo.getId(), applicationParser);
+            LOG.info("create application parser for {}", appInfo.getId());
+        } else {
+            applicationParser = runningSparkParsers.get(appInfo.getId());
+        }
+
+        Set<String> runningParserIds = new HashSet<>(runningSparkParsers.keySet());
+        runningParserIds.stream()
+            .filter(appId -> runningSparkParsers.get(appId).status() == SparkApplicationParser.ParserStatus.APP_FINISHED)
+            .forEach(appId -> {
+                runningSparkParsers.remove(appId);
+                LOG.info("remove parser {}", appId);
+            });
+
+        if (appInfo.getState().equals(Constants.AppState.FINISHED.toString()) ||
+            applicationParser.status() == SparkApplicationParser.ParserStatus.FINISHED) {
+            applicationParser.setStatus(SparkApplicationParser.ParserStatus.RUNNING);
+            executorService.execute(applicationParser);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    }
+
+    @Override
+    public void cleanup() {
+        super.cleanup();
+    }
+}
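[Editor's note] The bolt keeps one parser per yarn application, evicts parsers once they report APP_FINISHED, and re-submits a parser to the pool only after its previous run has completed (status FINISHED). A compact sketch of that cache-and-evict pattern, with the entity types simplified to a plain enum (names mirror the bolt but the class is a stand-in, not Eagle code):

    import java.util.HashMap;
    import java.util.Map;

    public class ParserCacheSketch {
        enum Status { RUNNING, FINISHED, APP_FINISHED }

        private final Map<String, Status> parsers = new HashMap<>();

        // Register the app on first sight, then drop every parser whose app is done.
        void onTuple(String appId) {
            parsers.putIfAbsent(appId, Status.FINISHED);
            parsers.values().removeIf(s -> s == Status.APP_FINISHED);
            // Re-schedule only if the previous run has completed.
            if (parsers.get(appId) == Status.FINISHED) {
                parsers.put(appId, Status.RUNNING);
                // in the real bolt: executorService.execute(applicationParser);
            }
        }
    }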
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
new file mode 100644
index 0000000..21686a6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.hdfs.DistributedFileSystem
+org.apache.hadoop.hdfs.web.HftpFileSystem
+org.apache.hadoop.hdfs.web.HsftpFileSystem
+org.apache.hadoop.hdfs.web.WebHdfsFileSystem
+org.apache.hadoop.hdfs.web.SWebHdfsFileSystem
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
new file mode 100644
index 0000000..d93a135
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  "envContextConfig" : {
+    "env" : "local",
+    "topologyName" : "sparkRunningJob",
+    "stormConfigFile" : "storm.yaml",
+    "parallelismConfig" : {
+      "sparkRunningJobFetchSpout" : 1,
+      "sparkRunningJobParseBolt" : 4
+    },
+    "tasks" : {
+      "sparkRunningJobFetchSpout" : 1,
+      "sparkRunningJobParseBolt" : 4
+    },
+    "workers" : 2
+  },
+
+  "jobExtractorConfig" : {
+    "site" : "sandbox",
+    "fetchRunningJobInterval" : 15,
+    "parseThreadPoolSize" : 5
+  },
+
+  "dataSourceConfig" : {
+    "rmUrls": ["http://sandbox.hortonworks.com:8088"],
+    "nnEndpoint" : "hdfs://sandbox.hortonworks.com:8020",
+    "principal" : "",  # leave empty if not needed
+    "keytab" : "",
+    "eventLog" : "/spark-history"
+  },
+
+  "zookeeperConfig" : {
+    "zkQuorum" : "sandbox.hortonworks.com:2181",
+    "zkPort" : "2181",
+    "zkRoot" : "/apps/spark/running",
+    "recoverEnabled" : false,
+    "zkSessionTimeoutMs" : 15000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 20000
+  },
+
+  "eagleProps" : {
+    "mailHost" : "abc.com",
+    "mailDebug" : "true",
+    eagleService.host: "sandbox.hortonworks.com",
+    eagleService.port: 9099,
+    eagleService.username: "admin",
+    eagleService.password: "secret",
+    eagleService.readTimeOutSeconds: 20,
+    eagleService.maxFlushNum: 500
+  }
+}
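[Editor's note] HOCON files like this one are typically loaded with the Typesafe Config library; which loader Eagle's SparkRunningConfigManager actually uses is not shown in this commit, so the following is only a minimal sketch of reading the two extractor settings above, assuming application.conf is on the classpath:

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class ConfigPeek {
        public static void main(String[] args) {
            // ConfigFactory.load() picks up application.conf from the classpath by default.
            Config config = ConfigFactory.load();
            int fetchInterval = config.getInt("jobExtractorConfig.fetchRunningJobInterval"); // seconds
            int poolSize = config.getInt("jobExtractorConfig.parseThreadPoolSize");
            System.out.println("fetch every " + fetchInterval + "s with " + poolSize + " parser threads");
        }
    }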
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/log4j.properties b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6b8c8d6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/log4j.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout, DRFA
+
+eagle.log.dir=../logs
+eagle.log.file=eagle.log
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+# Daily Rolling File Appender
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+## 30-day backup
+# log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index 0792f15..a633fd4 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -17,14 +17,23 @@
 package org.apache.eagle.jpm.util;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class Constants {
+    private final static Logger LOG = LoggerFactory.getLogger(Constants.class);
+
     //SPARK
     public final static String SPARK_APP_SERVICE_ENDPOINT_NAME = "SparkAppService";
     public final static String SPARK_JOB_SERVICE_ENDPOINT_NAME = "SparkJobService";
     public final static String SPARK_STAGE_SERVICE_ENDPOINT_NAME = "SparkStageService";
     public final static String SPARK_TASK_SERVICE_ENDPOINT_NAME = "SparkTaskService";
     public final static String SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME = "SparkExecutorService";
-
+    public final static String RUNNING_SPARK_APP_SERVICE_ENDPOINT_NAME = "RunningSparkAppService";
+    public final static String RUNNING_SPARK_JOB_SERVICE_ENDPOINT_NAME = "RunningSparkJobService";
+    public final static String RUNNING_SPARK_STAGE_SERVICE_ENDPOINT_NAME = "RunningSparkStageService";
+    public final static String RUNNING_SPARK_TASK_SERVICE_ENDPOINT_NAME = "RunningSparkTaskService";
+    public final static String RUNNING_SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME = "RunningSparkExecutorService";
     public static final String APPLICATION_PREFIX = "application";
     public static final String JOB_PREFIX = "job";
     public static final String V2_APPS_URL = "ws/v1/cluster/apps";
@@ -33,17 +42,123 @@ public class Constants {
     public static final String V2_APPS_RUNNING_URL = "ws/v1/cluster/apps?state=RUNNING";
     public static final String V2_APPS_COMPLETED_URL = "ws/v1/cluster/apps?state=FINISHED";
 
+    public static final String SPARK_MASTER_KEY = "spark.master";
+    public static final String SPARK_EXECUTOR_MEMORY_KEY = "spark.executor.memory";
+    public static final String SPARK_DRIVER_MEMORY_KEY = "spark.driver.memory";
+    public static final String SPARK_YARN_AM_MEMORY_KEY = "spark.yarn.am.memory";
+    public static final String SPARK_EXECUTOR_CORES_KEY = "spark.executor.cores";
+    public static final String SPARK_DRIVER_CORES_KEY = "spark.driver.cores";
+    public static final String SPARK_YARN_AM_CORES_KEY = "spark.yarn.am.cores";
+    public static final String SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD_KEY = "spark.yarn.executor.memoryOverhead";
"spark.yarn.executor.memoryOverhead"; + public static final String SPARK_YARN_DRIVER_MEMORY_OVERHEAD_KEY = "spark.yarn.driver.memoryOverhead"; + public static final String SPARK_YARN_am_MEMORY_OVERHEAD_KEY = "spark.yarn.am.memoryOverhead"; + public static final String SPARK_APPS_URL ="api/v1/applications"; + public static final String SPARK_EXECUTORS_URL = "executors"; + public static final String SPARK_JOBS_URL = "jobs"; + public static final String SPARK_STAGES_URL = "stages"; + public static final String MR_JOBS_URL = "ws/v1/mapreduce/jobs"; + public static final String MR_JOB_COUNTERS_URL = "counters"; + public static final String MR_TASKS_URL = "tasks"; + public static final String MR_TASK_ATTEMPTS_URL = "attempts"; + public static final String MR_CONF_URL = "conf"; + + public static final String YARN_API_CLUSTER_INFO = "ws/v1/cluster/info"; public enum CompressionType { GZIP, NONE } public enum JobState { - RUNNING, COMPLETED, ALL + NEW, INITED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED, ERROR, FINISHED, ALL + } + public enum TaskState { + NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED + } + public enum StageState { + ACTIVE, COMPLETE, PENDING + } + public enum AppState { + NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED + } + public enum AppStatus { + UNDEFINED, SUCCEEDED, FAILED, KILLED } - public enum ResourceType { - COMPLETE_SPARK_JOB, SPARK_JOB_DETAIL + COMPLETE_SPARK_JOB, SPARK_JOB_DETAIL, RUNNING_SPARK_JOB, RUNNING_MR_JOB, CLUSTER_INFO + } + + //MR + public static final String JPA_JOB_CONFIG_SERVICE_NAME = "JobConfigService"; + public static final String JPA_JOB_EVENT_SERVICE_NAME = "JobEventService"; + public static final String JPA_JOB_EXECUTION_SERVICE_NAME = "JobExecutionService"; + public static final String JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME = "RunningJobExecutionService"; + public static final String JPA_TASK_ATTEMPT_EXECUTION_SERVICE_NAME = "TaskAttemptExecutionService"; + public static final String JPA_TASK_FAILURE_COUNT_SERVICE_NAME = "TaskFailureCountService"; + public static final String JPA_TASK_ATTEMPT_COUNTER_SERVICE_NAME = "TaskAttemptCounterService"; + public static final String JPA_TASK_EXECUTION_SERVICE_NAME = "TaskExecutionService"; + public static final String JPA_RUNNING_TASK_EXECUTION_SERVICE_NAME = "RunningTaskExecutionService"; + public static final String JPA_RUNNING_TASK_ATTEMPT_EXECUTION_SERVICE_NAME = "RunningTaskAttemptExecutionService"; + public static final String JPA_JOB_PROCESS_TIME_STAMP_NAME = "JobProcessTimeStampService"; + + public static final String JOB_TASK_TYPE_TAG = "taskType"; + + public static class JobConfiguration { + // job type + public static final String SCOOBI_JOB = "scoobi.mode"; + public static final String HIVE_JOB = "hive.query.string"; + public static final String PIG_JOB = "pig.script"; + public static final String CASCADING_JOB = "cascading.app.name"; } + /** + * MR task types + */ + public enum TaskType { + SETUP, MAP, REDUCE, CLEANUP + } + + public enum JobType { + CASCADING("CASCADING"),HIVE("HIVE"),PIG("PIG"),SCOOBI("SCOOBI"), + NOTAVALIABLE("N/A") + ; + private String value; + JobType(String value){ + this.value = value; + } + @Override + public String toString() { + return this.value; + } + } + + public static final String FILE_SYSTEM_COUNTER = "org.apache.hadoop.mapreduce.FileSystemCounter"; + public static final String TASK_COUNTER = "org.apache.hadoop.mapreduce.TaskCounter"; + public static final String JOB_COUNTER = "org.apache.hadoop.mapreduce.JobCounter"; + + 
+    public static final String MAP_TASK_ATTEMPT_COUNTER = "MapTaskAttemptCounter";
+    public static final String REDUCE_TASK_ATTEMPT_COUNTER = "ReduceTaskAttemptCounter";
+
+    public static final String MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER = "MapTaskAttemptFileSystemCounter";
+    public static final String REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER = "ReduceTaskAttemptFileSystemCounter";
+
+    public enum TaskAttemptCounter {
+        TASK_ATTEMPT_DURATION,
+    }
+
+    public enum JobCounter {
+        DATA_LOCAL_MAPS,
+        RACK_LOCAL_MAPS,
+        TOTAL_LAUNCHED_MAPS
+    }
+
+    public static final String metricFormat = "%s.%s";
+    public static final String ALLOCATED_MB = "allocatedmb";
+    public static final String ALLOCATED_VCORES = "allocatedvcores";
+    public static final String RUNNING_CONTAINERS = "runningcontainers";
+    public static final String TASK_EXECUTION_TIME = "taskduration";
+    public static final String JOB_LEVEL = "job";
+    public static final String TASK_LEVEL = "task";
+
+    public static final String JOB_DEFINITION_ID_KEY = "jobDefId";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
index 8adb001..325a92a 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
@@ -29,7 +29,7 @@ public class HDFSUtil {
 
     public static FileSystem getFileSystem(Configuration conf) throws IOException {
         HDFSUtil.login(conf);
-        return FileSystem.get(conf); 
+        return FileSystem.get(conf);
     }
 
     public static void login(Configuration kConfig) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java
new file mode 100644
index 0000000..ea8e4f4
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.util;
+
+public enum MRJobTagName {
+    SITE("site"),
+    RACK("rack"),
+    HOSTNAME("hostname"),
+    JOB_NAME("jobName"),
+    JOB_DEF_ID("jobDefId"),
+    JOB_ID("jobId"),
+    TASK_ID("taskId"),
+    TASK_ATTEMPT_ID("taskAttemptId"),
+    JOB_STATUS("jobStatus"),
+    USER("user"),
+    TASK_TYPE("taskType"),
+    TASK_EXEC_TYPE("taskExecType"),
+    ERROR_CATEGORY("errorCategory"),
+    JOB_QUEUE("queue"),
+    RULE_TYPE("ruleType"),
+    JOB_TYPE("jobType");
+
+    private String tagName;
+
+    private MRJobTagName(String tagName) {
+        this.tagName = tagName;
+    }
+
+    @Override
+    public String toString() {
+        return this.tagName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
new file mode 100644
index 0000000..7a613eb
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class Utils {
+    private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+
+    public static void closeInputStream(InputStream is) {
+        if (is != null) {
+            try {
+                is.close();
+            } catch (Exception e) {
+                LOG.warn("failed to close input stream", e);
+            }
+        }
+    }
+
+    public static void sleep(long seconds) {
+        try {
+            Thread.sleep(seconds * 1000);
+        } catch (InterruptedException e) {
+            // restore the interrupt flag instead of swallowing it
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    public static long dateTimeToLong(String date) {
+        // date is like: 2016-07-29T19:35:40.715GMT
+        long timestamp = 0L;
+        try {
+            // HH (0-23) rather than hh (1-12), since the input uses a 24-hour clock
+            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSzzz");
+            Date parsedDate = dateFormat.parse(date);
+            timestamp = parsedDate.getTime();
+        } catch (ParseException e) {
+            LOG.error("failed to parse date " + date, e);
+        }
+
+        if (timestamp == 0L) {
+            LOG.error("Not able to parse date: " + date);
+        }
+
+        return timestamp;
+    }
+
+    public static long parseMemory(String memory) {
+        if (memory.endsWith("g") || memory.endsWith("G")) {
+            int executorGB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+            return 1024L * 1024 * 1024 * executorGB;
+        } else if (memory.endsWith("m") || memory.endsWith("M")) {
+            int executorMB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+            return 1024L * 1024 * executorMB;
+        } else if (memory.endsWith("k") || memory.endsWith("K")) {
+            int executorKB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+            return 1024L * executorKB;
+        } else if (memory.endsWith("t") || memory.endsWith("T")) {
+            int executorTB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+            return 1024L * 1024 * 1024 * 1024 * executorTB;
+        } else if (memory.endsWith("p") || memory.endsWith("P")) {
+            int executorPB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+            return 1024L * 1024 * 1024 * 1024 * 1024 * executorPB;
+        }
+        LOG.warn("Cannot parse memory info " + memory);
+        return 0L;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupDictionary.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupDictionary.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupDictionary.java
new file mode 100644
index 0000000..c8572e9
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupDictionary.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.util.jobcounter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * MR job counter dictionary. It is a singleton that reads the JobCounter.conf file from
+ * the classpath and configures the available counter groups.
+ */
+public final class CounterGroupDictionary {
+
+    private final List<CounterGroupKey> groupKeys = new ArrayList<>();
+
+    private static volatile CounterGroupDictionary instance = null;
+    private static final Logger LOG = LoggerFactory.getLogger(CounterGroupDictionary.class);
+
+    private CounterGroupDictionary() {}
+
+    public static CounterGroupDictionary getInstance() throws JobCounterException {
+        if (instance == null) {
+            synchronized (CounterGroupDictionary.class) {
+                if (instance == null) {
+                    CounterGroupDictionary tmp = new CounterGroupDictionary();
+                    tmp.initialize();
+                    instance = tmp;
+                }
+            }
+        }
+        return instance;
+    }
+
+    public CounterGroupKey getCounterGroupByName(String groupName) {
+        for (CounterGroupKey groupKey : groupKeys) {
+            if (groupKey.getName().equalsIgnoreCase(groupName)) {
+                return groupKey;
+            }
+        }
+        return null;
+    }
+
+    public CounterGroupKey getCounterGroupByIndex(int groupIndex) {
+        if (groupIndex < 0 || groupIndex >= groupKeys.size()) {
+            return null;
+        }
+        return groupKeys.get(groupIndex);
+    }
+
+    private void initialize() throws JobCounterException {
+        // load JobCounter.conf from the classpath
+        InputStream is = this.getClass().getClassLoader().getResourceAsStream("/JobCounter.conf");
+        try {
+            if (is == null) {
+                // ClassLoader.getResourceAsStream does not accept a leading "/", so retry with the plain name
+                is = this.getClass().getClassLoader().getResourceAsStream("JobCounter.conf");
+                if (is == null) {
+                    final String errMsg = "Failed to load JobCounter.conf";
+                    LOG.error(errMsg);
+                    throw new JobCounterException(errMsg);
+                }
+            }
+            final Properties prop = new Properties();
+            try {
+                prop.load(is);
+            } catch (Exception ex) {
+                final String errMsg = "Failed to load JobCounter.conf, reason: " + ex.getMessage();
+                LOG.error(errMsg, ex);
+                throw new JobCounterException(errMsg, ex);
+            }
+            int groupIndex = 0;
+            while (parseGroup(groupIndex, prop)) {
+                ++groupIndex;
+            }
+        } finally {
+            if (is != null) {
+                try {
+                    is.close();
+                } catch (IOException e) {
+                    // ignore failures on close
+                }
+            }
+        }
+    }
+
+    private boolean parseGroup(int groupIndex, Properties prop) {
+        final String groupKeyBase = "counter.group" + groupIndex;
+        final String groupNameKey = groupKeyBase + ".name";
+        final String groupName = prop.getProperty(groupNameKey);
+
+        if (groupName == null) {
+            return false;
+        }
+
+        final String groupDescriptionKey = groupKeyBase + ".description";
+        final String groupDescription = prop.getProperty(groupDescriptionKey);
+        final CounterGroupKeyImpl groupKey = new CounterGroupKeyImpl(groupIndex, groupName, groupDescription);
+        final List<CounterKey> counters = new ArrayList<>();
+
+        int counterIndex = 0;
+        while (parseCounter(groupKey, counterIndex, counters, prop)) {
+            ++counterIndex;
+        }
+        groupKey.setCounterKeys(counters.toArray(new CounterKey[counters.size()]));
+        groupKeys.add(groupKey);
+        return true;
+    }
+
+    private boolean parseCounter(CounterGroupKey groupKey, int counterIndex, List<CounterKey> counters, Properties prop) {
+        final String counterKeyBase = "counter.group" + groupKey.getIndex() + ".counter" + counterIndex;
+        final String counterNameKey = counterKeyBase + ".names";
+        final String counterNamesString = prop.getProperty(counterNameKey);
+
+        if (counterNamesString == null) {
+            return false;
+        }
+        final String[] names = counterNamesString.split(",");
+        final List<String> counterNames = new ArrayList<>();
+        for (String name : names) {
+            counterNames.add(name.trim());
+        }
+
+        final String counterDescriptionKey = counterKeyBase + ".description";
+        final String counterDescription = prop.getProperty(counterDescriptionKey);
+
+        CounterKey counter = new CounterKeyImpl(counterIndex, counterNames, counterDescription, groupKey);
+        counters.add(counter);
+        return true;
+    }
+
+    private static class CounterKeyImpl implements CounterKey {
+        private final int index;
+        private final List<String> counterNames;
+        private final String description;
+        private final CounterGroupKey groupKey;
+
+        public CounterKeyImpl(int index, List<String> counterNames, String description, CounterGroupKey groupKey) {
+            this.index = index;
+            this.counterNames = counterNames;
+            this.description = description;
+            this.groupKey = groupKey;
+        }
+
+        @Override
+        public int getIndex() {
+            return index;
+        }
+
+        @Override
+        public List<String> getNames() {
+            return counterNames;
+        }
+
+        @Override
+        public String getDescription() {
+            return description;
+        }
+
+        @Override
+        public CounterGroupKey getGroupKey() {
+            return groupKey;
+        }
+    }
+
+    private static class CounterGroupKeyImpl implements CounterGroupKey {
+        private final int index;
+        private final String name;
+        private final String description;
+        private CounterKey[] counterKeys;
+
+        public CounterGroupKeyImpl(int index, String name, String description) {
+            this.index = index;
+            this.name = name;
+            this.description = description;
+        }
+
+        public void setCounterKeys(CounterKey[] counterKeys) {
+            this.counterKeys = counterKeys;
+        }
+
+        @Override
+        public int getIndex() {
+            return index;
+        }
+
+        @Override
+        public String getName() {
+            return name;
+        }
+
+        @Override
+        public String getDescription() {
+            return description;
+        }
+
+        @Override
+        public int getCounterNumber() {
+            return counterKeys.length;
+        }
+
+        @Override
+        public List<CounterKey> listCounterKeys() {
+            return Arrays.asList(counterKeys);
+        }
+
+        @Override
+        public CounterKey getCounterKeyByName(String name) {
+            for (CounterKey counterKey : counterKeys) {
+                for (String n : counterKey.getNames()) {
+                    if (n.equalsIgnoreCase(name)) {
+                        return counterKey;
+                    }
+                }
+            }
+            return null;
+        }
+
+        @Override
+        public CounterKey getCounterKeyByID(int index) {
+            if (index < 0 || index >= counterKeys.length) {
+                return null;
+            }
+            return counterKeys[index];
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupKey.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupKey.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupKey.java
new file mode 100644
index 0000000..482623a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupKey.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.util.jobcounter;
+
+import java.util.List;
+
+public interface CounterGroupKey {
+
+    String getName();
+    String getDescription();
+    int getIndex();
+    int getCounterNumber();
+    List<CounterKey> listCounterKeys();
+    CounterKey getCounterKeyByName(String name);
+    CounterKey getCounterKeyByID(int index);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterKey.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterKey.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterKey.java
new file mode 100644
index 0000000..8e4e519
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterKey.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.util.jobcounter;
+
+import java.util.List;
+
+public interface CounterKey {
+
+    List<String> getNames();
+    String getDescription();
+    int getIndex();
+    CounterGroupKey getGroupKey();
+
+}
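For reference, a JobCounter.conf sketch consistent with the CounterGroupDictionary parser above; the group and counter names here are illustrative placeholders, not values shipped in this patch:

counter.group0.name=org.apache.hadoop.mapreduce.TaskCounter
counter.group0.description=Task counters
counter.group0.counter0.names=MAP_INPUT_RECORDS
counter.group0.counter0.description=Map input records
counter.group0.counter1.names=REDUCE_INPUT_RECORDS,REDUCE_INPUT_GROUPS
counter.group0.counter1.description=Reduce input records

Parsing stops at the first missing counter.groupN.name (or counter.groupN.counterM.names) key, so group and counter indices must be contiguous from 0, and a .names value may list several comma-separated aliases for one counter.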