Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D5FEB200B35 for ; Tue, 5 Jul 2016 20:07:54 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D4719160A70; Tue, 5 Jul 2016 18:07:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 867B1160A2C for ; Tue, 5 Jul 2016 20:07:52 +0200 (CEST) Received: (qmail 22221 invoked by uid 500); 5 Jul 2016 18:07:51 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 22208 invoked by uid 99); 5 Jul 2016 18:07:51 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 Jul 2016 18:07:51 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 79227C0155 for ; Tue, 5 Jul 2016 18:07:50 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id eitIr3saInK6 for ; Tue, 5 Jul 2016 18:07:42 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id C7E815FD1C for ; Tue, 5 Jul 2016 18:07:41 +0000 (UTC) Received: (qmail 22012 invoked by uid 99); 5 Jul 2016 18:07:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 Jul 2016 18:07:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0BB98E9E9C; Tue, 5 Jul 2016 18:07:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yonzhang2012@apache.org To: commits@eagle.incubator.apache.org Date: Tue, 05 Jul 2016 18:07:45 -0000 Message-Id: <9414723ca2a74e0d85868bad0eaf2dd5@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [6/8] incubator-eagle git commit: EAGLE-276 eagle support for mr & spark history job monitoring mr & spark job history monitoring archived-at: Tue, 05 Jul 2016 18:07:55 -0000 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java new file mode 100755 index 0000000..2684899 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.entities; + +import org.apache.eagle.jpm.mr.history.common.JPAConstants; +import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters; +import org.apache.eagle.log.entity.meta.*; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@Table("eaglejpa") +@ColumnFamily("f") +@Prefix("jexec") +@Service(JPAConstants.JPA_JOB_EXECUTION_SERVICE_NAME) +@TimeSeries(true) +@Partition({"site"}) +@Indexes({ + @Index(name="Index_1_jobId", columns = { "jobID" }, unique = true), + @Index(name="Index_2_normJobName", columns = { "normJobName" }, unique = false) + }) +public class JobExecutionAPIEntity extends JobBaseAPIEntity { + @Column("a") + private String currentState; + @Column("b") + private long startTime; + @Column("c") + private long endTime; + @Column("d") + private int numTotalMaps; + @Column("e") + private int numFailedMaps; + @Column("f") + private int numFinishedMaps; + @Column("g") + private int numTotalReduces; + @Column("h") + private int numFailedReduces; + @Column("i") + private int numFinishedReduces; + @Column("j") + private JobCounters jobCounters; + + public String getCurrentState() { + return currentState; + } + public void setCurrentState(String currentState) { + this.currentState = currentState; + _pcs.firePropertyChange("currentState", null, null); + } + public long getStartTime() { + return startTime; + } + public void setStartTime(long startTime) { + this.startTime = startTime; + _pcs.firePropertyChange("startTime", null, null); + } + public long getEndTime() { + return endTime; + } + public void setEndTime(long endTime) { + this.endTime = endTime; + _pcs.firePropertyChange("endTime", null, null); + } + public int getNumTotalMaps() { + return numTotalMaps; + } + public void setNumTotalMaps(int numTotalMaps) { + this.numTotalMaps = numTotalMaps; + _pcs.firePropertyChange("numTotalMaps", null, null); + } + public int getNumFailedMaps() { + return numFailedMaps; + } + public void setNumFailedMaps(int numFailedMaps) { + this.numFailedMaps = numFailedMaps; + _pcs.firePropertyChange("numFailedMaps", null, null); + } + public int getNumFinishedMaps() { + return numFinishedMaps; + } + public void setNumFinishedMaps(int numFinishedMaps) { + this.numFinishedMaps = numFinishedMaps; + _pcs.firePropertyChange("numFinishedMaps", null, null); + } + public int getNumTotalReduces() { + return numTotalReduces; + } + public void setNumTotalReduces(int numTotalReduces) { + this.numTotalReduces = numTotalReduces; + _pcs.firePropertyChange("numTotalReduces", null, null); + } + public int getNumFailedReduces() { + return numFailedReduces; + } + public void setNumFailedReduces(int numFailedReduces) { + this.numFailedReduces = numFailedReduces; + _pcs.firePropertyChange("numFailedReduces", null, null); + } + public int getNumFinishedReduces() { + return numFinishedReduces; + } + + public void setNumFinishedReduces(int numFinishedReduces) { + this.numFinishedReduces = numFinishedReduces; + _pcs.firePropertyChange("numFinishedReduces", null, null); + } + + public JobCounters getJobCounters() { + return jobCounters; + } + + public void setJobCounters(JobCounters jobCounters) { + this.jobCounters = jobCounters; + _pcs.firePropertyChange("jobCounters", null, null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java new file mode 100644 index 0000000..2400c55 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.entities; + +import org.apache.eagle.jpm.mr.history.common.JPAConstants; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.meta.*; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@Table("eaglejpa_process") +@ColumnFamily("f") +@Prefix("process") +@Service(JPAConstants.JPA_JOB_PROCESS_TIME_STAMP_NAME) +@TimeSeries(true) +@Partition({"site"}) +public class JobProcessTimeStampEntity extends TaggedLogAPIEntity { + @Column("a") + private long currentTimeStamp; + + public long getCurrentTimeStamp() { + return currentTimeStamp; + } + public void setCurrentTimeStamp(long currentTimeStamp) { + this.currentTimeStamp = currentTimeStamp; + _pcs.firePropertyChange("currentTimeStamp", null, null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java new file mode 100644 index 0000000..9769620 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.entities; + +import org.apache.eagle.jpm.mr.history.common.JPAConstants; +import org.apache.eagle.log.entity.meta.*; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@Table("eaglejpa_anomaly") +@ColumnFamily("f") +@Prefix("tacount") +@Service(JPAConstants.JPA_TASK_ATTEMPT_COUNTER_SERVICE_NAME) +@TimeSeries(true) +@Partition({"site"}) +public class TaskAttemptCounterAPIEntity extends JobBaseAPIEntity { + @Column("a") + private int totalCount; + @Column("b") + private int failedCount; + @Column("c") + private int killedCount; + + public int getKilledCount() { + return killedCount; + } + public void setKilledCount(int killedCount) { + this.killedCount = killedCount; + _pcs.firePropertyChange("killedCount", null, null); + } + public int getFailedCount() { + return failedCount; + } + public void setFailedCount(int failedCount) { + this.failedCount = failedCount; + _pcs.firePropertyChange("failedCount", null, null); + } + public int getTotalCount() { + return totalCount; + } + public void setTotalCount(int totalCount) { + this.totalCount = totalCount; + _pcs.firePropertyChange("totalCount", null, null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java new file mode 100755 index 0000000..77994a5 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.entities; + +import org.apache.eagle.jpm.mr.history.common.JPAConstants; +import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters; +import org.apache.eagle.log.entity.meta.*; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@Table("eaglejpa_task") +@ColumnFamily("f") +@Prefix("taexec") +@Service(JPAConstants.JPA_TASK_ATTEMPT_EXECUTION_SERVICE_NAME) +@TimeSeries(true) +@Partition({"site"}) +@Indexes({ + @Index(name="Index_1_jobId", columns = { "jobID" }, unique = false) + }) +public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity { + @Column("a") + private String taskStatus; + @Column("b") + private long startTime; + @Column("c") + private long endTime; + @Column("d") + private long duration; + @Column("e") + private String error; + @Column("f") + private JobCounters jobCounters; + @Column("g") + private String taskAttemptID; + + public String getTaskStatus() { + return taskStatus; + } + public void setTaskStatus(String taskStatus) { + this.taskStatus = taskStatus; + _pcs.firePropertyChange("taskStatus", null, null); + } + public long getStartTime() { + return startTime; + } + public void setStartTime(long startTime) { + this.startTime = startTime; + _pcs.firePropertyChange("startTime", null, null); + } + public long getEndTime() { + return endTime; + } + public void setEndTime(long endTime) { + this.endTime = endTime; + _pcs.firePropertyChange("endTime", null, null); + } + public long getDuration() { + return duration; + } + public void setDuration(long duration) { + this.duration = duration; + _pcs.firePropertyChange("duration", null, null); + } + public String getError() { + return error; + } + public void setError(String error) { + this.error = error; + _pcs.firePropertyChange("error", null, null); + } + public JobCounters getJobCounters() { + return jobCounters; + } + public void setJobCounters(JobCounters jobCounters) { + this.jobCounters = jobCounters; + _pcs.firePropertyChange("jobCounters", null, null); + } + public String getTaskAttemptID() { + return taskAttemptID; + } + public void setTaskAttemptID(String taskAttemptID) { + this.taskAttemptID = taskAttemptID; + _pcs.firePropertyChange("taskAttemptID", null, null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java new file mode 100644 index 0000000..f287688 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.entities; + +import org.apache.eagle.jpm.mr.history.common.JPAConstants; +import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters; +import org.apache.eagle.log.entity.meta.*; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@Table("eaglejpa_task") +@ColumnFamily("f") +@Prefix("texec") +@Service(JPAConstants.JPA_TASK_EXECUTION_SERVICE_NAME) +@TimeSeries(true) +@Partition({"site"}) +public class TaskExecutionAPIEntity extends JobBaseAPIEntity { + @Column("a") + private String taskStatus; + @Column("b") + private long startTime; + @Column("c") + private long endTime; + @Column("d") + private long duration; + @Column("e") + private String error; + @Column("f") + private JobCounters jobCounters; + + public String getTaskStatus() { + return taskStatus; + } + public void setTaskStatus(String taskStatus) { + this.taskStatus = taskStatus; + _pcs.firePropertyChange("taskStatus", null, null); + } + public long getStartTime() { + return startTime; + } + public void setStartTime(long startTime) { + this.startTime = startTime; + _pcs.firePropertyChange("startTime", null, null); + } + public long getEndTime() { + return endTime; + } + public void setEndTime(long endTime) { + this.endTime = endTime; + _pcs.firePropertyChange("endTime", null, null); + } + public long getDuration() { + return duration; + } + public void setDuration(long duration) { + this.duration = duration; + _pcs.firePropertyChange("duration", null, null); + } + public String getError() { + return error; + } + public void setError(String error) { + this.error = error; + _pcs.firePropertyChange("error", null, null); + } + public JobCounters getJobCounters() { + return jobCounters; + } + public void setJobCounters(JobCounters jobCounters) { + this.jobCounters = jobCounters; + _pcs.firePropertyChange("jobCounters", null, null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java new file mode 100755 index 0000000..5ae67c0 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.entities; + +import org.apache.eagle.jpm.mr.history.common.JPAConstants; +import org.apache.eagle.log.entity.meta.*; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@Table("eaglejpa_anomaly") +@ColumnFamily("f") +@Prefix("taskfailurecount") +@Service(JPAConstants.JPA_TASK_FAILURE_COUNT_SERVICE_NAME) +@TimeSeries(true) +@Partition({"site"}) +public class TaskFailureCountAPIEntity extends JobBaseAPIEntity { + @Column("a") + private int failureCount; + @Column("b") + private String error; + @Column("c") + private String taskStatus; + + + public String getTaskStatus() { + return taskStatus; + } + + public void setTaskStatus(String taskStatus) { + this.taskStatus = taskStatus; + _pcs.firePropertyChange("taskStatus", null, null); + } + + public String getError() { + return error; + } + + public void setError(String error) { + this.error = error; + _pcs.firePropertyChange("error", null, null); + } + + public int getFailureCount() { + return failureCount; + } + + public void setFailureCount(int failureCount) { + this.failureCount = failureCount; + _pcs.firePropertyChange("failureCount", null, null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupDictionary.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupDictionary.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupDictionary.java new file mode 100644 index 0000000..1c1c759 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupDictionary.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.jobcounter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +/** + * MR Job counter dictionary. It's singlton class that will try to read JobCounter.conf file and configure + * counters. + * + */ +public final class CounterGroupDictionary { + + private final List groupKeys = new ArrayList<>(); + + private static volatile CounterGroupDictionary instance = null; + private static final Logger LOG = LoggerFactory.getLogger(CounterGroupDictionary.class); + + private CounterGroupDictionary() {} + + public static CounterGroupDictionary getInstance() throws JobCounterException { + if (instance == null) { + synchronized (CounterGroupDictionary.class) { + if (instance == null) { + CounterGroupDictionary tmp = new CounterGroupDictionary(); + tmp.initialize(); + instance = tmp; + } + } + } + return instance; + } + + public CounterGroupKey getCounterGroupByName(String groupName) { + for (CounterGroupKey groupKey : groupKeys) { + if (groupKey.getName().equalsIgnoreCase(groupName)) { + return groupKey; + } + } + return null; + } + + public CounterGroupKey getCounterGroupByIndex(int groupIndex) { + if (groupIndex < 0 || groupIndex >= groupKeys.size()) { + return null; + } + return groupKeys.get(groupIndex); + } + + private void initialize() throws JobCounterException { + // load config.properties file from classpath + InputStream is = this.getClass().getClassLoader().getResourceAsStream("/JobCounter.conf"); + try { + if (is == null) { + is = this.getClass().getClassLoader().getResourceAsStream("JobCounter.conf"); + if (is == null) { + final String errMsg = "Failed to load JobCounter.conf"; + LOG.error(errMsg); + throw new JobCounterException(errMsg); + } + } + final Properties prop = new Properties(); + try { + prop.load(is); + } catch(Exception ex) { + final String errMsg = "Failed to load JobCounter.conf, reason: " + ex.getMessage(); + LOG.error(errMsg, ex); + throw new JobCounterException(errMsg, ex); + } + int groupIndex = 0; + while (parseGroup(groupIndex, prop)) { + ++groupIndex; + } + } finally { + if (is != null) { + try { + is.close(); + } catch (IOException e) { + } + } + } + } + + private boolean parseGroup(int groupIndex, Properties prop) { + final String groupKeyBase = "counter.group" + groupIndex; + final String groupNameKey = groupKeyBase + ".name"; + final String groupName = prop.getProperty(groupNameKey); + + if (groupName == null) { + return false; + } + + final String groupDescriptionKey = groupKeyBase + ".description"; + final String groupDescription = prop.getProperty(groupDescriptionKey); + final CounterGroupKeyImpl groupKey = new CounterGroupKeyImpl(groupIndex, groupName, groupDescription); + final ArrayList counters = new ArrayList(); + + int counterIndex = 0; + while (parseCounter(groupKey, counterIndex, counters, prop)) { + ++counterIndex; + } + groupKey.setCounterKeys(counters.toArray(new CounterKey[counters.size()])); + groupKeys.add(groupKey); + return true; + } + + private boolean parseCounter(CounterGroupKey groupKey, int counterIndex, List counters, Properties prop) { + final String counterKeyBase = "counter.group" + groupKey.getIndex() + ".counter" + counterIndex; + final String counterNameKey = counterKeyBase + ".names"; + final String counterNamesString = prop.getProperty(counterNameKey); + + if (counterNamesString == null) { + return false; + } + final String[] names = counterNamesString.split(","); + final List counterNames = new ArrayList(); + for (String name : names) { + counterNames.add(name.trim()); + } + + final String counterDescriptionKey = counterKeyBase + ".description"; + final String counterDescription = prop.getProperty(counterDescriptionKey); + + CounterKey counter = new CounterKeyImpl(counterIndex, counterNames, counterDescription, groupKey); + counters.add(counter); + return true; + } + + private static class CounterKeyImpl implements CounterKey { + private final int index; + private final List counterNames; + private final String description; + private final CounterGroupKey groupKey; + + public CounterKeyImpl(int index, List counterNames, String description, CounterGroupKey groupKey) { + this.index = index; + this.counterNames = counterNames; + this.description = description; + this.groupKey = groupKey; + } + @Override + public int getIndex() { + return index; + } + @Override + public List getNames() { + return counterNames; + } + @Override + public String getDescription() { + return description; + } + @Override + public CounterGroupKey getGroupKey() { + return groupKey; + } + } + + private static class CounterGroupKeyImpl implements CounterGroupKey { + private final int index; + private final String name; + private final String description; + private CounterKey[] counterKeys; + + public CounterGroupKeyImpl(int index, String name, String description) { + this.index = index; + this.name = name; + this.description = description; + } + + public void setCounterKeys(CounterKey[] counterKeys) { + this.counterKeys = counterKeys; + } + + @Override + public int getIndex() { + return index; + } + @Override + public String getName() { + return name; + } + @Override + public String getDescription() { + return description; + } + @Override + public int getCounterNumber() { + return counterKeys.length; + } + @Override + public List listCounterKeys() { + return Arrays.asList(counterKeys); + } + @Override + public CounterKey getCounterKeyByName(String name) { + for (CounterKey counterKey : counterKeys) { + for (String n : counterKey.getNames()) { + if (n.equalsIgnoreCase(name)) { + return counterKey; + } + } + } + return null; + } + @Override + public CounterKey getCounterKeyByID(int index) { + if (index < 0 || index >= counterKeys.length) { + return null; + } + return counterKeys[index]; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupKey.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupKey.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupKey.java new file mode 100644 index 0000000..82606d1 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupKey.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.jobcounter; + +import java.util.List; + +public interface CounterGroupKey { + + String getName(); + String getDescription(); + int getIndex(); + int getCounterNumber(); + List listCounterKeys(); + CounterKey getCounterKeyByName(String name); + CounterKey getCounterKeyByID(int index); +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterKey.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterKey.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterKey.java new file mode 100644 index 0000000..161490f --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterKey.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.jobcounter; + +import java.util.List; + +public interface CounterKey { + + List getNames(); + String getDescription(); + int getIndex(); + CounterGroupKey getGroupKey(); + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounterException.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounterException.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounterException.java new file mode 100644 index 0000000..5ffaf51 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounterException.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.jobcounter; + +public class JobCounterException extends Exception { + + /** + * + */ + private static final long serialVersionUID = -4525162176188266862L; + + /** + * Default constructor of JobCounterException + */ + public JobCounterException() { + super(); + } + + /** + * Constructor of JobCounterException + * + * @param message error message + */ + public JobCounterException(String message) { + super(message); + } + + /** + * Constructor of JobCounterException + * + * @param message error message + * @param cause the cause of the exception + * + */ + public JobCounterException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Constructor of JobCounterException + * + * @param cause the cause of the exception + */ + public JobCounterException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounters.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounters.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounters.java new file mode 100644 index 0000000..2806cf1 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounters.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.jobcounter; + +import java.util.Map; +import java.util.TreeMap; + + +public final class JobCounters { + + private Map> counters = new TreeMap<>(); + + public Map> getCounters() { + return counters; + } + + public void setCounters(Map> counters) { + this.counters = counters; + } + + public String toString(){ + return counters.toString(); + } + + public void clear() { + for (Map.Entry> entry : counters.entrySet()) { + entry.getValue().clear(); + } + counters.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java new file mode 100644 index 0000000..0a137be --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.parser; + +public enum EagleJobStatus { + SUBMITTED, + LAUNCHED, + PREP, + RUNNING, + SUCCESS, + FAILED; +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobTagName.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobTagName.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobTagName.java new file mode 100644 index 0000000..9d13fb4 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobTagName.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.parser; + +public enum EagleJobTagName { + SITE("site"), + RACK("rack"), + HOSTNAME("hostname"), + JOB_NAME("jobName"), + NORM_JOB_NAME("normJobName"), + JOB_ID("jobID"), + TASK_ID("taskID"), + TASK_ATTEMPT_ID("taskAttemptID"), + JOB_STATUS("jobStatus"), + USER("user"), + TASK_TYPE("taskType"), + TASK_EXEC_TYPE("taskExecType"), + ERROR_CATEGORY("errorCategory"), + JOB_QUEUE("queue"), + RULE_TYPE("ruleType"), + JOB_TYPE("jobType"); + + private String tagName; + private EagleJobTagName(String tagName) { + this.tagName = tagName; + } + + public String toString() { + + return this.tagName; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleTaskStatus.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleTaskStatus.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleTaskStatus.java new file mode 100644 index 0000000..fef62a2 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleTaskStatus.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.parser; + +public enum EagleTaskStatus { + SUCCESS, + KILLED, + FAILED, +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java new file mode 100644 index 0000000..559f7a8 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.parser; + + +import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity; + +/** + * generalizing this listener would decouple entity creation and entity handling, also will help unit testing + * @author yonzhang + * + */ +public interface HistoryJobEntityCreationListener { + /** + * job entity created event + * @param entity + */ + void jobEntityCreated(JobBaseAPIEntity entity) throws Exception; + /** + * for streaming processing, flush would help commit the last several entities + */ + void flush() throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java new file mode 100644 index 0000000..fc678f8 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.parser; + +import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity; + +public interface HistoryJobEntityLifecycleListener extends HistoryJobEntityCreationListener { + /** + * job entity created event + * @param entity + */ + void jobEntityCreated(JobBaseAPIEntity entity) throws Exception; + + /** + * Job finished + */ + void jobFinish(); +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java new file mode 100644 index 0000000..d454c31 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.parser; + +/** + */ +public class ImportException extends RuntimeException { + private static final long serialVersionUID = -706778307046285820L; + + public ImportException(String message) { + super(message); + } + + public ImportException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java new file mode 100644 index 0000000..31bfdb5 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.parser; + +import org.apache.eagle.jpm.mr.history.common.JPAConstants; +import org.apache.eagle.jpm.mr.history.common.JobConfig; +import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; +import org.apache.eagle.jpm.mr.history.entities.*; +import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters; +import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.jobhistory.EventType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A new instance of JHFEventReaderBase will be created for each job history log file. + */ +public abstract class JHFEventReaderBase extends JobEntityCreationPublisher implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(JHFEventReaderBase.class); + protected Map m_baseTags; + protected JobEventAPIEntity m_jobSubmitEventEntity; + protected JobEventAPIEntity m_jobLaunchEventEntity; + protected int m_numTotalMaps; + protected int m_numTotalReduces; + protected JobEventAPIEntity m_jobFinishEventEntity; + protected JobExecutionAPIEntity m_jobExecutionEntity; + protected Map m_taskStartTime; + // taskAttemptID to task attempt startTime + protected Map m_taskAttemptStartTime; + + // taskID to host mapping, for task it's the host where the last attempt runs on + protected Map m_taskRunningHosts; + // hostname to rack mapping + protected Map m_host2RackMapping; + + protected String m_jobID; + protected String m_jobName; + protected String m_jobType; + protected String m_normJobName; + protected String m_user; + protected String m_queueName; + protected Long m_jobLauchTime; + protected JobHistoryContentFilter m_filter; + + protected final List jobEntityLifecycleListeners = new ArrayList<>(); + + protected final Configuration configuration; + + public JPAConstants.JobType fetchJobType(Configuration config) { + if (config.get(JPAConstants.JobConfiguration.CASCADING_JOB) != null) { return JPAConstants.JobType.CASCADING; } + if (config.get(JPAConstants.JobConfiguration.HIVE_JOB) != null) { return JPAConstants.JobType.HIVE; } + if (config.get(JPAConstants.JobConfiguration.PIG_JOB) != null) { return JPAConstants.JobType.PIG; } + if (config.get(JPAConstants.JobConfiguration.SCOOBI_JOB) != null) {return JPAConstants.JobType.SCOOBI; } + return JPAConstants.JobType.NOTAVALIABLE; + } + + /** + * baseTags stores the basic tag name values which might be used for persisting various entities + * baseTags includes: cluster, datacenter and jobName + * baseTags are used for all job/task related entities + * @param baseTags + */ + public JHFEventReaderBase(Map baseTags, Configuration configuration, JobHistoryContentFilter filter) { + this.m_filter = filter; + + this.m_baseTags = baseTags; + m_jobSubmitEventEntity = new JobEventAPIEntity(); + m_jobSubmitEventEntity.setTags(new HashMap<>(baseTags)); + + m_jobLaunchEventEntity = new JobEventAPIEntity(); + m_jobLaunchEventEntity.setTags(new HashMap<>(baseTags)); + + m_jobFinishEventEntity = new JobEventAPIEntity(); + m_jobFinishEventEntity.setTags(new HashMap<>(baseTags)); + + m_jobExecutionEntity = new JobExecutionAPIEntity(); + m_jobExecutionEntity.setTags(new HashMap<>(baseTags)); + + m_taskRunningHosts = new HashMap<>(); + + m_host2RackMapping = new HashMap<>(); + + m_taskStartTime = new HashMap<>(); + m_taskAttemptStartTime = new HashMap<>(); + + this.configuration = configuration; + + if (this.configuration != null && this.m_jobType == null) { + this.setJobType(fetchJobType(this.configuration).toString()); + } + } + + public void register(HistoryJobEntityLifecycleListener lifecycleListener){ + this.jobEntityLifecycleListeners.add(lifecycleListener); + } + + @Override + public void close() throws IOException { + // check if this job history file is complete + if (m_jobExecutionEntity.getEndTime() == 0L) { + throw new IOException(new JHFWriteNotCompletedException(m_jobID)); + } + try { + flush(); + } catch(Exception ex) { + throw new IOException(ex); + } + } + + @Override + public void flush() throws Exception { + super.flush(); + for (HistoryJobEntityLifecycleListener listener : this.jobEntityLifecycleListeners) { + listener.flush(); + } + } + + /** + * @param id + */ + private void setJobID(String id) { + this.m_jobID = id; + } + + private void setJobType(String jobType) { + this.m_jobType = jobType; + } + + protected void handleJob(EventType eventType, Map values, Object totalCounters) throws Exception { + String id = values.get(Keys.JOBID); + + if (m_jobID == null) { + setJobID(id); + } else if (!m_jobID.equals(id)) { + String msg = "Current job ID '" + id + "' does not match previously stored value '" + m_jobID + "'"; + LOG.error(msg); + throw new ImportException(msg); + } + + if (values.get(Keys.SUBMIT_TIME) != null) { // job submitted + m_jobSubmitEventEntity.setTimestamp(Long.valueOf(values.get(Keys.SUBMIT_TIME))); + m_user = values.get(Keys.USER); + m_queueName = values.get(Keys.JOB_QUEUE); + m_jobName = values.get(Keys.JOBNAME); + m_normJobName = m_jobName; + + LOG.info("NormJobName of " + id + ": " + m_normJobName); + + m_jobSubmitEventEntity.getTags().put(EagleJobTagName.USER.toString(), m_user); + m_jobSubmitEventEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID); + m_jobSubmitEventEntity.getTags().put(EagleJobTagName.JOB_STATUS.toString(), EagleJobStatus.SUBMITTED.name()); + m_jobSubmitEventEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName); + m_jobSubmitEventEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName); + m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(),this.m_jobType); + entityCreated(m_jobSubmitEventEntity); + } else if(values.get(Keys.LAUNCH_TIME) != null) { // job launched + m_jobLaunchEventEntity.setTimestamp(Long.valueOf(values.get(Keys.LAUNCH_TIME))); + m_jobLauchTime = m_jobLaunchEventEntity.getTimestamp(); + m_jobLaunchEventEntity.getTags().put(EagleJobTagName.USER.toString(), m_user); + m_jobLaunchEventEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID); + m_jobLaunchEventEntity.getTags().put(EagleJobTagName.JOB_STATUS.toString(), EagleJobStatus.LAUNCHED.name()); + m_jobLaunchEventEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName); + m_jobLaunchEventEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName); + m_jobLaunchEventEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(),this.m_jobType); + m_numTotalMaps = Integer.valueOf(values.get(Keys.TOTAL_MAPS)); + m_numTotalReduces = Integer.valueOf(values.get(Keys.TOTAL_REDUCES)); + entityCreated(m_jobLaunchEventEntity); + } else if(values.get(Keys.FINISH_TIME) != null) { // job finished + m_jobFinishEventEntity.setTimestamp(Long.valueOf(values.get(Keys.FINISH_TIME))); + m_jobFinishEventEntity.getTags().put(EagleJobTagName.USER.toString(), m_user); + m_jobFinishEventEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID); + m_jobFinishEventEntity.getTags().put(EagleJobTagName.JOB_STATUS.toString(), values.get(Keys.JOB_STATUS)); + m_jobFinishEventEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName); + m_jobFinishEventEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName); + m_jobFinishEventEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(),this.m_jobType); + entityCreated(m_jobFinishEventEntity); + + // populate jobExecutionEntity entity + m_jobExecutionEntity.getTags().put(EagleJobTagName.USER.toString(), m_user); + m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID); + m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName); + m_jobExecutionEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName); + m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_QUEUE.toString(), m_queueName); + m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(),this.m_jobType); + + m_jobExecutionEntity.setCurrentState(values.get(Keys.JOB_STATUS)); + m_jobExecutionEntity.setStartTime(m_jobLaunchEventEntity.getTimestamp()); + m_jobExecutionEntity.setEndTime(m_jobFinishEventEntity.getTimestamp()); + m_jobExecutionEntity.setTimestamp(m_jobLaunchEventEntity.getTimestamp()); + if (values.get(Keys.FAILED_MAPS) != null) { + // for Artemis + m_jobExecutionEntity.setNumFailedMaps(Integer.valueOf(values.get(Keys.FAILED_MAPS))); + } + if (values.get(Keys.FAILED_REDUCES) != null) { + // for Artemis + m_jobExecutionEntity.setNumFailedReduces(Integer.valueOf(values.get(Keys.FAILED_REDUCES))); + } + m_jobExecutionEntity.setNumFinishedMaps(Integer.valueOf(values.get(Keys.FINISHED_MAPS))); + m_jobExecutionEntity.setNumFinishedReduces(Integer.valueOf(values.get(Keys.FINISHED_REDUCES))); + m_jobExecutionEntity.setNumTotalMaps(m_numTotalMaps); + m_jobExecutionEntity.setNumTotalReduces(m_numTotalReduces); + if (values.get(Keys.COUNTERS) != null || totalCounters != null) { + m_jobExecutionEntity.setJobCounters(parseCounters(totalCounters)); + } + entityCreated(m_jobExecutionEntity); + } + } + + private void entityCreated(JobBaseAPIEntity entity) throws Exception { + for (HistoryJobEntityLifecycleListener lifecycleListener: this.jobEntityLifecycleListeners) { + lifecycleListener.jobEntityCreated(entity); + } + + // job finished when passing JobExecutionAPIEntity: m_jobExecutionEntity + if (entity == this.m_jobExecutionEntity) { + for (HistoryJobEntityLifecycleListener lifecycleListener : this.jobEntityLifecycleListeners) { + lifecycleListener.jobFinish(); + } + } + + super.notifiyListeners(entity); + } + + protected abstract JobCounters parseCounters(Object value) throws IOException; + + /** + * for one task ID, it has several sequential task events, i.e. + * task_start -> task_attempt_start -> task_attempt_finish -> task_attempt_start -> task_attempt_finish -> ... -> task_end + * @param values + * @throws IOException + */ + @SuppressWarnings("serial") + protected void handleTask(RecordTypes recType, EventType eventType, final Map values, Object counters) throws Exception { + String taskAttemptID = values.get(Keys.TASK_ATTEMPT_ID); + String startTime = values.get(Keys.START_TIME); + String finishTime = values.get(Keys.FINISH_TIME); + final String taskType = values.get(Keys.TASK_TYPE); + final String taskID = values.get(Keys.TASKID); + + Map taskBaseTags = new HashMap(){{ + put(EagleJobTagName.TASK_TYPE.toString(), taskType); + put(EagleJobTagName.USER.toString(), m_user); + //put(EagleJobTagName.JOB_NAME.toString(), _jobName); + put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName); + put(EagleJobTagName.JOB_TYPE.toString(), m_jobType); + put(EagleJobTagName.JOB_ID.toString(), m_jobID); + put(EagleJobTagName.TASK_ID.toString(), taskID); + }}; + taskBaseTags.putAll(m_baseTags); + if (recType == RecordTypes.Task && startTime != null) { // task start, no host is assigned yet + m_taskStartTime.put(taskID, Long.valueOf(startTime)); + } else if (recType == RecordTypes.Task && finishTime != null) { // task finish + // task execution entity setup + TaskExecutionAPIEntity entity = new TaskExecutionAPIEntity(); + Map taskExecutionTags = new HashMap<>(taskBaseTags); + String hostname = m_taskRunningHosts.get(taskID); + hostname = (hostname == null) ? "" : hostname; // TODO if task fails, then no hostname + taskExecutionTags.put(EagleJobTagName.HOSTNAME.toString(), hostname); + taskExecutionTags.put(EagleJobTagName.RACK.toString(), m_host2RackMapping.get(hostname)); + entity.setTags(taskExecutionTags); + entity.setStartTime(m_taskStartTime.get(taskID)); + entity.setEndTime(Long.valueOf(finishTime)); + entity.setDuration(entity.getEndTime() - entity.getStartTime()); + entity.setTimestamp(m_jobLauchTime); + entity.setError(values.get(Keys.ERROR)); + entity.setTaskStatus(values.get(Keys.TASK_STATUS)); + if (values.get(Keys.COUNTERS) != null || counters != null) { + entity.setJobCounters(parseCounters(counters)); + } + entityCreated(entity); + //_taskStartTime.remove(taskID); // clean this taskID + } else if ((recType == RecordTypes.MapAttempt || recType == RecordTypes.ReduceAttempt) && startTime != null) { // task attempt start + m_taskAttemptStartTime.put(taskAttemptID, Long.valueOf(startTime)); + } else if ((recType == RecordTypes.MapAttempt || recType == RecordTypes.ReduceAttempt) && finishTime != null) { // task attempt finish + TaskAttemptExecutionAPIEntity entity = new TaskAttemptExecutionAPIEntity(); + Map taskAttemptExecutionTags = new HashMap<>(taskBaseTags); + entity.setTags(taskAttemptExecutionTags); + String hostname = values.get(Keys.HOSTNAME); + String rack = values.get(Keys.RACK); + taskAttemptExecutionTags.put(EagleJobTagName.HOSTNAME.toString(), hostname); + taskAttemptExecutionTags.put(EagleJobTagName.RACK.toString(), rack); + // put last attempt's hostname to task level + m_taskRunningHosts.put(taskID, hostname); + // it is very likely that an attempt ID could be both succeeded and failed due to M/R system + // in this case, we should ignore this attempt? + if (m_taskAttemptStartTime.get(taskAttemptID) == null) { + LOG.warn("task attemp has consistency issue " + taskAttemptID); + return; + } + entity.setStartTime(m_taskAttemptStartTime.get(taskAttemptID)); + entity.setEndTime(Long.valueOf(finishTime)); + entity.setTimestamp(m_jobLauchTime); + entity.setDuration(entity.getEndTime() - entity.getStartTime()); + entity.setTaskStatus(values.get(Keys.TASK_STATUS)); + entity.setError(values.get(Keys.ERROR)); + if (values.get(Keys.COUNTERS) != null || counters != null) { // when task is killed, COUNTERS does not exist + //entity.setJobCounters(parseCounters(values.get(Keys.COUNTERS))); + entity.setJobCounters(parseCounters(counters)); + } + entity.setTaskAttemptID(taskAttemptID); + entityCreated(entity); + m_taskAttemptStartTime.remove(taskAttemptID); + } else { + // silently ignore + LOG.warn("It's an exceptional case ?"); + } + } + + public void parseConfiguration() throws Exception { + Map prop = new TreeMap<>(); + + if (m_filter.acceptJobConfFile()) { + Iterator > iter = configuration.iterator(); + while (iter.hasNext()) { + String key = iter.next().getKey(); + if (included(key) && !excluded(key)) + prop.put(key, configuration.get(key)); + } + } + + // check must-have keys are within prop + if (matchMustHaveKeyPatterns(prop)) { + JobConfigurationAPIEntity jobConfigurationEntity = new JobConfigurationAPIEntity(); + jobConfigurationEntity.setTags(new HashMap<>(m_baseTags)); + jobConfigurationEntity.getTags().put(EagleJobTagName.USER.toString(), m_user); + jobConfigurationEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID); + jobConfigurationEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName); + jobConfigurationEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName); + jobConfigurationEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(), m_jobType); + jobConfigurationEntity.setTimestamp(m_jobLaunchEventEntity.getTimestamp()); + + JobConfig jobConfig = new JobConfig(); + jobConfig.setConfig(prop); + jobConfigurationEntity.setJobConfig(jobConfig); + jobConfigurationEntity.setConfigJobName(m_normJobName); + entityCreated(jobConfigurationEntity); + } + } + + private boolean matchMustHaveKeyPatterns(Map prop) { + if (m_filter.getMustHaveJobConfKeyPatterns() == null) { + return true; + } + + for (Pattern p : m_filter.getMustHaveJobConfKeyPatterns()) { + boolean matched = false; + for (String key : prop.keySet()) { + if (p.matcher(key).matches()) { + matched = true; + break; + } + } + if (!matched) + return false; + } + return true; + } + + private boolean included(String key) { + if (m_filter.getJobConfKeyInclusionPatterns() == null) + return true; + for (Pattern p : m_filter.getJobConfKeyInclusionPatterns()) { + Matcher m = p.matcher(key); + if (m.matches()) { + LOG.info("include key: " + p.toString()); + return true; + } + } + return false; + } + + private boolean excluded(String key) { + if (m_filter.getJobConfKeyExclusionPatterns() == null) + return false; + for (Pattern p : m_filter.getJobConfKeyExclusionPatterns()) { + Matcher m = p.matcher(key); + if (m.matches()) + return true; + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java new file mode 100644 index 0000000..adeb41e --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.parser; + +public enum JHFFormat { + MRVer1, + MRVer2 +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java new file mode 100644 index 0000000..24e3563 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.parser; + +import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; +import org.apache.eagle.jpm.mr.history.entities.JobExecutionAPIEntity; +import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters; +import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.util.CountersStrings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * Listener holds all informations related to one whole job history file, so it's stateful and does not support multithreading. + * @author yonzhang + * + */ +public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer1PerLineListener { + private static final Logger logger = LoggerFactory.getLogger(JHFMRVer1EventReader.class); + + /** + * baseTags stores the basic tag name values which might be used for persisting various entities + * baseTags includes: cluster, datacenter and jobName + * baseTags are used for all job/task related entities + * @param baseTags + */ + public JHFMRVer1EventReader(Map baseTags, Configuration configuration, JobHistoryContentFilter filter) { + super(baseTags, configuration, filter); + } + + @Override + public void handle(RecordTypes recType, Map values) + throws Exception { + switch (recType) { + case Job: + handleJob(null, values, values.get(Keys.COUNTERS)); + break; + case Task: + handleTask(RecordTypes.Task, null, values, values.get(Keys.COUNTERS)); + break; + case MapAttempt: + ensureRackHostnameAfterAttemptFinish(values); + handleTask(RecordTypes.MapAttempt, null, values, values.get(Keys.COUNTERS)); + break; + case ReduceAttempt: + ensureRackHostnameAfterAttemptFinish(values); + handleTask(RecordTypes.ReduceAttempt, null, values, values.get(Keys.COUNTERS)); + break; + default: + // skip other types + ; + } + } + + private void ensureRackHostnameAfterAttemptFinish(Map values) { + // only care about attempt finish + if (values.get(Keys.FINISH_TIME) == null) + return; + String hostname = null; + String rack = null; + // we get rack/hostname based on task's status + if (values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.SUCCESS.name())) { + // in CDH4.1.4, the hostname has the format of /default/rack128/phxdpehdc10dn0338.stratus.phx.ebay.com + // if not specified, the hostname has the format of /default-rack/ + String decoratedHostname = values.get(Keys.HOSTNAME); + String[] tmp = decoratedHostname.split("/"); + hostname = tmp[tmp.length - 1]; + rack = tmp[tmp.length - 2]; + m_host2RackMapping.put(hostname, rack); + } else if(values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.KILLED.name()) || values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.FAILED.name())) { + hostname = values.get(Keys.HOSTNAME); + // make every effort to get RACK information + hostname = (hostname == null) ? "" : hostname; + rack = m_host2RackMapping.get(hostname); + } + + values.put(Keys.HOSTNAME, hostname); + values.put(Keys.RACK, rack); + } + + @Override + protected JobCounters parseCounters(Object value) throws IOException { + JobCounters jc = new JobCounters(); + Map> groups = new HashMap>(); + Counters counters = new Counters(); + try { + CountersStrings.parseEscapedCompactString((String)value, counters); + } catch (Exception ex) { + logger.error("can not parse job history", ex); + throw new IOException(ex); + } + Iterator it = counters.iterator(); + while (it.hasNext()) { + CounterGroup cg = it.next(); + + // hardcoded to exclude business level counters + if (!cg.getName().equals("org.apache.hadoop.mapreduce.FileSystemCounter") + && !cg.getName().equals("org.apache.hadoop.mapreduce.TaskCounter") + && !cg.getName().equals("org.apache.hadoop.mapreduce.JobCounter") + && !cg.getName().equals("org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter") + && !cg.getName().equals("org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter") + && !cg.getName().equals("FileSystemCounters") // for artemis + && !cg.getName().equals("org.apache.hadoop.mapred.Task$Counter") // for artemis + && !cg.getName().equals("org.apache.hadoop.mapreduce.lib.input.FileInputFormat$Counter") // for artemis + && !cg.getName().equals("org.apache.hadoop.mapreduce.lib.input.FileOutputFormat$Counter") + ) continue; + + groups.put(cg.getName(), new HashMap()); + Map counterValues = groups.get(cg.getName()); + logger.debug("groupname:" + cg.getName() + "(" + cg.getDisplayName() + ")"); + Iterator iterCounter = cg.iterator(); + while (iterCounter.hasNext()) { + Counter c = iterCounter.next(); + counterValues.put(c.getName(), c.getValue()); + logger.debug(c.getName() + "=" + c.getValue() + "(" + c.getDisplayName() + ")"); + } + } + jc.setCounters(groups); + return jc; + } + + public JobExecutionAPIEntity jobExecution() { + return m_jobExecutionEntity; + } +}