From: qingwzhao@apache.org
To: commits@eagle.incubator.apache.org
Date: Tue, 09 Aug 2016 05:25:33 -0000
Message-Id: <7289d5c51c9640539c0b8fc60675bb7c@git.apache.org>
Subject: [5/8] incubator-eagle git commit: [EAGLE-422] eagle support for mr & spark running job monitoring

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/resources/JobCounter.conf ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/JobCounter.conf b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/JobCounter.conf new file mode 100644 index 0000000..1524e61 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/JobCounter.conf @@ -0,0 +1,187 @@ +# Licensed to the Apache Software Foundation (ASF) under one or
more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#### Sample configuration: +## counter.group0.name = groupname1 +## counter.group0.counter0.names = counterName1,counterName2,... +## counter.group0.counter0.description = counter description... + +counter.group0.name = org.apache.hadoop.mapreduce.FileSystemCounter +counter.group0.description = File System Counters +counter.group0.counter0.names = FILE_BYTES_READ +counter.group0.counter0.description = FILE: Number of bytes read +counter.group0.counter1.names = FILE_BYTES_WRITTEN +counter.group0.counter1.description = FILE: Number of bytes written +counter.group0.counter2.names = FILE_READ_OPS +counter.group0.counter2.description = FILE: Number of read operations +counter.group0.counter3.names = FILE_LARGE_READ_OPS +counter.group0.counter3.description = FILE: Number of large read operations +counter.group0.counter4.names = FILE_WRITE_OPS +counter.group0.counter4.description = FILE: Number of write operations +counter.group0.counter5.names = HDFS_BYTES_READ +counter.group0.counter5.description = HDFS: Number of bytes read +counter.group0.counter6.names = HDFS_BYTES_WRITTEN +counter.group0.counter6.description = HDFS: Number of bytes written +counter.group0.counter7.names = HDFS_READ_OPS +counter.group0.counter7.description = HDFS: Number of read operations +counter.group0.counter8.names = HDFS_LARGE_READ_OPS +counter.group0.counter8.description = HDFS: Number of large read operations +counter.group0.counter9.names = HDFS_WRITE_OPS +counter.group0.counter9.description = HDFS: Number of write operations + +counter.group1.name = org.apache.hadoop.mapreduce.TaskCounter +counter.group1.description = Map-Reduce Framework +counter.group1.counter0.names = MAP_INPUT_RECORDS +counter.group1.counter0.description = Map input records +counter.group1.counter1.names = MAP_OUTPUT_RECORDS +counter.group1.counter1.description = Map output records +counter.group1.counter2.names = SPLIT_RAW_BYTES +counter.group1.counter2.description = Input split bytes +counter.group1.counter3.names = SPILLED_RECORDS +counter.group1.counter3.description = Spilled Records +counter.group1.counter4.names = CPU_MILLISECONDS +counter.group1.counter4.description = CPU time spent (ms) +counter.group1.counter5.names = PHYSICAL_MEMORY_BYTES +counter.group1.counter5.description = Physical memory (bytes) snapshot +counter.group1.counter6.names = VIRTUAL_MEMORY_BYTES +counter.group1.counter6.description = Virtual memory (bytes) snapshot +counter.group1.counter7.names = COMMITTED_HEAP_BYTES +counter.group1.counter7.description = Total committed heap usage (bytes) +counter.group1.counter8.names = REDUCE_SHUFFLE_BYTES +counter.group1.counter8.description = Reduce shuffle bytes (bytes) +counter.group1.counter9.names = GC_TIME_MILLIS +counter.group1.counter9.description = GC time milliseconds +counter.group1.counter10.names = 
MAP_OUTPUT_BYTES +counter.group1.counter10.description = map output bytes +counter.group1.counter11.names = REDUCE_INPUT_RECORDS +counter.group1.counter11.description = reduce input records +counter.group1.counter12.names = COMBINE_INPUT_RECORDS +counter.group1.counter12.description = combine input records +counter.group1.counter13.names = COMBINE_OUTPUT_RECORDS +counter.group1.counter13.description = combine output records +counter.group1.counter14.names = REDUCE_INPUT_GROUPS +counter.group1.counter14.description = reduce input groups +counter.group1.counter15.names = REDUCE_OUTPUT_RECORDS +counter.group1.counter15.description = reduce output records +counter.group1.counter16.names = SHUFFLED_MAPS +counter.group1.counter16.description = shuffled maps +counter.group1.counter17.names = MAP_OUTPUT_MATERIALIZED_BYTES +counter.group1.counter17.description = map output materialized bytes +counter.group1.counter18.names = MERGED_MAP_OUTPUTS +counter.group1.counter18.description = merged map outputs +counter.group1.counter19.names = FAILED_SHUFFLE +counter.group1.counter19.description = failed shuffles + +counter.group2.name = org.apache.hadoop.mapreduce.JobCounter +counter.group2.description = Map-Reduce Job Counter +counter.group2.counter0.names = MB_MILLIS_MAPS +counter.group2.counter0.description = Total megabyte-seconds taken by all map tasks +counter.group2.counter1.names = MB_MILLIS_REDUCES +counter.group2.counter1.description = Total megabyte-seconds taken by all reduce tasks +counter.group2.counter2.names = VCORES_MILLIS_MAPS +counter.group2.counter2.description = Total vcore-seconds taken by all map tasks +counter.group2.counter3.names = VCORES_MILLIS_REDUCES +counter.group2.counter3.description = Total vcore-seconds taken by all reduce tasks +counter.group2.counter4.names = OTHER_LOCAL_MAPS +counter.group2.counter4.description = Other local map tasks +counter.group2.counter5.names = DATA_LOCAL_MAPS +counter.group2.counter5.description = Data-local map tasks +counter.group2.counter6.names = MILLIS_MAPS +counter.group2.counter6.description = Total time spent by all map tasks (ms) +counter.group2.counter7.names = MILLIS_REDUCES +counter.group2.counter7.description = Total time spent by all reduce tasks (ms) +counter.group2.counter8.names = TOTAL_LAUNCHED_MAPS +counter.group2.counter8.description = Launched map tasks +counter.group2.counter9.names = TOTAL_LAUNCHED_REDUCES +counter.group2.counter9.description = Launched reduce tasks +counter.group2.counter10.names = SLOTS_MILLIS_MAPS +counter.group2.counter10.description = Total time spent by all maps in occupied slots (ms) +counter.group2.counter11.names = SLOTS_MILLIS_REDUCES +counter.group2.counter11.description = Total time spent by all reduces in occupied slots (ms) +counter.group2.counter12.names = RACK_LOCAL_MAPS +counter.group2.counter12.description = Rack-local map tasks + +counter.group3.name = MapTaskAttemptCounter +counter.group3.description = Map Task Attempt Counter Aggregation +counter.group3.counter0.names = MAP_OUTPUT_MATERIALIZED_BYTES +counter.group3.counter1.names = MAP_INPUT_RECORDS +counter.group3.counter2.names = MERGED_MAP_OUTPUTS +counter.group3.counter3.names = SPILLED_RECORDS +counter.group3.counter4.names = MAP_OUTPUT_BYTES +counter.group3.counter5.names = COMMITTED_HEAP_BYTES +counter.group3.counter6.names = FAILED_SHUFFLE +counter.group3.counter7.names = CPU_MILLISECONDS
+counter.group3.counter8.names = SPLIT_RAW_BYTES +counter.group3.counter9.names = COMBINE_INPUT_RECORDS +counter.group3.counter10.names = PHYSICAL_MEMORY_BYTES +counter.group3.counter11.names = TASK_ATTEMPT_DURATION +counter.group3.counter12.names = VIRTUAL_MEMORY_BYTES +counter.group3.counter13.names = MAP_OUTPUT_RECORDS +counter.group3.counter14.names = GC_TIME_MILLIS +counter.group3.counter15.names = COMBINE_OUTPUT_RECORDS +counter.group3.counter16.names = REDUCE_INPUT_GROUPS +counter.group3.counter17.names = REDUCE_INPUT_RECORDS +counter.group3.counter18.names = REDUCE_OUTPUT_RECORDS +counter.group3.counter19.names = REDUCE_SHUFFLE_BYTES +counter.group3.counter20.names = SHUFFLED_MAPS + +counter.group4.name = ReduceTaskAttemptCounter +counter.group4.description = Reduce Task Attempt Counter Aggregation +counter.group4.counter0.names = MAP_OUTPUT_MATERIALIZED_BYTES +counter.group4.counter1.names = MAP_INPUT_RECORDS +counter.group4.counter2.names = MERGED_MAP_OUTPUTS +counter.group4.counter3.names = SPILLED_RECORDS +counter.group4.counter4.names = MAP_OUTPUT_BYTES +counter.group4.counter5.names = COMMITTED_HEAP_BYTES +counter.group4.counter6.names = FAILED_SHUFFLE +counter.group4.counter7.names = CPU_MILLISECONDS +counter.group4.counter8.names = SPLIT_RAW_BYTES +counter.group4.counter9.names = COMBINE_INPUT_RECORDS +counter.group4.counter10.names = PHYSICAL_MEMORY_BYTES +counter.group4.counter11.names = TASK_ATTEMPT_DURATION +counter.group4.counter12.names = VIRTUAL_MEMORY_BYTES +counter.group4.counter13.names = MAP_OUTPUT_RECORDS +counter.group4.counter14.names = GC_TIME_MILLIS +counter.group4.counter15.names = COMBINE_OUTPUT_RECORDS +counter.group4.counter16.names = REDUCE_INPUT_GROUPS +counter.group4.counter17.names = REDUCE_INPUT_RECORDS +counter.group4.counter18.names = REDUCE_OUTPUT_RECORDS +counter.group4.counter19.names = REDUCE_SHUFFLE_BYTES +counter.group4.counter20.names = SHUFFLED_MAPS + +counter.group5.name = MapTaskAttemptFileSystemCounter +counter.group5.description = Map Task Attempt File System Counter Aggregation +counter.group5.counter0.names = FILE_READ_OPS +counter.group5.counter1.names = FILE_WRITE_OPS +counter.group5.counter2.names = FILE_BYTES_READ +counter.group5.counter3.names = FILE_LARGE_READ_OPS +counter.group5.counter4.names = HDFS_BYTES_READ +counter.group5.counter5.names = FILE_BYTES_WRITTEN +counter.group5.counter6.names = HDFS_LARGE_READ_OPS +counter.group5.counter7.names = HDFS_BYTES_WRITTEN +counter.group5.counter8.names = HDFS_READ_OPS + +counter.group6.name = ReduceTaskAttemptFileSystemCounter +counter.group6.description = Reduce Task Attempt File System Counter Aggregation +counter.group6.counter0.names = FILE_READ_OPS +counter.group6.counter1.names = FILE_WRITE_OPS +counter.group6.counter2.names = FILE_BYTES_READ +counter.group6.counter3.names = FILE_LARGE_READ_OPS +counter.group6.counter4.names = HDFS_BYTES_READ +counter.group6.counter5.names = FILE_BYTES_WRITTEN +counter.group6.counter6.names = HDFS_LARGE_READ_OPS +counter.group6.counter7.names = HDFS_BYTES_WRITTEN +counter.group6.counter8.names = HDFS_READ_OPS \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf new
file mode 100644 index 0000000..5c0d8f9 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf @@ -0,0 +1,86 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{ + "envContextConfig" : { + "env" : "local", + "topologyName" : "mrRunningJob", + "stormConfigFile" : "storm.yaml", + "parallelismConfig" : { + "mrRunningJobFetchSpout" : 1, + "mrRunningJobParseBolt" : 10 + }, + "tasks" : { + "mrRunningJobFetchSpout" : 1, + "mrRunningJobParseBolt" : 10 + }, + "workers" : 5 + }, + + "jobExtractorConfig" : { + "site" : "sandbox", + "fetchRunningJobInterval" : 60, + "parseJobThreadPoolSize" : 5, #job concurrent + "topAndBottomTaskByElapsedTime" : 50 + }, + + "zookeeperConfig" : { + "zkQuorum" : "sandbox.hortonworks.com:2181", + "zkPort" : "2181", + "zkRoot" : "/apps/mr/running", + "zkSessionTimeoutMs" : 15000, + "zkRetryTimes" : 3, + "zkRetryInterval" : 20000 + }, + + "dataSourceConfig" : { + "rmUrls": ["http://sandbox.hortonworks.com:50030"] + }, + + "eagleProps" : { + "mailHost" : "abc.com", + "mailDebug" : "true", + "eagleService": { + "host": "sandbox.hortonworks.com", + "port": 9099, + "readTimeOutSeconds" : 20, + "maxFlushNum" : 500, + "username": "admin", + "password": "secret" + } + }, + + "MRConfigureKeys" : { + "jobConfigKey" : [ + "mapreduce.map.output.compress", + "mapreduce.map.output.compress.codec", + "mapreduce.output.fileoutputformat.compress", + "mapreduce.output.fileoutputformat.compress.type", + "mapreduce.output.fileoutputformat.compress.codec", + "mapred.output.format.class", + "eagle.job.runid", + "eagle.job.runidfieldname", + "eagle.job.name", + "eagle.job.normalizedfieldname", + "eagle.alert.email", + "eagle.job.alertemailaddress", + "dataplatform.etl.info", + "mapreduce.map.memory.mb", + "mapreduce.reduce.memory.mb", + "mapreduce.map.java.opts", + "mapreduce.reduce.java.opts" + ] + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/log4j.properties b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/log4j.properties new file mode 100644 index 0000000..6b8c8d6 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/log4j.properties @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, stdout, DRFA + +eagle.log.dir=../logs +eagle.log.file=eagle.log + +# standard output +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n + +# Daily Rolling File Appender + log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender + log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file} + log4j.appender.DRFA.DatePattern=.yyyy-MM-dd +## 30-day backup +# log4j.appender.DRFA.MaxBackupIndex=30 + log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout + +# Pattern format: Date LogLevel LoggerName LogMessage +log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/EventType.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/EventType.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/EventType.java deleted file mode 100644 index 9c720c8..0000000 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/EventType.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.eagle.jpm.spark.history.crawl; - -public enum EventType { - SparkListenerBlockManagerAdded, SparkListenerEnvironmentUpdate, SparkListenerApplicationStart, - SparkListenerExecutorAdded, SparkListenerJobStart,SparkListenerStageSubmitted, SparkListenerTaskStart,SparkListenerBlockManagerRemoved, - SparkListenerTaskEnd, SparkListenerStageCompleted, SparkListenerJobEnd, SparkListenerApplicationEnd,SparkListenerExecutorRemoved -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java deleted file mode 100644 index 1f76a2f..0000000 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.jpm.spark.history.crawl; - -import java.io.InputStream; - -public interface JHFInputStreamReader { - public void read(InputStream is) throws Exception; - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java deleted file mode 100644 index 3fbc769..0000000 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.jpm.spark.history.crawl; - -import java.io.InputStream; - -public interface JHFParserBase { - /** - * this method will ensure to close the inputstream - * @param is - * @throws Exception - */ - public void parse(InputStream is) throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java deleted file mode 100644 index 99aa68d..0000000 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java +++ /dev/null @@ -1,701 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.eagle.jpm.spark.history.crawl; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import jline.internal.Log; -import org.apache.eagle.jpm.entity.*; -import org.apache.eagle.jpm.util.JSONUtil; -import org.apache.eagle.jpm.util.JobNameNormalization; -import org.apache.eagle.jpm.util.SparkEntityConstant; -import org.apache.eagle.jpm.util.SparkJobTagName; -import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.apache.eagle.service.client.impl.EagleServiceBaseClient; -import org.apache.eagle.service.client.impl.EagleServiceClientImpl; -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -public class JHFSparkEventReader { - - public static final int FLUSH_LIMIT = 500; - private static final Logger logger = LoggerFactory.getLogger(JHFSparkEventReader.class); - - private Map executors; - private SparkApp app; - private Map jobs; - private Map stages; - private Map> jobStageMap; - private Map tasks; - private EagleServiceClientImpl client; - private Map> stageTaskStatusMap; - - private List createEntities; - - private Config conf; - - public JHFSparkEventReader(Map baseTags, SparkApplicationInfo info) { - app = new SparkApp(); - app.setTags(new HashMap(baseTags)); - app.setYarnState(info.getState()); - app.setYarnStatus(info.getFinalStatus()); - createEntities = new ArrayList<>(); - jobs = new HashMap(); - stages = new HashMap(); - jobStageMap = new HashMap>(); - tasks = new HashMap(); - executors = new HashMap(); - stageTaskStatusMap = new HashMap<>(); - conf = ConfigFactory.load(); - this.initiateClient(); - } - - public void read(JSONObject eventObj) throws Exception { - String eventType = (String) eventObj.get("Event"); - if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationStart.toString())) { - handleAppStarted(eventObj); - } else if (eventType.equalsIgnoreCase(EventType.SparkListenerEnvironmentUpdate.toString())) { - handleEnvironmentSet(eventObj); - } else if (eventType.equalsIgnoreCase(EventType.SparkListenerExecutorAdded.toString())) { - handleExecutorAdd(eventObj); - } else if (eventType.equalsIgnoreCase(EventType.SparkListenerBlockManagerAdded.toString())) { - handleBlockManagerAdd(eventObj); - } else if (eventType.equalsIgnoreCase(EventType.SparkListenerJobStart.toString())) { - handleJobStart(eventObj); - } else if (eventType.equalsIgnoreCase(EventType.SparkListenerStageSubmitted.toString())) { - handleStageSubmit(eventObj); - } else if (eventType.equalsIgnoreCase(EventType.SparkListenerTaskStart.toString())) { - handleTaskStart(eventObj); - } else if (eventType.equalsIgnoreCase(EventType.SparkListenerTaskEnd.toString())) { - handleTaskEnd(eventObj); - } else if (eventType.equalsIgnoreCase(EventType.SparkListenerStageCompleted.toString())) { - handleStageComplete(eventObj); - } else if (eventType.equalsIgnoreCase(EventType.SparkListenerJobEnd.toString())) { - handleJobEnd(eventObj); - } else if (eventType.equalsIgnoreCase(EventType.SparkListenerExecutorRemoved.toString())) { - handleExecutorRemoved(eventObj); - } else if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationEnd.toString())) { - handleAppEnd(eventObj); - } else if (eventType.equalsIgnoreCase(EventType.SparkListenerBlockManagerRemoved.toString())) { - //nothing to do now - } else { - logger.info("Not registered event type:" + eventType); - } - - } - - - private void handleEnvironmentSet(JSONObject event) { - 
app.setConfig(new JobConfig()); - JSONObject sparkProps = (JSONObject) event.get("Spark Properties"); - - List jobConfs = conf.getStringList("basic.jobConf.additional.info"); - String[] props = {"spark.yarn.app.id", "spark.executor.memory", "spark.driver.host", "spark.driver.port", - "spark.driver.memory", "spark.scheduler.pool", "spark.executor.cores", "spark.yarn.am.memory", - "spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead", "spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", "spark.master"}; - jobConfs.addAll(Arrays.asList(props)); - for (String prop : jobConfs) { - if (sparkProps.containsKey(prop)) { - app.getConfig().getConfig().put(prop, (String) sparkProps.get(prop)); - } - } - } - - private Object getConfigVal(JobConfig config, String configName, String type) { - if (config.getConfig().containsKey(configName)) { - Object val = config.getConfig().get(configName); - if (type.equalsIgnoreCase(Integer.class.getName())) { - return Integer.parseInt((String) val); - } else { - return val; - } - } else { - if (type.equalsIgnoreCase(Integer.class.getName())) { - return conf.getInt("spark.defaultVal." + configName); - } else { - return conf.getString("spark.defaultVal." + configName); - } - - } - } - - - private boolean isClientMode(JobConfig config) { - if (config.getConfig().get("spark.master").equalsIgnoreCase("yarn-client")) { - return true; - } else { - return false; - } - } - - - private void handleAppStarted(JSONObject event) { - //need update all entities tag before app start - List entities = new ArrayList(); - entities.addAll(this.executors.values()); - entities.add(this.app); - - for (TaggedLogAPIEntity entity : entities) { - entity.getTags().put(SparkJobTagName.SPARK_APP_ID.toString(), JSONUtil.getString(event, "App ID")); - entity.getTags().put(SparkJobTagName.SPARK_APP_NAME.toString(), JSONUtil.getString(event, "App Name")); - entity.getTags().put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), JSONUtil.getString(event, "App Attempt ID")); - // the second argument of getNormalizeName() is changed to null because the original code contains sensitive text - // original second argument looks like: this.app.getConfig().getConfig().get("xxx"), "xxx" is the sensitive text - entity.getTags().put(SparkJobTagName.SPARK_APP_NORM_NAME.toString(), this.getNormalizedName(JSONUtil.getString(event, "App Name"), null)); - entity.getTags().put(SparkJobTagName.SPARK_USER.toString(), JSONUtil.getString(event, "User")); - } - - this.app.setStartTime(JSONUtil.getLong(event, "Timestamp")); - this.app.setTimestamp(JSONUtil.getLong(event, "Timestamp")); - - } - - private void handleExecutorAdd(JSONObject event) throws Exception { - String executorID = (String) event.get("Executor ID"); - SparkExecutor executor = this.initiateExecutor(executorID, JSONUtil.getLong(event, "Timestamp")); - - JSONObject executorInfo = JSONUtil.getJSONObject(event, "Executor Info"); - - } - - private void handleBlockManagerAdd(JSONObject event) throws Exception { - long maxMemory = JSONUtil.getLong(event, "Maximum Memory"); - long timestamp = JSONUtil.getLong(event, "Timestamp"); - JSONObject blockInfo = JSONUtil.getJSONObject(event, "Block Manager ID"); - String executorID = JSONUtil.getString(blockInfo, "Executor ID"); - String hostport = String.format("%s:%s", JSONUtil.getString(blockInfo, "Host"), JSONUtil.getLong(blockInfo, "Port")); - - SparkExecutor executor = this.initiateExecutor(executorID, timestamp); - executor.setMaxMemory(maxMemory); - executor.setHostPort(hostport); - } - - 
private void handleTaskStart(JSONObject event) { - this.initializeTask(event); - } - - private void handleTaskEnd(JSONObject event) { - JSONObject taskInfo = JSONUtil.getJSONObject(event, "Task Info"); - Integer taskId = JSONUtil.getInt(taskInfo, "Task ID"); - SparkTask task = null; - if (tasks.containsKey(taskId)) { - task = tasks.get(taskId); - } else { - return; - } - - task.setFailed(JSONUtil.getBoolean(taskInfo, "Failed")); - JSONObject taskMetrics = JSONUtil.getJSONObject(event, "Task Metrics"); - if (null != taskMetrics) { - task.setExecutorDeserializeTime(JSONUtil.getLong(taskMetrics, "Executor Deserialize Time")); - task.setExecutorRunTime(JSONUtil.getLong(taskMetrics, "Executor Run Time")); - task.setJvmGcTime(JSONUtil.getLong(taskMetrics, "JVM GC Time")); - task.setResultSize(JSONUtil.getLong(taskMetrics, "Result Size")); - task.setResultSerializationTime(JSONUtil.getLong(taskMetrics, "Result Serialization Time")); - task.setMemoryBytesSpilled(JSONUtil.getLong(taskMetrics, "Memory Bytes Spilled")); - task.setDiskBytesSpilled(JSONUtil.getLong(taskMetrics, "Disk Bytes Spilled")); - - JSONObject inputMetrics = JSONUtil.getJSONObject(taskMetrics, "Input Metrics"); - if (null != inputMetrics) { - task.setInputBytes(JSONUtil.getLong(inputMetrics, "Bytes Read")); - task.setInputRecords(JSONUtil.getLong(inputMetrics, "Records Read")); - } - - JSONObject outputMetrics = JSONUtil.getJSONObject(taskMetrics, "Output Metrics"); - if (null != outputMetrics) { - task.setOutputBytes(JSONUtil.getLong(outputMetrics, "Bytes Written")); - task.setOutputRecords(JSONUtil.getLong(outputMetrics, "Records Written")); - } - - JSONObject shuffleWriteMetrics = JSONUtil.getJSONObject(taskMetrics, "Shuffle Write Metrics"); - if (null != shuffleWriteMetrics) { - task.setShuffleWriteBytes(JSONUtil.getLong(shuffleWriteMetrics, "Shuffle Bytes Written")); - task.setShuffleWriteRecords(JSONUtil.getLong(shuffleWriteMetrics, "Shuffle Records Written")); - } - - JSONObject shuffleReadMetrics = JSONUtil.getJSONObject(taskMetrics, "Shuffle Read Metrics"); - if (null != shuffleReadMetrics) { - task.setShuffleReadLocalBytes(JSONUtil.getLong(shuffleReadMetrics, "Local Bytes Read")); - task.setShuffleReadRemoteBytes(JSONUtil.getLong(shuffleReadMetrics, "Remote Bytes Read")); - task.setShuffleReadRecords(JSONUtil.getLong(shuffleReadMetrics, "Total Records Read")); - } - } else { - //for tasks success without task metrics, save in the end if no other information - if (!task.isFailed()) { - return; - } - } - - aggregateToStage(task); - aggregateToExecutor(task); - tasks.remove(taskId); - this.flushEntities(task, false); - } - - private SparkTask initializeTask(JSONObject event) { - SparkTask task = new SparkTask(); - task.setTags(new HashMap(this.app.getTags())); - task.setTimestamp(app.getTimestamp()); - - task.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), JSONUtil.getLong(event, "Stage ID").toString()); - task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), JSONUtil.getLong(event, "Stage Attempt ID").toString()); - - JSONObject taskInfo = JSONUtil.getJSONObject(event, "Task Info"); - task.setTaskId(JSONUtil.getInt(taskInfo, "Task ID")); - task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), JSONUtil.getInt(taskInfo, "Index").toString()); - task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), JSONUtil.getInt(taskInfo, "Attempt").toString()); - task.setLaunchTime(JSONUtil.getLong(taskInfo, "Launch Time")); - task.setExecutorId(JSONUtil.getString(taskInfo, "Executor ID")); 
- task.setHost(JSONUtil.getString(taskInfo, "Host")); - task.setTaskLocality(JSONUtil.getString(taskInfo, "Locality")); - task.setSpeculative(JSONUtil.getBoolean(taskInfo, "Speculative")); - - tasks.put(task.getTaskId(), task); - return task; - } - - private void handleJobStart(JSONObject event) { - SparkJob job = new SparkJob(); - job.setTags(new HashMap(this.app.getTags())); - job.setTimestamp(app.getTimestamp()); - - Integer jobId = JSONUtil.getInt(event, "Job ID"); - job.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), jobId.toString()); - job.setSubmissionTime(JSONUtil.getLong(event, "Submission Time")); - - //for complete application, no active stages/tasks - job.setNumActiveStages(0); - job.setNumActiveTasks(0); - - this.jobs.put(jobId, job); - this.jobStageMap.put(jobId, new HashSet()); - - JSONArray stages = JSONUtil.getJSONArray(event, "Stage Infos"); - job.setNumStages(stages.size()); - for (int i = 0; i < stages.size(); i++) { - JSONObject stageInfo = (JSONObject) stages.get(i); - Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID"); - Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID"); - String stageName = JSONUtil.getString(stageInfo, "Stage Name"); - int numTasks = JSONUtil.getInt(stageInfo, "Number of Tasks"); - this.initiateStage(jobId, stageId, stageAttemptId, stageName, numTasks); - - } - - } - - private void handleStageSubmit(JSONObject event) { - JSONObject stageInfo = JSONUtil.getJSONObject(event, "Stage Info"); - Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID"); - Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID"); - String key = this.generateStageKey(stageId.toString(), stageAttemptId.toString()); - stageTaskStatusMap.put(key, new HashMap()); - - if (!stages.containsKey(this.generateStageKey(stageId.toString(), stageAttemptId.toString()))) { - //may be further attempt for one stage - String baseAttempt = this.generateStageKey(stageId.toString(), "0"); - if (stages.containsKey(baseAttempt)) { - SparkStage stage = stages.get(baseAttempt); - String jobId = stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()); - - String stageName = JSONUtil.getString(event, "Stage Name"); - int numTasks = JSONUtil.getInt(stageInfo, "Number of Tasks"); - this.initiateStage(Integer.parseInt(jobId), stageId, stageAttemptId, stageName, numTasks); - } - } - - } - - private void handleStageComplete(JSONObject event) { - JSONObject stageInfo = JSONUtil.getJSONObject(event, "Stage Info"); - Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID"); - Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID"); - String key = this.generateStageKey(stageId.toString(), stageAttemptId.toString()); - SparkStage stage = stages.get(key); - stage.setSubmitTime(JSONUtil.getLong(stageInfo, "Submission Time")); - stage.setCompleteTime(JSONUtil.getLong(stageInfo, "Completion Time")); - - if (stageInfo.containsKey("Failure Reason")) { - stage.setStatus(SparkEntityConstant.SPARK_STAGE_STATUS.FAILED.toString()); - } else { - stage.setStatus(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString()); - } - } - - private void handleExecutorRemoved(JSONObject event) { - String executorID = JSONUtil.getString(event, "Executor ID"); - SparkExecutor executor = executors.get(executorID); - executor.setEndTime(JSONUtil.getLong(event, "Timestamp")); - - } - - private void handleJobEnd(JSONObject event) { - Integer jobId = JSONUtil.getInt(event, "Job ID"); - SparkJob job = jobs.get(jobId); - job.setCompletionTime(JSONUtil.getLong(event, 
"Completion Time")); - JSONObject jobResult = JSONUtil.getJSONObject(event, "Job Result"); - String result = JSONUtil.getString(jobResult, "Result"); - if (result.equalsIgnoreCase("JobSucceeded")) { - job.setStatus(SparkEntityConstant.SPARK_JOB_STATUS.SUCCEEDED.toString()); - } else { - job.setStatus(SparkEntityConstant.SPARK_JOB_STATUS.FAILED.toString()); - } - - } - - private void handleAppEnd(JSONObject event) { - long endTime = JSONUtil.getLong(event, "Timestamp"); - app.setEndTime(endTime); - } - - public void clearReader() throws Exception { - //clear tasks - for (SparkTask task : tasks.values()) { - logger.info("Task {} does not have result or no task metrics.", task.getTaskId()); - task.setFailed(true); - aggregateToStage(task); - aggregateToExecutor(task); - this.flushEntities(task, false); - } - - List needStoreStages = new ArrayList<>(); - for (SparkStage stage : this.stages.values()) { - Integer jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString())); - if (stage.getSubmitTime() == 0 || stage.getCompleteTime() == 0) { - SparkJob job = this.jobs.get(jobId); - job.setNumSkippedStages(job.getNumSkippedStages() + 1); - job.setNumSkippedTasks(job.getNumSkippedTasks() + stage.getNumTasks()); - } else { - this.aggregateToJob(stage); - this.aggregateStageToApp(stage); - needStoreStages.add(stage); - } - String stageId = stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()); - String stageAttemptId = stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()); - this.jobStageMap.get(jobId).remove(this.generateStageKey(stageId, stageAttemptId)); - } - - this.flushEntities(needStoreStages, false); - for (SparkJob job : jobs.values()) { - this.aggregateJobToApp(job); - } - this.flushEntities(jobs.values(), false); - - app.setExecutors(executors.values().size()); - long executorMemory = parseExecutorMemory((String) this.getConfigVal(this.app.getConfig(), "spark.executor.memory", String.class.getName())); - long driverMemory = parseExecutorMemory(this.isClientMode(app.getConfig()) ? (String) this.getConfigVal(this.app.getConfig(), "spark.yarn.am.memory", String.class.getName()) : (String) this.getConfigVal(app.getConfig(), "spark.driver.memory", String.class.getName())); - int executoreCore = (Integer) this.getConfigVal(app.getConfig(), "spark.executor.cores", Integer.class.getName()); - int driverCore = this.isClientMode(app.getConfig()) ? (Integer) this.getConfigVal(app.getConfig(), "spark.yarn.am.cores", Integer.class.getName()) : (Integer) this.getConfigVal(app.getConfig(), "spark.driver.cores", Integer.class.getName()); - long executorMemoryOverhead = this.getMemoryOverhead(app.getConfig(), executorMemory, "spark.yarn.executor.memoryOverhead"); - long driverMemoryOverhead = this.isClientMode(app.getConfig()) ? 
this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.am.memoryOverhead") : this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.driver.memoryOverhead"); - - app.setExecMemoryBytes(executorMemory); - app.setDriveMemoryBytes(driverMemory); - app.setExecutorCores(executoreCore); - app.setDriverCores(driverCore); - app.setExecutorMemoryOverhead(executorMemoryOverhead); - app.setDriverMemoryOverhead(driverMemoryOverhead); - - for (SparkExecutor executor : executors.values()) { - String executorID = executor.getTags().get(SparkJobTagName.SPARK_EXECUTOR_ID.toString()); - if (executorID.equalsIgnoreCase("driver")) { - executor.setExecMemoryBytes(driverMemory); - executor.setCores(driverCore); - executor.setMemoryOverhead(driverMemoryOverhead); - } else { - executor.setExecMemoryBytes(executorMemory); - executor.setCores(executoreCore); - executor.setMemoryOverhead(executorMemoryOverhead); - } - if (executor.getEndTime() == 0) - executor.setEndTime(app.getEndTime()); - this.aggregateExecutorToApp(executor); - } - this.flushEntities(executors.values(), false); - //spark code...tricky - app.setSkippedTasks(app.getCompleteTasks()); - this.flushEntities(app, true); - } - - private long getMemoryOverhead(JobConfig config, long executorMemory, String fieldName) { - long result = 0l; - if (config.getConfig().containsKey(fieldName)) { - result = this.parseExecutorMemory(config.getConfig().get(fieldName) + "m"); - if(result == 0l){ - result = this.parseExecutorMemory(config.getConfig().get(fieldName)); - } - } - - if(result == 0l){ - result = Math.max(this.parseExecutorMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min")), executorMemory * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 100); - } - return result; - } - - private void aggregateExecutorToApp(SparkExecutor executor) { - app.setTotalExecutorTime(app.getTotalExecutorTime() + (executor.getEndTime() - executor.getStartTime())); - } - - private void aggregateJobToApp(SparkJob job) { - //aggregate job level metrics - app.setNumJobs(app.getNumJobs() + 1); - app.setTotalTasks(app.getTotalTasks() + job.getNumTask()); - app.setCompleteTasks(app.getCompleteTasks() + job.getNumCompletedTasks()); - app.setSkippedTasks(app.getSkippedTasks() + job.getNumSkippedTasks()); - app.setFailedTasks(app.getFailedTasks() + job.getNumFailedTasks()); - app.setTotalStages(app.getTotalStages() + job.getNumStages()); - app.setFailedStages(app.getFailedStages() + job.getNumFailedStages()); - app.setSkippedStages(app.getSkippedStages() + job.getNumSkippedStages()); - } - - private void aggregateStageToApp(SparkStage stage) { - //aggregate task level metrics - app.setDiskBytesSpilled(app.getDiskBytesSpilled() + stage.getDiskBytesSpilled()); - app.setMemoryBytesSpilled(app.getMemoryBytesSpilled() + stage.getMemoryBytesSpilled()); - app.setExecutorRunTime(app.getExecutorRunTime() + stage.getExecutorRunTime()); - app.setJvmGcTime(app.getJvmGcTime() + stage.getJvmGcTime()); - app.setExecutorDeserializeTime(app.getExecutorDeserializeTime() + stage.getExecutorDeserializeTime()); - app.setResultSerializationTime(app.getResultSerializationTime() + stage.getResultSerializationTime()); - app.setResultSize(app.getResultSize() + stage.getResultSize()); - app.setInputRecords(app.getInputRecords() + stage.getInputRecords()); - app.setInputBytes(app.getInputBytes() + stage.getInputBytes()); - app.setOutputRecords(app.getOutputRecords() + stage.getOutputRecords()); - app.setOutputBytes(app.getOutputBytes() + stage.getOutputBytes()); - 
app.setShuffleWriteRecords(app.getShuffleWriteRecords() + stage.getShuffleWriteRecords()); - app.setShuffleWriteBytes(app.getShuffleWriteBytes() + stage.getShuffleWriteBytes()); - app.setShuffleReadRecords(app.getShuffleReadRecords() + stage.getShuffleReadRecords()); - app.setShuffleReadBytes(app.getShuffleReadBytes() + stage.getShuffleReadBytes()); - } - - private void aggregateToStage(SparkTask task) { - String stageId = task.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()); - String stageAttemptId = task.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()); - String key = this.generateStageKey(stageId, stageAttemptId); - SparkStage stage = stages.get(key); - - stage.setDiskBytesSpilled(stage.getDiskBytesSpilled() + task.getDiskBytesSpilled()); - stage.setMemoryBytesSpilled(stage.getMemoryBytesSpilled() + task.getMemoryBytesSpilled()); - stage.setExecutorRunTime(stage.getExecutorRunTime() + task.getExecutorRunTime()); - stage.setJvmGcTime(stage.getJvmGcTime() + task.getJvmGcTime()); - stage.setExecutorDeserializeTime(stage.getExecutorDeserializeTime() + task.getExecutorDeserializeTime()); - stage.setResultSerializationTime(stage.getResultSerializationTime() + task.getResultSerializationTime()); - stage.setResultSize(stage.getResultSize() + task.getResultSize()); - stage.setInputRecords(stage.getInputRecords() + task.getInputRecords()); - stage.setInputBytes(stage.getInputBytes() + task.getInputBytes()); - stage.setOutputRecords(stage.getOutputRecords() + task.getOutputRecords()); - stage.setOutputBytes(stage.getOutputBytes() + task.getOutputBytes()); - stage.setShuffleWriteRecords(stage.getShuffleWriteRecords() + task.getShuffleWriteRecords()); - stage.setShuffleWriteBytes(stage.getShuffleWriteBytes() + task.getShuffleWriteBytes()); - stage.setShuffleReadRecords(stage.getShuffleReadRecords() + task.getShuffleReadRecords()); - long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes(); - stage.setShuffleReadBytes(stage.getShuffleReadBytes() + taskShuffleReadBytes); - - boolean success = !task.isFailed(); - - Integer taskIndex = Integer.parseInt(task.getTags().get(SparkJobTagName.SPARK_TASK_INDEX.toString())); - if (stageTaskStatusMap.get(key).containsKey(taskIndex)) { - //has previous task attempt, retrieved from task index in one stage - boolean previousResult = stageTaskStatusMap.get(key).get(taskIndex); - success = previousResult || success; - if (previousResult != success) { - stage.setNumFailedTasks(stage.getNumFailedTasks() - 1); - stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1); - stageTaskStatusMap.get(key).put(taskIndex, success); - } - } else { - if (success) { - stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1); - } else { - stage.setNumFailedTasks(stage.getNumFailedTasks() + 1); - } - stageTaskStatusMap.get(key).put(taskIndex, success); - } - - } - - private void aggregateToExecutor(SparkTask task) { - String executorId = task.getExecutorId(); - SparkExecutor executor = executors.get(executorId); - - if (null != executor) { - executor.setTotalTasks(executor.getTotalTasks() + 1); - if (task.isFailed()) { - executor.setFailedTasks(executor.getFailedTasks() + 1); - } else { - executor.setCompletedTasks(executor.getCompletedTasks() + 1); - } - long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes(); - executor.setTotalShuffleRead(executor.getTotalShuffleRead() + taskShuffleReadBytes); - executor.setTotalDuration(executor.getTotalDuration() + 
task.getExecutorRunTime()); - executor.setTotalInputBytes(executor.getTotalInputBytes() + task.getInputBytes()); - executor.setTotalShuffleWrite(executor.getTotalShuffleWrite() + task.getShuffleWriteBytes()); - executor.setTotalDuration(executor.getTotalDuration() + task.getExecutorRunTime()); - } - - } - - private void aggregateToJob(SparkStage stage) { - Integer jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString())); - SparkJob job = jobs.get(jobId); - job.setNumCompletedTasks(job.getNumCompletedTasks() + stage.getNumCompletedTasks()); - job.setNumFailedTasks(job.getNumFailedTasks() + stage.getNumFailedTasks()); - job.setNumTask(job.getNumTask() + stage.getNumTasks()); - - - if (stage.getStatus().equalsIgnoreCase(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString())) { - //if multiple attempts succeed, just count one - if (!hasStagePriorAttemptSuccess(stage)) { - job.setNumCompletedStages(job.getNumCompletedStages() + 1); - } - - } else { - job.setNumFailedStages(job.getNumFailedStages() + 1); - } - } - - private boolean hasStagePriorAttemptSuccess(SparkStage stage) { - Integer stageAttemptId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString())); - for (Integer i = 0; i < stageAttemptId; i++) { - SparkStage previousStage = stages.get(this.generateStageKey(stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()), i.toString())); - if (previousStage.getStatus().equalsIgnoreCase(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString())) { - return true; - } - } - return false; - } - - - private String generateStageKey(String stageId, String stageAttemptId) { - return String.format("%s-%s", stageId, stageAttemptId); - } - - private void initiateStage(Integer jobId, Integer stageId, Integer stageAttemptId, String name, int numTasks) { - SparkStage stage = new SparkStage(); - stage.setTags(new HashMap(this.app.getTags())); - stage.setTimestamp(app.getTimestamp()); - stage.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), jobId.toString()); - stage.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), stageId.toString()); - stage.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), stageAttemptId.toString()); - stage.setName(name); - stage.setNumActiveTasks(0); - stage.setNumTasks(numTasks); - stage.setSchedulingPool(this.app.getConfig().getConfig().get("spark.scheduler.pool") == null ? 
"default" : this.app.getConfig().getConfig().get("spark.scheduler.pool")); - - String stageKey = this.generateStageKey(stageId.toString(), stageAttemptId.toString()); - stages.put(stageKey, stage); - this.jobStageMap.get(jobId).add(stageKey); - } - - - private SparkExecutor initiateExecutor(String executorID, long startTime) throws Exception { - if (!executors.containsKey(executorID)) { - SparkExecutor executor = new SparkExecutor(); - executor.setTags(new HashMap(this.app.getTags())); - executor.getTags().put(SparkJobTagName.SPARK_EXECUTOR_ID.toString(), executorID); - executor.setStartTime(startTime); - executor.setTimestamp(app.getTimestamp()); - - this.executors.put(executorID, executor); - } - - return this.executors.get(executorID); - } - - private String getNormalizedName(String jobName, String assignedName) { - if (null != assignedName) { - return assignedName; - } else { - return JobNameNormalization.getInstance().normalize(jobName); - } - } - - private long parseExecutorMemory(String memory) { - - if (memory.endsWith("g") || memory.endsWith("G")) { - int executorGB = Integer.parseInt(memory.substring(0, memory.length() - 1)); - return 1024l * 1024 * 1024 * executorGB; - } else if (memory.endsWith("m") || memory.endsWith("M")) { - int executorMB = Integer.parseInt(memory.substring(0, memory.length() - 1)); - return 1024l * 1024 * executorMB; - } else if (memory.endsWith("k") || memory.endsWith("K")) { - int executorKB = Integer.parseInt(memory.substring(0, memory.length() - 1)); - return 1024l * executorKB; - } else if (memory.endsWith("t") || memory.endsWith("T")) { - int executorTB = Integer.parseInt(memory.substring(0, memory.length() - 1)); - return 1024l * 1024 * 1024 * 1024 * executorTB; - } else if (memory.endsWith("p") || memory.endsWith("P")) { - int executorPB = Integer.parseInt(memory.substring(0, memory.length() - 1)); - return 1024l * 1024 * 1024 * 1024 * 1024 * executorPB; - } - Log.info("Cannot parse memory info " + memory); - return 0l; - } - - private void flushEntities(Object entity, boolean forceFlush) { - this.flushEntities(Arrays.asList(entity), forceFlush); - } - - private void flushEntities(Collection entities, boolean forceFlush) { - this.createEntities.addAll(entities); - - if (forceFlush || this.createEntities.size() >= FLUSH_LIMIT) { - try { - this.doFlush(this.createEntities); - this.createEntities.clear(); - } catch (Exception e) { - logger.error("Fail to flush entities", e); - } - - } - } - - private EagleServiceBaseClient initiateClient() { - String host = conf.getString("eagleProps.eagle.service.host"); - int port = conf.getInt("eagleProps.eagle.service.port"); - String userName = conf.getString("eagleProps.eagle.service.userName"); - String pwd = conf.getString("eagleProps.eagle.service.pwd"); - client = new EagleServiceClientImpl(host, port, userName, pwd); - int timeout = conf.getInt("eagleProps.eagle.service.read_timeout"); - client.getJerseyClient().setReadTimeout(timeout * 1000); - - return client; - } - - private void doFlush(List entities) throws Exception { - logger.info("start flushing entities of total number " + entities.size()); - client.create(entities); - logger.info("finish flushing entities of total number " + entities.size()); -// for(Object entity: entities){ -// if(entity instanceof SparkApp){ -// for (Field field : entity.getClass().getDeclaredFields()) { -// field.setAccessible(true); // You might want to set modifier to public first. 
-// Object value = field.get(entity); -// if (value != null) { -// System.out.println(field.getName() + "=" + value); -// } -// } -// } -// } - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java deleted file mode 100644 index 75ce508..0000000 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.jpm.spark.history.crawl; - - -import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.InputStream; -import java.io.InputStreamReader; - -public class JHFSparkParser implements JHFParserBase { - - private static final Logger logger = LoggerFactory.getLogger(JHFSparkParser.class); - - JHFSparkEventReader eventReader; - - public JHFSparkParser(JHFSparkEventReader reader){ - this.eventReader = reader; - } - - @Override - public void parse(InputStream is) throws Exception { - BufferedReader reader = new BufferedReader(new InputStreamReader(is)); - try{ - String line = null; - - JSONParser parser = new JSONParser(); - while((line = reader.readLine()) != null){ - try{ - JSONObject eventObj = (JSONObject) parser.parse(line); - this.eventReader.read(eventObj); - }catch(Exception e){ - logger.error(String.format("Fail to parse %s.", line), e); - } - } - this.eventReader.clearReader(); - - }finally { - if(reader != null){ - reader.close(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java deleted file mode 100644 index c206b71..0000000 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * 
contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.jpm.spark.history.crawl; - -public class SparkApplicationInfo { - - private String state; - private String finalStatus; - private String queue; - private String name; - private String user; - - public String getState() { - return state; - } - - public void setState(String state) { - this.state = state; - } - - public String getFinalStatus() { - return finalStatus; - } - - public void setFinalStatus(String finalStatus) { - this.finalStatus = finalStatus; - } - - public String getQueue() { - return queue; - } - - public void setQueue(String queue) { - this.queue = queue; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getUser() { - return user; - } - - public void setUser(String user) { - this.user = user; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkHistoryFileInputStreamReaderImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkHistoryFileInputStreamReaderImpl.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkHistoryFileInputStreamReaderImpl.java deleted file mode 100644 index 38c0a04..0000000 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkHistoryFileInputStreamReaderImpl.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.eagle.jpm.spark.history.crawl; - -import org.apache.eagle.jpm.util.SparkJobTagName; - -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStream; -import java.util.HashMap; -import java.util.Map; - -public class SparkHistoryFileInputStreamReaderImpl implements JHFInputStreamReader { - - private String site; - private SparkApplicationInfo app; - - - public SparkHistoryFileInputStreamReaderImpl(String site, SparkApplicationInfo app){ - this.site = site; - this.app = app; - } - - @Override - public void read(InputStream is) throws Exception { - Map baseTags = new HashMap<>(); - baseTags.put(SparkJobTagName.SITE.toString(), site); - baseTags.put(SparkJobTagName.SPARK_QUEUE.toString(), app.getQueue()); - JHFParserBase parser = new JHFSparkParser(new JHFSparkEventReader(baseTags, this.app)); - parser.parse(is); - } - - public static void main(String[] args) throws Exception{ - SparkHistoryFileInputStreamReaderImpl impl = new SparkHistoryFileInputStreamReaderImpl("apollo-phx", new SparkApplicationInfo()); - impl.read(new FileInputStream(new File("E:\\eagle\\application_1459803563374_535667_1"))); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java index 60e126e..0c475a9 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java @@ -26,7 +26,7 @@ import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; import org.apache.curator.retry.RetryNTimes; import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig; -import org.apache.eagle.jpm.spark.history.crawl.SparkApplicationInfo; +import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo; import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java index 9c231aa..8404eda 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java @@ -71,10 +71,13 @@ public class FinishedSparkJobSpout extends BaseRichSpout { public void nextTuple() { LOG.info("Start to run tuple"); try { - long fetchTime = Calendar.getInstance().getTimeInMillis(); + Calendar calendar = Calendar.getInstance(); + long fetchTime = calendar.getTimeInMillis(); + 
calendar.setTimeInMillis(this.lastFinishAppTime); + LOG.info("Last finished time = {}", calendar.getTime()); if (fetchTime - this.lastFinishAppTime > this.config.stormConfig.spoutCrawlInterval) { - List apps = rmFetch.getResource(Constants.ResourceType.COMPLETE_SPARK_JOB, new Long(lastFinishAppTime).toString()); - List appInfos = (null != apps ? (List)apps.get(0):new ArrayList()); + List appInfos = rmFetch.getResource(Constants.ResourceType.COMPLETE_SPARK_JOB, Long.toString(lastFinishAppTime)); + //List appInfos = (null != apps ? (List)apps.get(0):new ArrayList()); LOG.info("Get " + appInfos.size() + " from yarn resource manager."); for (AppInfo app: appInfos) { String appId = app.getId(); @@ -123,7 +126,7 @@ public class FinishedSparkJobSpout extends BaseRichSpout { if (this.failTimes.containsKey(appId)) { failTimes = this.failTimes.get(appId); } - failTimes ++; + failTimes++; if (failTimes >= FAIL_MAX_TIMES) { this.failTimes.remove(appId); zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java index 4e6bf03..23d5152 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java @@ -24,18 +24,16 @@ import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; +import org.apache.eagle.jpm.spark.crawl.JHFInputStreamReader; +import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo; +import org.apache.eagle.jpm.spark.crawl.SparkFilesystemInputStreamReaderImpl; import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig; -import org.apache.eagle.jpm.spark.history.crawl.JHFInputStreamReader; -import org.apache.eagle.jpm.spark.history.crawl.SparkApplicationInfo; -import org.apache.eagle.jpm.spark.history.crawl.SparkHistoryFileInputStreamReaderImpl; import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager; import org.apache.eagle.jpm.spark.history.status.ZKStateConstant; -import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.jpm.util.HDFSUtil; import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher; import org.apache.eagle.jpm.util.resourceFetch.SparkHistoryServerResourceFetcher; import org.apache.eagle.jpm.util.resourceFetch.model.SparkApplication; -import org.apache.eagle.jpm.util.resourceFetch.model.SparkApplicationAttempt; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -43,9 +41,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import java.util.*; public class SparkJobParseBolt extends BaseRichBolt { @@ -87,15 +83,23 @@ public class SparkJobParseBolt extends BaseRichBolt { SparkApplicationInfo info = zkState.getApplicationInfo(appId); //first try to get 
attempts under the application - List attempts = this.getAttemptList(appId); + hdfs = HDFSUtil.getFileSystem(this.hdfsConf); + Set inprogressSet = new HashSet(); + List attemptLogNames = this.getAttemptLogNameList(appId, hdfs, inprogressSet); - if (attempts.isEmpty()) { - LOG.info("Application:{}( Name:{}, user: {}, queue: {}) not found on history server.", appId, info.getName(), info.getUser(), info.getQueue()); + if (attemptLogNames.isEmpty()) { + LOG.info("Application:{}( Name:{}, user: {}, queue: {}) not found on history server.", + appId, info.getName(), info.getUser(), info.getQueue()); } else { - hdfs = HDFSUtil.getFileSystem(this.hdfsConf); - for (SparkApplicationAttempt attempt : attempts) { - Path attemptFile = new Path(this.config.hdfsConfig.baseDir + "/" + this.getAppAttemptLogName(appId, attempt.getAttemptId())); - JHFInputStreamReader reader = new SparkHistoryFileInputStreamReaderImpl(config.info.site , info); + for (String attemptLogName : attemptLogNames) { + String extension = ""; + if (inprogressSet.contains(attemptLogName)) { + extension = ".inprogress"; + } + LOG.info("Attempt log name: " + attemptLogName + extension); + + Path attemptFile = getFilePath(attemptLogName, extension); + JHFInputStreamReader reader = new SparkFilesystemInputStreamReaderImpl(config.info.site, info); reader.read(hdfs.open(attemptFile)); } } @@ -124,50 +128,67 @@ public class SparkJobParseBolt extends BaseRichBolt { } private String getAppAttemptLogName(String appId, String attemptId) { - return String.format("%s_%s", appId, attemptId); + if (attemptId.equals("0")) { + return appId; + } + return appId + "_" + attemptId; } - private List getAttemptList(String appId) throws IOException { - FileSystem hdfs = null; - List attempts = new ArrayList<>(); - try { + private Path getFilePath(String appAttemptLogName, String extension) { + String attemptLogDir = this.config.hdfsConfig.baseDir + "/" + appAttemptLogName + extension; + return new Path(attemptLogDir); + } - SparkApplication app = null; - /*try { - List apps = this.historyServerFetcher.getResource(Constants.ResourceType.SPARK_JOB_DETAIL, appId); - if (apps != null) { - app = (SparkApplication) apps.get(0); - attempts = app.getAttempts(); + private List getAttemptLogNameList(String appId, FileSystem hdfs, Set inprogressSet) + throws IOException { + List attempts = new ArrayList(); + SparkApplication app = null; + /*try { + List apps = this.historyServerFetcher.getResource(Constants.ResourceType.SPARK_JOB_DETAIL, appId); + if (apps != null) { + app = (SparkApplication) apps.get(0); + attempts = app.getAttempts(); + } + } catch (Exception e) { + LOG.warn("Fail to get application detail from history server for appId " + appId, e); + }*/ + + + if (null == app) { + // history server may not have the info, just double check. + // TODO: if attemptId is not "1, 2, 3,...", we should change the logic. + // Use getResourceManagerVersion() to compare YARN/RM versions. + // attemptId might be: "appId_000001" + int attemptId = 0; + + boolean exists = true; + while (exists) { + String attemptIdString = Integer.toString(attemptId); + String appAttemptLogName = this.getAppAttemptLogName(appId, attemptIdString); + LOG.info("Attempt ID: {}, App Attempt Log: {}", attemptIdString, appAttemptLogName); + + String extension = ""; + Path attemptFile = getFilePath(appAttemptLogName, extension); + extension = ".inprogress"; + Path inprogressFile = getFilePath(appAttemptLogName, extension); + Path logFile = null; + // Check if history log exists. 
+ if (hdfs.exists(attemptFile)) { + logFile = attemptFile; + } else if (hdfs.exists(inprogressFile)) { + logFile = inprogressFile; + inprogressSet.add(appAttemptLogName); + } else if (attemptId > 0) { + exists = false; } - } catch (Exception e) { - LOG.warn("Fail to get application detail from history server for appId " + appId, e); - }*/ - - - if (null == app) { - //history server may not have the info, just double check - hdfs = HDFSUtil.getFileSystem(this.hdfsConf); - Integer attemptId = 1; - - boolean exists = true; - while (exists) { - Path attemptFile = new Path(this.config.hdfsConfig.baseDir + "/" + this.getAppAttemptLogName(appId, attemptId.toString())); - if (hdfs.exists(attemptFile)) { - SparkApplicationAttempt attempt = new SparkApplicationAttempt(); - attempt.setAttemptId(attemptId.toString()); - attempts.add(attempt); - attemptId++; - } else { - exists = false; - } + + if (logFile != null) { + attempts.add(appAttemptLogName); } - } - return attempts; - } finally { - if (null != hdfs) { - hdfs.close(); + attemptId++; } } + return attempts; } @Override @@ -175,4 +196,4 @@ public class SparkJobParseBolt extends BaseRichBolt { super.cleanup(); zkState.close(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf index 36f0836..65aaa36 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf @@ -33,12 +33,12 @@ "zkSessionTimeoutMs" : 15000, "zkRetryTimes" : 3, "zkRetryInterval" : 20000, - spark.history.server.url : "http://sandbox.hortonworks.com:18080/", + spark.history.server.url : "http://sandbox.hortonworks.com:18080", spark.history.server.username : "", spark.history.server.pwd : "", rm.url:["http://sandbox.hortonworks.com:8088"] , "hdfs": { - "baseDir": "/logs/spark-events", + "baseDir": "/spark-history", "endPoint": "hdfs://sandbox.hortonworks.com:8020", "principal": "", "keytab" : "" http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/pom.xml b/eagle-jpm/eagle-jpm-spark-running/pom.xml index 7bf90d4..7f5e6e8 100644 --- a/eagle-jpm/eagle-jpm-spark-running/pom.xml +++ b/eagle-jpm/eagle-jpm-spark-running/pom.xml @@ -29,38 +29,140 @@ eagle-jpm-spark-running http://maven.apache.org - - org.slf4j - slf4j-api - - - org.apache.eagle - eagle-stream-process-api - ${project.version} - + + org.slf4j + slf4j-api + + + org.apache.eagle + eagle-stream-process-api + ${project.version} + + + org.wso2.orbit.com.lmax + disruptor + + + asm + asm + + + + + org.apache.curator + curator-client + ${curator.version} + org.apache.eagle eagle-stream-process-base ${project.version} + + + org.wso2.orbit.com.lmax + disruptor + + + asm + asm + + + + + org.apache.eagle + eagle-job-common + ${project.version} + + + org.jsoup + jsoup + + + org.apache.storm + storm-core + + + ch.qos.logback + logback-classic + + + org.ow2.asm + asm + + + + + org.apache.eagle + eagle-jpm-util + ${project.version} + + + org.apache.eagle + eagle-jpm-entity + ${project.version} + + + org.apache.hadoop + 
hadoop-hdfs-nfs + ${hadoop.version} + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.hadoop + hadoop-annotations + ${hadoop.version} + + + org.apache.hadoop + hadoop-hdfs + ${hadoop.version} + + + org.apache.hadoop + hadoop-auth + ${hadoop.version} + + + org.apache.hadoop + hadoop-mapreduce-client-app + ${hadoop.version} + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} - - org.apache.eagle - eagle-job-common - ${project.version} - - - org.jsoup - jsoup - - - org.apache.storm - storm-core - - - ch.qos.logback - logback-classic - - - + + + + src/main/resources + + + + + maven-assembly-plugin + + src/assembly/eagle-jpm-spark-running-assembly.xml + eagle-jpm-spark-running-${project.version} + + + + package + + single + + + posix + + + + + +
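
For reference, the attempt-log discovery that this patch adds to SparkJobParseBolt (getAppAttemptLogName / getFilePath / getAttemptLogNameList above) boils down to the probing loop sketched below. This is a minimal sketch against the Hadoop FileSystem API only; the class and helper names (AttemptLogProbe, probeAttemptLogs, baseDir) are illustrative and not taken from the patch.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class AttemptLogProbe {

    // Probe baseDir for Spark event logs named <appId>, <appId>_1, <appId>_2, ...,
    // falling back to the ".inprogress" variant for attempts still being written.
    // Attempt 0 may legitimately be absent, so the loop only stops once attemptId > 0
    // and neither the finished nor the in-progress file exists.
    static List<String> probeAttemptLogs(FileSystem hdfs, String baseDir, String appId,
                                         Set<String> inprogress) throws IOException {
        List<String> found = new ArrayList<>();
        int attemptId = 0;
        boolean exists = true;
        while (exists) {
            String logName = (attemptId == 0) ? appId : appId + "_" + attemptId;
            Path finished = new Path(baseDir + "/" + logName);
            Path running = new Path(baseDir + "/" + logName + ".inprogress");
            if (hdfs.exists(finished)) {
                found.add(logName);
            } else if (hdfs.exists(running)) {
                found.add(logName);
                inprogress.add(logName);   // caller must append ".inprogress" when opening
            } else if (attemptId > 0) {
                exists = false;
            }
            attemptId++;
        }
        return found;
    }
}

In the bolt itself, each returned name is resolved under the configured hdfsConfig.baseDir (now "/spark-history" per the application.conf change above), with ".inprogress" appended for entries recorded in the in-progress set, and the resulting path is opened and handed to SparkFilesystemInputStreamReaderImpl for parsing.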