hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject [19/40] hadoop git commit: HADOOP-14840. Tool to estimate resource requirements of an application pipeline based on prior executions. (Rui Li via Subru).
Date Fri, 27 Oct 2017 19:07:29 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/625039ef/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/impl/LpSolver.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/impl/LpSolver.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/impl/LpSolver.java
new file mode 100644
index 0000000..c944d20
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/impl/LpSolver.java
@@ -0,0 +1,340 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.solver.impl;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.resourceestimator.common.api.RecurrenceId;
+import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline;
+import org.apache.hadoop.resourceestimator.common.config.ResourceEstimatorConfiguration;
+import org.apache.hadoop.resourceestimator.skylinestore.api.PredictionSkylineStore;
+import org.apache.hadoop.resourceestimator.skylinestore.exceptions.SkylineStoreException;
+import org.apache.hadoop.resourceestimator.solver.api.Solver;
+import org.apache.hadoop.resourceestimator.solver.exceptions.SolverException;
+import org.apache.hadoop.resourceestimator.solver.preprocess.SolverPreprocessor;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.ojalgo.optimisation.Expression;
+import org.ojalgo.optimisation.ExpressionsBasedModel;
+import org.ojalgo.optimisation.Optimisation.Result;
+import org.ojalgo.optimisation.Variable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A LP(Linear Programming) solution to predict recurring pipeline's
+ * {@link Resource} requirements, and generate Hadoop {@code RDL} requests which
+ * will be used to make recurring resource reservation.
+ */
+public class LpSolver extends BaseSolver implements Solver {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LpSolver.class);
+  private final SolverPreprocessor preprocessor = new SolverPreprocessor();
+  /**
+   * Controls the balance between over-allocation and under-allocation.
+   */
+  private double alpha;
+  /**
+   * Controls the generalization of the solver.
+   */
+  private double beta;
+  /**
+   * The minimum number of job runs required to run the solver.
+   */
+  private int minJobRuns;
+  /**
+   * The time interval which is used to discretize job execution.
+   */
+  private int timeInterval;
+  /**
+   * The PredictionSkylineStore to store the predicted ResourceSkyline for new
+   * run.
+   */
+  private PredictionSkylineStore predictionSkylineStore;
+
+  /**
+   * Read the solver parameters from the configuration and remember the store
+   * that predictions are written to.
+   *
+   * <p>Defaults when a key is absent: alpha = 0.1, beta = 0.1,
+   * minJobRuns = 1, timeInterval = 5.
+   *
+   * @param config       the configuration to read solver parameters from.
+   * @param skylineStore the store that {@link #solve} writes estimations to.
+   */
+  @Override public final void init(final Configuration config,
+      PredictionSkylineStore skylineStore) {
+    this.alpha =
+        config.getDouble(ResourceEstimatorConfiguration.SOLVER_ALPHA_KEY, 0.1);
+    this.beta =
+        config.getDouble(ResourceEstimatorConfiguration.SOLVER_BETA_KEY, 0.1);
+    this.minJobRuns =
+        config.getInt(ResourceEstimatorConfiguration.SOLVER_MIN_JOB_RUN_KEY, 1);
+    this.timeInterval =
+        config.getInt(ResourceEstimatorConfiguration.TIME_INTERVAL_KEY, 5);
+    this.predictionSkylineStore = skylineStore;
+  }
+
+  /**
+   * Generate over-allocation constraints.
+   *
+   * @param lpModel        the LP model.
+   * @param cJobITimeK     actual container allocation for job i in time
+   *                       interval k.
+   * @param oa             container over-allocation.
+   * @param x              predicted container allocation.
+   * @param indexJobITimeK index for job i at time interval k.
+   * @param timeK          index for time interval k.
+   */
+  private void generateOverAllocationConstraints(
+      final ExpressionsBasedModel lpModel, final double cJobITimeK,
+      final Variable[] oa, final Variable[] x, final int indexJobITimeK,
+      final int timeK) {
+    // oa_job_i_timeK >= x_timeK - cJobITimeK
+    Expression overAllocExpression =
+        lpModel.addExpression("over_alloc_" + indexJobITimeK);
+    overAllocExpression.set(oa[indexJobITimeK], 1);
+    overAllocExpression.set(x[timeK], -1);
+    overAllocExpression.lower(-cJobITimeK); // >=
+  }
+
+  /**
+   * Generate under-allocation constraints.
+   *
+   * @param lpModel        the LP model.
+   * @param cJobITimeK     actual container allocation for job i in time
+   *                       interval k.
+   * @param uaPredict      absolute container under-allocation.
+   * @param ua             recursive container under-allocation.
+   * @param x              predicted container allocation.
+   * @param indexJobITimeK index for job i at time interval k.
+   * @param timeK          index for time interval k.
+   */
+  private void generateUnderAllocationConstraints(
+      final ExpressionsBasedModel lpModel, final double cJobITimeK,
+      final Variable[] uaPredict, final Variable[] ua, final Variable[] x,
+      final int indexJobITimeK, final int timeK) {
+    // uaPredict_job_i_timeK + x_timeK >= cJobITimeK
+    Expression underAllocPredictExpression =
+        lpModel.addExpression("under_alloc_predict_" + indexJobITimeK);
+    underAllocPredictExpression.set(uaPredict[indexJobITimeK], 1);
+    underAllocPredictExpression.set(x[timeK], 1);
+    underAllocPredictExpression.lower(cJobITimeK); // >=
+    // Under-allocation is accumulated recursively over time:
+    //   k == 0: ua_job_i_time_0 >= cJobI_time_0 - x_time_0
+    //   k >= 1: ua_job_i_timeK >= ua_job_i_time_(k-1) + cJobITimeK - x_timeK
+    // ua[indexJobITimeK - 1] is the same job's previous interval because
+    // indices are laid out as indexJobI * jobLen + timeK.
+    Expression underAllocExpression =
+        lpModel.addExpression("under_alloc_" + indexJobITimeK);
+    underAllocExpression.set(ua[indexJobITimeK], 1);
+    if (timeK >= 1) {
+      underAllocExpression.set(ua[indexJobITimeK - 1], -1);
+    }
+    underAllocExpression.set(x[timeK], 1);
+    underAllocExpression.lower(cJobITimeK); // >=
+  }
+
+  /**
+   * Generate solver objective.
+   *
+   * <p>objective = alpha / numJobs * sum(all oa)
+   * + (1 - alpha) / numJobs * sum(ua at each job's last interval)
+   * + beta * eps
+   *
+   * @param objective LP solver objective.
+   * @param numJobs   number of history runs of the recurring pipeline.
+   * @param jobLen    (maximum) job length of the recurring pipeline.
+   * @param oa        container over-allocation.
+   * @param ua        recursive container under-allocation.
+   * @param eps       regularization parameter.
+   */
+  private void generateObjective(final Expression objective, final int numJobs,
+      final int jobLen, final Variable[] oa, final Variable[] ua,
+      final Variable eps) {
+    int indexJobITimeK;
+    // sum Over_Allocation
+    for (int indexJobI = 0; indexJobI < numJobs; indexJobI++) {
+      for (int timeK = 0; timeK < jobLen; timeK++) {
+        indexJobITimeK = indexJobI * jobLen + timeK;
+        objective.set(oa[indexJobITimeK], alpha / numJobs);
+      }
+    }
+    // sum Under_Allocation: only the last interval of each job is weighted,
+    // since ua accumulates recursively over the job's lifespan.
+    int indexJobITimeN;
+    for (int indexJobI = 0; indexJobI < numJobs; indexJobI++) {
+      indexJobITimeN = indexJobI * jobLen + jobLen - 1;
+      objective.set(ua[indexJobITimeN], (1 - alpha) / numJobs);
+    }
+    objective.set(eps, beta);
+    objective.weight(BigDecimal.valueOf(1));
+  }
+
+  /**
+   * Get the job length of recurring pipeline.
+   *
+   * @param resourceSkylines the history ResourceSkylines allocated to the
+   *                         recurring pipeline.
+   * @param numJobs          number of history runs of the recurring pipeline.
+   * @return length of (discretized time intervals of) the recurring pipeline,
+   *     i.e. the maximum over all runs of
+   *     ceil((latestNonNullTime - earliestStartTime) / timeInterval).
+   */
+  private int getJobLen(final List<ResourceSkyline> resourceSkylines,
+      final int numJobs) {
+    int curLen = 0;
+    int jobLen = 0;
+    for (int indexJobI = 0; indexJobI < numJobs; indexJobI++) {
+      curLen = (int) (resourceSkylines.get(indexJobI).getSkylineList()
+          .getLatestNonNullTime() - resourceSkylines.get(indexJobI)
+          .getSkylineList().getEarliestStartTime() + timeInterval - 1)
+          / timeInterval; // for round up
+      if (jobLen < curLen) {
+        jobLen = curLen;
+      }
+    }
+    return jobLen;
+  }
+
+  /**
+   * Predict the resource skyline of the next run of a recurring pipeline.
+   *
+   * <p>The history runs are aggregated per pipeline run, discretized into
+   * {@code timeInterval}-sized buckets, and an LP is solved for the predicted
+   * number of containers per bucket. The prediction is stored in the
+   * {@link PredictionSkylineStore} under the pipeline id of the first
+   * {@link RecurrenceId} in {@code jobHistory} and returned.
+   *
+   * @param jobHistory history ResourceSkylines of the recurring pipeline.
+   * @return the predicted resource allocation over time.
+   * @throws SolverException       if the input fails validation.
+   * @throws SkylineStoreException if storing the estimation fails.
+   */
+  @Override public final RLESparseResourceAllocation solve(
+      final Map<RecurrenceId, List<ResourceSkyline>> jobHistory)
+      throws SolverException, SkylineStoreException {
+    // TODO: add timeout support for this function, and ideally we should
+    // return the confidence level associated with the predicted resource.
+    preprocessor.validate(jobHistory, timeInterval);
+    final List<ResourceSkyline> resourceSkylines =
+        preprocessor.aggregateSkylines(jobHistory, minJobRuns);
+    final int numJobs = resourceSkylines.size();
+    final int jobLen = getJobLen(resourceSkylines, numJobs);
+
+    /** Create variables. */
+    final ExpressionsBasedModel lpModel = new ExpressionsBasedModel();
+
+    Variable[] oa = new Variable[jobLen * numJobs];
+    Variable[] ua = new Variable[jobLen * numJobs];
+    Variable[] uaPredict = new Variable[jobLen * numJobs];
+    Variable[] x = new Variable[jobLen];
+    for (int i = 0; i < jobLen * numJobs; i++) {
+      oa[i] = new Variable("oa" + i).lower(BigDecimal.valueOf(0));
+      ua[i] = new Variable("ua" + i).lower(BigDecimal.valueOf(0));
+      uaPredict[i] = new Variable("uaPredict" + i).lower(BigDecimal.valueOf(0));
+    }
+    for (int i = 0; i < jobLen; i++) {
+      // Unique names, consistent with oa/ua/uaPredict, keep the model readable
+      // when it is inspected or logged.
+      x[i] = new Variable("x" + i).lower(BigDecimal.valueOf(0));
+    }
+    // x is registered first: the result loop below reads the prediction for
+    // time interval k as lpResult.doubleValue(k), which presumes the model
+    // indexes variables in the order they were added.
+    lpModel.addVariables(x);
+    lpModel.addVariables(oa);
+    lpModel.addVariables(ua);
+    lpModel.addVariables(uaPredict);
+    Variable eps = new Variable("epsilon").lower(BigDecimal.valueOf(0));
+    lpModel.addVariable(eps);
+
+    /** Set constraints. */
+    // 1. sum(job_i){sum(timeK){1/cJobI * uaPredict_job_i_timeK}}
+    //    <= numJobs * eps
+    Expression regularizationConstraint =
+        lpModel.addExpression("regularization");
+    regularizationConstraint.set(eps, -numJobs);
+    regularizationConstraint.upper(BigDecimal.valueOf(0)); // <= 0
+    for (int indexJobI = 0; indexJobI < numJobs; indexJobI++) {
+      final ResourceSkyline resourceSkyline = resourceSkylines.get(indexJobI);
+      // the # of containers consumed by job i in discretized time intervals
+      final int[] containerNums = preprocessor
+          .getDiscreteSkyline(resourceSkyline.getSkylineList(), timeInterval,
+              resourceSkyline.getContainerSpec().getMemorySize(), jobLen);
+      // the aggregated # of containers consumed by job i during its lifespan
+      double cJobI = 0;
+      for (int containerNum : containerNums) {
+        cJobI += containerNum;
+      }
+      for (int timeK = 0; timeK < jobLen; timeK++) {
+        final int indexJobITimeK = indexJobI * jobLen + timeK;
+        // the # of containers consumed by job i in the k-th time interval
+        final double cJobITimeK = containerNums[timeK];
+        // A run that never held a container has cJobI == 0, and 1 / cJobI
+        // would put an infinite coefficient into the model. Its uaPredict
+        // terms are left out of the regularization instead; every cJobITimeK
+        // of such a run is 0 anyway.
+        if (cJobI > 0) {
+          regularizationConstraint
+              .set(uaPredict[indexJobITimeK], 1 / cJobI);
+        }
+        generateOverAllocationConstraints(lpModel, cJobITimeK, oa, x,
+            indexJobITimeK, timeK);
+        generateUnderAllocationConstraints(lpModel, cJobITimeK, uaPredict,
+            ua, x, indexJobITimeK, timeK);
+      }
+    }
+
+    /** Set objective. */
+    Expression objective = lpModel.addExpression("objective");
+    generateObjective(objective, numJobs, jobLen, oa, ua, eps);
+
+    /** Solve the model. */
+    final Result lpResult = lpModel.minimise();
+    final TreeMap<Long, Resource> treeMap = new TreeMap<>();
+    RLESparseResourceAllocation result =
+        new RLESparseResourceAllocation(treeMap,
+            new DefaultResourceCalculator());
+    Resource containerSpec = resourceSkylines.get(0).getContainerSpec();
+    String pipelineId =
+        jobHistory.keySet().iterator().next().getPipelineId();
+    for (int indexTimeK = 0; indexTimeK < jobLen; indexTimeK++) {
+      final ReservationInterval riAdd =
+          new ReservationInterval(indexTimeK * timeInterval,
+              (indexTimeK + 1) * timeInterval);
+      // The LP value is truncated (not rounded) to a whole container count.
+      final int numContainers = (int) lpResult.doubleValue(indexTimeK);
+      final Resource resource = Resource.newInstance(
+          containerSpec.getMemorySize() * numContainers,
+          containerSpec.getVirtualCores() * numContainers);
+      result.addInterval(riAdd, resource);
+      LOGGER.debug("time interval: {}, container: {}.", indexTimeK,
+          lpResult.doubleValue(indexTimeK));
+    }
+
+    predictionSkylineStore.addEstimation(pipelineId, result);
+
+    /**
+     * TODO: 1. We can calculate the estimated error (over-allocation,
+     * under-allocation) of our prediction which could be used to generate
+     * confidence level for our prediction; 2. Also, we can modify our model to
+     * take job input data size (and maybe stage info) into consideration; 3. We
+     * can also try to generate such conclusion: our prediction under-allocates
+     * X amount of resources from time 0 to time 100 compared with 95% of
+     * history runs; 4. We can build framework-specific versions of estimator
+     * (such as scope/spark/hive, etc.) and provides more specific suggestions.
+     * For example, we may say: for spark job i, its task size is X GB while the
+     * container memory allocation is Y GB; as a result, its shuffling stage is
+     * 20% slower than ideal case due to the disk spilling operations, etc. 5.
+     * If we have more information of jobs (other than ResourceSkyline), we may
+     * have such conclusion: job i is 20% slower than 90% of history runs, and
+     * it is because part of its tasks are running together with job j's tasks.
+     * In this case, we not only predict the amount of resource needed for job
+     * i, but also how to place the resource requirements to clusters; 6. We may
+     * monitor job progress, and dynamically increase/decrease container
+     * allocations to satisfy job deadline while minimizing the cost; 7. We may
+     * allow users to specify a budget (say $100 per job run), and optimize the
+     * resource allocation under the budget constraints. 8. ...
+     */
+    return result;
+  }
+
+  /**
+   * Release solver resources; currently nothing is held.
+   */
+  @Override public final void close() {
+    // TODO: currently place holder
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/625039ef/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/impl/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/impl/package-info.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/impl/package-info.java
new file mode 100644
index 0000000..3344f60
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/impl/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation for {@code Solver}.
+ */
+
+package org.apache.hadoop.resourceestimator.solver.impl;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/625039ef/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/preprocess/SolverPreprocessor.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/preprocess/SolverPreprocessor.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/preprocess/SolverPreprocessor.java
new file mode 100644
index 0000000..af61334
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/preprocess/SolverPreprocessor.java
@@ -0,0 +1,219 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.solver.preprocess;
+
+import static java.lang.Math.toIntExact;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.resourceestimator.common.api.RecurrenceId;
+import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline;
+import org.apache.hadoop.resourceestimator.solver.api.Solver;
+import org.apache.hadoop.resourceestimator.solver.exceptions.InvalidInputException;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Common preprocessing functions for {@link Solver}.
+ */
+public class SolverPreprocessor {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(SolverPreprocessor.class);
+
+  /**
+   * Check if Solver's input parameters are valid.
+   *
+   * @param jobHistory   the history {@link ResourceSkyline}s of the recurring
+   *                     pipeline job.
+   * @param timeInterval the time interval which is used to discretize the
+   *                     history {@link ResourceSkyline}s.
+   * @throws InvalidInputException if: (1) jobHistory is <em>null</em>;
+   *     (2) jobHistory is empty; (3) timeInterval is non-positive.
+   */
+  public final void validate(
+      final Map<RecurrenceId, List<ResourceSkyline>> jobHistory,
+      final int timeInterval) throws InvalidInputException {
+    if ((jobHistory == null) || (jobHistory.size() == 0)) {
+      LOGGER.error(
+          "Job resource skyline history is invalid, please try again with"
+              + " valid resource skyline history.");
+      throw new InvalidInputException("Job ResourceSkyline history", "invalid");
+    }
+
+    if (timeInterval <= 0) {
+      LOGGER.error(
+          "Solver timeInterval {} is invalid, please specify a positive value.",
+          timeInterval);
+      throw new InvalidInputException("Solver timeInterval", "non-positive");
+    }
+  }
+
+  /**
+   * Return the number of containers the job uses at the specified time.
+   *
+   * <p>Only memory is considered: the memory in use at {@code index} is
+   * divided by {@code containerMemAlloc} using integer division, so partial
+   * containers are rounded down.
+   *
+   * @param skyList           the list of {@link Resource}s used by the job.
+   * @param index             the time at which the skyline is queried.
+   * @param containerMemAlloc the amount of memory allocated to one container;
+   *                          must be non-zero.
+   * @return the number of containers consumed by the job at {@code index}.
+   */
+  public final long getResourceVector(final RLESparseResourceAllocation skyList,
+      final int index, final long containerMemAlloc) {
+    return skyList.getCapacityAtTime(index).getMemorySize() / containerMemAlloc;
+  }
+
+  /**
+   * Discretize job's lifespan into intervals, and return the number of
+   * containers used by the job within each interval.
+   * <p> Note that here we assume all containers allocated to the job have the
+   * same {@link Resource}. This is due to the limit of
+   * {@link RLESparseResourceAllocation}.
+   *
+   * @param skyList           the list of {@link Resource}s used by the job.
+   * @param timeInterval      the time interval used to discretize the job's
+   *                          lifespan.
+   * @param containerMemAlloc the amount of memory allocated to each container.
+   * @param jobLen            the number of discretized intervals to return;
+   *                          must be at least
+   *                          ceil(lifespan / timeInterval).
+   * @return for each interval, the peak number of containers allocated to the
+   *     job within it.
+   */
+  public final int[] getDiscreteSkyline(
+      final RLESparseResourceAllocation skyList, final int timeInterval,
+      final long containerMemAlloc, final int jobLen) {
+    long jobLifeSpan =
+        skyList.getLatestNonNullTime() - skyList.getEarliestStartTime();
+    int[] result = new int[jobLen];
+    Arrays.fill(result, 0);
+
+    // NOTE(review): the skyline is sampled at absolute times 0..lifespan-1,
+    // not earliestStartTime..latestNonNullTime-1. The two ranges agree only
+    // when the skyline starts at time 0 — confirm that callers guarantee this.
+    int index = 0;
+    long numContainerAt = 0;
+    for (int i = 0; i < jobLifeSpan; i++) {
+      index = (int) Math.floor((double) i / timeInterval);
+      numContainerAt = getResourceVector(skyList, i, containerMemAlloc);
+      if (result[index] < numContainerAt) {
+        result[index] = (int) numContainerAt;
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Merge different jobs' resource skylines into one within the same pipeline.
+   *
+   * <p>Each job's skyline is shifted by the job's submission offset from the
+   * earliest submission in the pipeline, then all skylines are summed. The
+   * returned skyline carries the metadata (job id, input size, submission and
+   * finish time, container spec) of the first element of the list.
+   *
+   * @param resourceSkylines different jobs' resource skylines within the same
+   *                         pipeline; must be non-empty.
+   * @return an aggregated resource skyline for the pipeline.
+   */
+  public final ResourceSkyline mergeSkyline(
+      final List<ResourceSkyline> resourceSkylines) {
+    // TODO:
+    // rewrite this function with shift and merge once YARN-5328 is committed
+    /** First, get the pipeline submission time. */
+    long pipelineSubmission = Long.MAX_VALUE;
+    for (ResourceSkyline skyline : resourceSkylines) {
+      long jobSubmission = skyline.getJobSubmissionTime();
+      if (pipelineSubmission > jobSubmission) {
+        pipelineSubmission = jobSubmission;
+      }
+    }
+    final TreeMap<Long, Resource> resourceOverTime = new TreeMap<>();
+    final RLESparseResourceAllocation skylineListAgg =
+        new RLESparseResourceAllocation(resourceOverTime,
+            new DefaultResourceCalculator());
+    /**
+     * Second, adjust different jobs' ResourceSkyline starting time based on
+     * pipeline submission time, and merge them into one ResourceSkyline.
+     */
+    for (ResourceSkyline skyline : resourceSkylines) {
+      long jobSubmission = skyline.getJobSubmissionTime();
+      // Submission times are divided by 1000 before shifting; presumably they
+      // are in milliseconds and skyline times in seconds — TODO confirm.
+      long diff = (jobSubmission - pipelineSubmission) / 1000;
+      RLESparseResourceAllocation tmp = skyline.getSkylineList();
+      Object[] timePoints = tmp.getCumulative().keySet().toArray();
+      // n change points delimit n - 1 segments [t_j, t_(j+1)), each holding
+      // the capacity at t_j. All of them are copied, including the last one,
+      // which is the final non-zero segment of the job.
+      for (int j = 0; j < timePoints.length - 1; j++) {
+        final long start = (long) timePoints[j];
+        final long end = (long) timePoints[j + 1];
+        ReservationInterval riAdd =
+            new ReservationInterval(toIntExact(start + diff),
+                toIntExact(end + diff));
+        skylineListAgg.addInterval(riAdd,
+            tmp.getCapacityAtTime(toIntExact(start)));
+      }
+    }
+    ResourceSkyline skylineAgg =
+        new ResourceSkyline(resourceSkylines.get(0).getJobId(),
+            resourceSkylines.get(0).getJobInputDataSize(),
+            resourceSkylines.get(0).getJobSubmissionTime(),
+            resourceSkylines.get(0).getJobFinishTime(),
+            resourceSkylines.get(0).getContainerSpec(), skylineListAgg);
+
+    return skylineAgg;
+  }
+
+  /**
+   * Aggregate all job's {@link ResourceSkyline}s in the one run of recurring
+   * pipeline, and return the aggregated {@link ResourceSkyline}s in different
+   * runs.
+   *
+   * @param jobHistory the history {@link ResourceSkyline} of the recurring
+   *                   pipeline job.
+   * @param minJobRuns the minimum number of job runs required to run the
+   *                   solver.
+   * @return the aggregated {@link ResourceSkyline}s in different runs, one per
+   *     entry of {@code jobHistory}, in the map's iteration order.
+   * @throws InvalidInputException if jobHistory has less job runs than the
+   *     minimum requirement.
+   */
+  public final List<ResourceSkyline> aggregateSkylines(
+      final Map<RecurrenceId, List<ResourceSkyline>> jobHistory,
+      final int minJobRuns) throws InvalidInputException {
+    List<ResourceSkyline> resourceSkylines = new ArrayList<ResourceSkyline>();
+    for (List<ResourceSkyline> runSkylines : jobHistory.values()) {
+      // TODO: identify different jobs within the same pipeline.
+      // Right now, we do prediction at the granularity of pipeline, i.e., we
+      // merge the resource skylines of jobs within the same pipeline into one
+      // aggregated resource skyline.
+      resourceSkylines.add(mergeSkyline(runSkylines));
+    }
+    int numJobs = resourceSkylines.size();
+    if (numJobs < minJobRuns) {
+      LOGGER.error(
+          "Solver requires job resource skyline history for at least {} runs,"
+              + " but it only receives history info for {}  runs.",
+          minJobRuns, numJobs);
+      throw new InvalidInputException("Job ResourceSkyline history",
+          "containing less job runs" + " than " + minJobRuns);
+    }
+
+    return resourceSkylines;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/625039ef/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/preprocess/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/preprocess/package-info.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/preprocess/package-info.java
new file mode 100644
index 0000000..b58cef9
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/preprocess/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Preprocessor for {@code Solver}.
+ */
+
+package org.apache.hadoop.resourceestimator.solver.preprocess;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/625039ef/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/api/JobMetaData.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/api/JobMetaData.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/api/JobMetaData.java
new file mode 100644
index 0000000..2bfab3e
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/api/JobMetaData.java
@@ -0,0 +1,163 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.translator.api;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.resourceestimator.common.api.RecurrenceId;
+import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job metadata collected when parsing the log file.
+ *
+ * <p>Container launch and release times are recorded per container id as
+ * they are encountered; {@link #createSkyline()} later pairs them up and
+ * turns them into the job's {@link ResourceSkyline}.
+ */
+public class JobMetaData {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(JobMetaData.class);
+  private final ResourceSkyline resourceSkyline = new ResourceSkyline();
+  // containerId, launchTime
+  private final Map<String, Long> rawStart = new HashMap<>();
+  // containerId, releaseTime
+  private final Map<String, Long> rawEnd = new HashMap<>();
+  private RecurrenceId recurrenceId;
+
+  /**
+   * Constructor.
+   *
+   * @param jobSubmissionTimeConfig job submission time; container times are
+   *                                made relative to it by
+   *                                {@link #createSkyline()}.
+   */
+  public JobMetaData(final long jobSubmissionTimeConfig) {
+    resourceSkyline.setJobSubmissionTime(jobSubmissionTimeConfig);
+  }
+
+  /**
+   * Set job finish time.
+   *
+   * @param jobFinishTimeConfig job finish time.
+   * @return the reference to current {@link JobMetaData}.
+   */
+  public final JobMetaData setJobFinishTime(final long jobFinishTimeConfig) {
+    resourceSkyline.setJobFinishTime(jobFinishTimeConfig);
+    return this;
+  }
+
+  /**
+   * Record a container's launch time. A later call for the same container
+   * replaces the earlier value and logs a warning.
+   *
+   * @param containerId id of the container.
+   * @param time        container launch time.
+   * @return the reference to current {@link JobMetaData}.
+   */
+  public final JobMetaData setContainerStart(final String containerId,
+      final long time) {
+    if (rawStart.put(containerId, time) != null) {
+      LOGGER.warn("find duplicate container launch time for {}, so we replace"
+          + " it with {}.", containerId, time);
+    }
+    return this;
+  }
+
+  /**
+   * Record a container's release time. A later call for the same container
+   * replaces the earlier value and logs a warning.
+   *
+   * @param containerId id of the container.
+   * @param time        container release time.
+   * @return the reference to current {@link JobMetaData}.
+   */
+  public final JobMetaData setContainerEnd(final String containerId,
+      final long time) {
+    if (rawEnd.put(containerId, time) != null) {
+      LOGGER.warn("find duplicate container release time for {}, so we replace"
+          + " it with {}.", containerId, time);
+    }
+    return this;
+  }
+
+  /**
+   * Get {@link RecurrenceId}.
+   *
+   * @return {@link RecurrenceId}, or {@code null} if it has not been set.
+   */
+  public final RecurrenceId getRecurrenceId() {
+    return recurrenceId;
+  }
+
+  /**
+   * Set {@link RecurrenceId}.
+   *
+   * @param recurrenceIdConfig the {@link RecurrenceId}.
+   * @return the reference to current {@link JobMetaData}.
+   */
+  public final JobMetaData setRecurrenceId(
+      final RecurrenceId recurrenceIdConfig) {
+    this.recurrenceId = recurrenceIdConfig;
+    return this;
+  }
+
+  /**
+   * Get {@link ResourceSkyline}.
+   *
+   * @return {@link ResourceSkyline}.
+   */
+  public final ResourceSkyline getResourceSkyline() {
+    return resourceSkyline;
+  }
+
+  /**
+   * Normalizes container launch/release times and generates the
+   * {@link ResourceSkyline}.
+   *
+   * <p>Each container with both a launch and a release time contributes one
+   * interval of the container spec. The interval is measured from the job
+   * submission time and divided by 1000; this presumably converts
+   * milliseconds to seconds (TODO confirm the unit of the recorded times).
+   * Containers without a release time are skipped with a warning. Release
+   * times without a matching launch time are ignored. Any previously
+   * generated skyline is replaced.
+   */
+  public final void createSkyline() {
+    final long jobSubmissionTime = resourceSkyline.getJobSubmissionTime();
+    Resource containerSpec = resourceSkyline.getContainerSpec();
+    final TreeMap<Long, Resource> resourceOverTime = new TreeMap<>();
+    final RLESparseResourceAllocation skylineList =
+        new RLESparseResourceAllocation(resourceOverTime,
+            new DefaultResourceCalculator());
+    resourceSkyline.setSkylineList(skylineList);
+    if (containerSpec == null) {
+      // if RmParser fails to extract container resource spec from logs, we
+      // statically set it to be <1core, 1GB>
+      containerSpec = Resource.newInstance(1024, 1);
+    }
+    resourceSkyline.setContainerSpec(containerSpec);
+    for (final Map.Entry<String, Long> entry : rawStart.entrySet()) {
+      final long timeStart = entry.getValue();
+      final Long timeEnd = rawEnd.get(entry.getKey());
+      if (timeEnd == null) {
+        LOGGER.warn("container release time not found for {}.", entry.getKey());
+      } else {
+        final ReservationInterval riAdd =
+            new ReservationInterval((timeStart - jobSubmissionTime) / 1000,
+                (timeEnd - jobSubmissionTime) / 1000);
+        resourceSkyline.getSkylineList().addInterval(riAdd, containerSpec);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/625039ef/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/api/LogParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/api/LogParser.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/api/LogParser.java
new file mode 100644
index 0000000..b7efe5a
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/api/LogParser.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.translator.api;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.resourceestimator.common.exception.ResourceEstimatorException;
+import org.apache.hadoop.resourceestimator.skylinestore.api.HistorySkylineStore;
+import org.apache.hadoop.resourceestimator.skylinestore.exceptions.SkylineStoreException;
+
+/**
+ * LogParser iterates over a stream of logs, uses {@link SingleLineParser} to
+ * parse each line, and adds extracted {@code ResourceSkyline}s to the
+ * {@code SkylineStore}.
+ */
+public interface LogParser extends AutoCloseable {
+
+  /**
+   * Initializes the LogParser from the given configuration, e.g. by creating
+   * the {@link SingleLineParser} used to parse individual lines.
+   *
+   * @param config       {@link Configuration} for the LogParser.
+   * @param skylineStore the {@link HistorySkylineStore} which stores recurring
+   *                     pipelines' {@code ResourceSkyline}s.
+   * @throws ResourceEstimatorException if initialization of a
+   *     {@code SingleLineParser} fails.
+   */
+  void init(Configuration config, HistorySkylineStore skylineStore)
+      throws ResourceEstimatorException;
+
+  /**
+   * Parses each line in the log stream, and adds extracted
+   * {@code ResourceSkyline}s to the {@code SkylineStore}.
+   *
+   * @param logs the stream of input logs.
+   * @throws SkylineStoreException if it fails to add the extracted
+   *     {@code ResourceSkyline}s to the {@code SkylineStore}.
+   * @throws IOException if it fails to read from the {@link InputStream}.
+   */
+  void parseStream(InputStream logs) throws SkylineStoreException, IOException;
+
+  /**
+   * Releases resources held by the LogParser. Overrides
+   * {@link AutoCloseable#close()} without declaring a checked exception.
+   */
+  @Override void close();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/625039ef/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/api/SingleLineParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/api/SingleLineParser.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/api/SingleLineParser.java
new file mode 100644
index 0000000..b9607fe
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/api/SingleLineParser.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.translator.api;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.resourceestimator.common.api.RecurrenceId;
+import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline;
+import org.apache.hadoop.resourceestimator.translator.exceptions.DataFieldNotFoundException;
+
+/**
+ * Parses a single line of a log file.
+ *
+ * <p>Implementations pull the information they need out of one log line and
+ * record any {@link ResourceSkyline}s that result from it.
+ */
+public interface SingleLineParser {
+
+  /**
+   * Parses {@code logLine}, extracting the {@link ResourceSkyline}s it yields
+   * and storing them.
+   *
+   * @param logLine        a single line read from the log file.
+   * @param jobMetas       job metadata gathered while parsing the log.
+   * @param skylineRecords receives the valid {@link ResourceSkyline}s
+   *                       extracted from the log, grouped by
+   *                       {@link RecurrenceId}.
+   * @throws DataFieldNotFoundException when the line lacks required data
+   *     fields.
+   * @throws ParseException when a date string in the line cannot be converted
+   *     to a unix timestamp.
+   */
+  void parseLine(String logLine,
+      Map<String, JobMetaData> jobMetas,
+      Map<RecurrenceId, List<ResourceSkyline>> skylineRecords)
+      throws DataFieldNotFoundException, ParseException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/625039ef/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/api/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/api/package-info.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/api/package-info.java
new file mode 100644
index 0000000..a1b79c0
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/api/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * API for {@code Translator}.
+ */
+
+package org.apache.hadoop.resourceestimator.translator.api;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/625039ef/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/exceptions/DataFieldNotFoundException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/exceptions/DataFieldNotFoundException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/exceptions/DataFieldNotFoundException.java
new file mode 100644
index 0000000..6eae614b
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/exceptions/DataFieldNotFoundException.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.translator.exceptions;
+
+/**
+ * Exception thrown when job attributes are not found.
+ */
+public class DataFieldNotFoundException extends Exception {
+  private static final long serialVersionUID = -684069387367879218L;
+
+  /**
+   * Creates an exception for a log entry whose data fields could not be
+   * extracted.
+   *
+   * @param log the offending log content, included in the message.
+   */
+  public DataFieldNotFoundException(final String log) {
+    super(buildMessage(log));
+  }
+
+  /**
+   * Creates an exception for a log entry whose data fields could not be
+   * extracted, keeping the underlying cause (for example a
+   * {@link NumberFormatException} raised while parsing a field).
+   *
+   * @param log   the offending log content, included in the message.
+   * @param cause the exception that made extraction fail.
+   */
+  public DataFieldNotFoundException(final String log, final Throwable cause) {
+    super(buildMessage(log), cause);
+  }
+
+  /** Single place where the user-visible message is built. */
+  private static String buildMessage(final String log) {
+    return "Fail to extract data fields properly from " + log;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/625039ef/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/exceptions/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/exceptions/package-info.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/exceptions/package-info.java
new file mode 100644
index 0000000..104388d
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/exceptions/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Exceptions for {@code Translator}.
+ */
+
+package org.apache.hadoop.resourceestimator.translator.exceptions;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/625039ef/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/impl/BaseLogParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/impl/BaseLogParser.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/impl/BaseLogParser.java
new file mode 100644
index 0000000..50d911f
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/impl/BaseLogParser.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.translator.impl;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.resourceestimator.common.api.RecurrenceId;
+import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline;
+import org.apache.hadoop.resourceestimator.common.config.ResourceEstimatorConfiguration;
+import org.apache.hadoop.resourceestimator.common.config.ResourceEstimatorUtil;
+import org.apache.hadoop.resourceestimator.common.exception.ResourceEstimatorException;
+import org.apache.hadoop.resourceestimator.skylinestore.api.HistorySkylineStore;
+import org.apache.hadoop.resourceestimator.skylinestore.exceptions.SkylineStoreException;
+import org.apache.hadoop.resourceestimator.translator.api.JobMetaData;
+import org.apache.hadoop.resourceestimator.translator.api.LogParser;
+import org.apache.hadoop.resourceestimator.translator.api.SingleLineParser;
+import org.apache.hadoop.resourceestimator.translator.exceptions.DataFieldNotFoundException;
+import org.apache.hadoop.resourceestimator.translator.validator.ParserValidator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class to implement {@link LogParser}. It wraps a
+ * {@link SingleLineParser} created from the {@link Configuration} and uses it
+ * to parse a stream of log lines.
+ */
+public class BaseLogParser implements LogParser {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(BaseLogParser.class);
+  private static final ParserValidator INPUT_VALIDATOR = new ParserValidator();
+  private SingleLineParser singleLineParser;
+  private HistorySkylineStore historySkylineStore;
+
+  /**
+   * Creates the {@link SingleLineParser} selected by
+   * {@code ResourceEstimatorConfiguration.TRANSLATOR_LINE_PARSER}, falling
+   * back to
+   * {@code ResourceEstimatorConfiguration.DEFAULT_TRANSLATOR_LINE_PARSER}.
+   * Also remembers the store that parsed skylines are written to.
+   *
+   * @param config       configuration naming the line parser.
+   * @param skylineStore store receiving the extracted skylines.
+   * @throws ResourceEstimatorException if the line parser cannot be created.
+   */
+  @Override public void init(Configuration config,
+      HistorySkylineStore skylineStore) throws ResourceEstimatorException {
+    singleLineParser = ResourceEstimatorUtil.createProviderInstance(config,
+        ResourceEstimatorConfiguration.TRANSLATOR_LINE_PARSER,
+        ResourceEstimatorConfiguration.DEFAULT_TRANSLATOR_LINE_PARSER,
+        SingleLineParser.class);
+    this.historySkylineStore = skylineStore;
+  }
+
+  /**
+   * Add job's {@link ResourceSkyline}s to the {@link HistorySkylineStore}.
+   *
+   * @param skylineRecords the {@link Map} which records the completed recurring
+   *                       pipeline's {@link ResourceSkyline}s.
+   * @throws SkylineStoreException if it fails to add the job's
+   *     {@link ResourceSkyline}s to the {@link HistorySkylineStore}.
+   */
+  private void addToSkylineStore(
+      final Map<RecurrenceId, List<ResourceSkyline>> skylineRecords)
+      throws SkylineStoreException {
+    for (final Map.Entry<RecurrenceId, List<ResourceSkyline>> entry :
+        skylineRecords.entrySet()) {
+      historySkylineStore.addHistory(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   * Parses one log line by delegating to the configured
+   * {@link SingleLineParser}.
+   *
+   * @param logLine        one line of the log.
+   * @param jobMetas       job metadata shared across the lines of one parse.
+   * @param skylineRecords collects the extracted {@link ResourceSkyline}s.
+   * @throws DataFieldNotFoundException if required fields are missing.
+   * @throws ParseException if a date in the line cannot be parsed.
+   */
+  public void parseLine(final String logLine,
+      final Map<String, JobMetaData> jobMetas,
+      final Map<RecurrenceId, List<ResourceSkyline>> skylineRecords)
+      throws DataFieldNotFoundException, ParseException {
+    singleLineParser.parseLine(logLine, jobMetas, skylineRecords);
+  }
+
+  /**
+   * Parses every line of {@code logs} and adds the extracted skylines to the
+   * {@link HistorySkylineStore}. Lines with missing fields or unparsable
+   * dates are skipped and logged at debug level. The stream is not closed.
+   *
+   * @throws IllegalStateException if {@link #init} has not been called or the
+   *     parser has been closed.
+   */
+  @Override public final void parseStream(final InputStream logs)
+      throws SkylineStoreException, IOException {
+    if (!INPUT_VALIDATOR.validate(logs)) {
+      LOGGER.error("Input validation fails, please specify with"
+          + " valid input parameters.");
+      return;
+    }
+    if (historySkylineStore == null) {
+      // Fail before consuming the stream instead of after parsing every line.
+      throw new IllegalStateException("LogParser is not initialized or has"
+          + " already been closed; call init() before parseStream().");
+    }
+    final Map<RecurrenceId, List<ResourceSkyline>> skylineRecords =
+        new HashMap<>();
+    final Map<String, JobMetaData> jobMetas = new HashMap<>();
+    // The reader is not closed here; the stream's lifecycle is left to the
+    // caller.
+    final BufferedReader bf = new BufferedReader(new InputStreamReader(logs));
+    String line;
+    while ((line = bf.readLine()) != null) {
+      try {
+        parseLine(line, jobMetas, skylineRecords);
+      } catch (DataFieldNotFoundException e) {
+        LOGGER.debug("Data field not found", e);
+      } catch (ParseException e) {
+        LOGGER.debug("Date conversion error", e);
+      }
+    }
+
+    addToSkylineStore(skylineRecords);
+  }
+
+  /**
+   * Drops the reference to the {@link HistorySkylineStore}. The parser must be
+   * re-initialized with {@link #init} before it can parse again.
+   */
+  @Override public final void close() {
+    historySkylineStore = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/625039ef/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/impl/LogParserUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/impl/LogParserUtil.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/impl/LogParserUtil.java
new file mode 100644
index 0000000..35da799
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/impl/LogParserUtil.java
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.translator.impl;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+
+import org.apache.hadoop.resourceestimator.common.exception.ResourceEstimatorException;
+import org.apache.hadoop.resourceestimator.skylinestore.api.SkylineStore;
+import org.apache.hadoop.resourceestimator.skylinestore.exceptions.SkylineStoreException;
+import org.apache.hadoop.resourceestimator.translator.api.LogParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Common utility functions for {@link LogParser}.
+ *
+ * <p>Not thread-safe: date conversion uses a {@link SimpleDateFormat}, which
+ * is not synchronized.
+ */
+public class LogParserUtil {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(LogParserUtil.class);
+  private LogParser logParser;
+  private DateFormat format = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
+
+  /**
+   * Set the {@link LogParser} to use.
+   *
+   * @param logParser the {@link LogParser} to use.
+   */
+  public void setLogParser(final LogParser logParser) {
+    this.logParser = logParser;
+  }
+
+  /**
+   * Set date format for the {@link LogParser}.
+   *
+   * @param datePattern the date pattern in the log, in
+   *                    {@link SimpleDateFormat} syntax.
+   */
+  public void setDateFormat(final String datePattern) {
+    this.format = new SimpleDateFormat(datePattern);
+  }
+
+  /**
+   * Converts String date to unix timestamp. Note that we assume the time in the
+   * logs has the same time zone with the machine which runs the
+   * {@link RmSingleLineParser}.
+   *
+   * @param date The String date.
+   * @return Unix time stamp, in milliseconds.
+   * @throws ParseException if data conversion from String to unix timestamp
+   *                        fails.
+   */
+  public long stringToUnixTimestamp(final String date) throws ParseException {
+    return format.parse(date).getTime();
+  }
+
+  /**
+   * Parse the log file. The file is always closed before this method returns.
+   *
+   * @param logFile path of the log file.
+   * @throws SkylineStoreException      if fails to addHistory to
+   *                                    {@link SkylineStore}.
+   * @throws IOException                if the file cannot be opened (for
+   *                                    example it does not exist or is a
+   *                                    directory), or reading it fails.
+   * @throws ResourceEstimatorException if the {@link LogParser}
+   *     is not initialized.
+   */
+  public final void parseLog(final String logFile)
+      throws SkylineStoreException, IOException, ResourceEstimatorException {
+    if (logParser == null) {
+      throw new ResourceEstimatorException("The log parser is not initialized,"
+          + " please try again after initializing.");
+    }
+    // This method opened the stream, so it is responsible for closing it,
+    // whether parsing succeeds or throws.
+    try (InputStream inputStream = new FileInputStream(logFile)) {
+      logParser.parseStream(inputStream);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/625039ef/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/impl/NativeSingleLineParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/impl/NativeSingleLineParser.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/impl/NativeSingleLineParser.java
new file mode 100644
index 0000000..83557d3
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/impl/NativeSingleLineParser.java
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.translator.impl;
+
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.resourceestimator.common.api.RecurrenceId;
+import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline;
+import org.apache.hadoop.resourceestimator.common.config.ResourceEstimatorConfiguration;
+import org.apache.hadoop.resourceestimator.translator.api.JobMetaData;
+import org.apache.hadoop.resourceestimator.translator.api.SingleLineParser;
+import org.apache.hadoop.resourceestimator.translator.exceptions.DataFieldNotFoundException;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This sample parser will parse the sample log and extract the resource
+ * skyline.
+ * <p> The expected log format is: NormalizedJobName NumInstances SubmitTime
+ * StartTime EndTime JobInstanceName memUsage coreUsage
+ *
+ * <p>Fields are whitespace-separated. The parser reads three of them:
+ * <ul>
+ * <li>field 0 is the pipeline id;</li>
+ * <li>field 5 is the job id;</li>
+ * <li>field 7 is a {@code |}-separated list of {@code numContainers:time}
+ * entries.</li>
+ * </ul>
+ * Each entry adds {@code numContainers} containers of &lt;1 vcore, 1024
+ * memory&gt; from {@code time} to {@code time + timeInterval}. Lines with
+ * missing or non-numeric fields are rejected with
+ * {@link DataFieldNotFoundException}.
+ * NOTE(review): the format above calls field 7 "coreUsage", but it is parsed
+ * as skyline units. Confirm against the sample log.
+ */
+public class NativeSingleLineParser implements SingleLineParser {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(NativeSingleLineParser.class);
+  /** Minimum number of whitespace-separated fields a line must contain. */
+  private static final int MIN_FIELD_COUNT = 8;
+  /** Interval used when the configuration does not set one. */
+  private static final int DEFAULT_TIME_INTERVAL = 5;
+  // note that for native log, we assume each container is allocated <1 core,
+  // 1GB RAM>
+  private static final long CONTAINER_MEM_ALLOC = 1024;
+  private static final int CONTAINER_CPU_ALLOC = 1;
+
+  /**
+   * Time interval from the resource estimator configuration. It is loaded on
+   * the first call to {@link #parseLine} and reused afterwards, rather than
+   * rebuilding the configuration for every log line.
+   */
+  private Integer timeInterval;
+
+  /**
+   * Aggregates different jobs' {@link ResourceSkyline}s within the same
+   * pipeline together.
+   *
+   * @param resourceSkyline newly extracted {@link ResourceSkyline}.
+   * @param recurrenceId    the {@link RecurrenceId} which the resourceSkyline
+   *                        belongs to.
+   * @param skylineRecords  a {@link Map} which stores the
+   *     {@link ResourceSkyline}s for all pipelines during this parsing.
+   */
+  private void aggregateSkyline(final ResourceSkyline resourceSkyline,
+      final RecurrenceId recurrenceId,
+      final Map<RecurrenceId, List<ResourceSkyline>> skylineRecords) {
+    skylineRecords.computeIfAbsent(recurrenceId, k -> new ArrayList<>())
+        .add(resourceSkyline);
+  }
+
+  /** Reads the time interval from the configuration once, then caches it. */
+  private int getTimeInterval() {
+    if (timeInterval == null) {
+      final Configuration config = new Configuration();
+      config.addResource(new org.apache.hadoop.fs.Path(
+          ResourceEstimatorConfiguration.CONFIG_FILE));
+      timeInterval = config.getInt(
+          ResourceEstimatorConfiguration.TIME_INTERVAL_KEY,
+          DEFAULT_TIME_INTERVAL);
+    }
+    return timeInterval;
+  }
+
+  /** Wraps a number-format failure so the caller skips just this line. */
+  private static DataFieldNotFoundException malformed(final String logLine,
+      final NumberFormatException cause) {
+    final DataFieldNotFoundException e =
+        new DataFieldNotFoundException(logLine);
+    e.initCause(cause);
+    return e;
+  }
+
+  @Override public void parseLine(String logLine,
+      Map<String, JobMetaData> jobMetas,
+      Map<RecurrenceId, List<ResourceSkyline>> skylineRecords)
+      throws DataFieldNotFoundException, ParseException {
+    final int interval = getTimeInterval();
+    final String[] splitString = logLine.split("\\s+");
+    if (splitString.length < MIN_FIELD_COUNT) {
+      // Previously an ArrayIndexOutOfBoundsException, which aborted the whole
+      // parse instead of skipping this line.
+      throw new DataFieldNotFoundException(logLine);
+    }
+    final String pipelineId = splitString[0];
+    final String jobId = splitString[5];
+    final String[] skylineUnits = splitString[7].split("\\|");
+
+    final JobMetaData appMeta = new JobMetaData(0);
+    final RecurrenceId recurrenceId = new RecurrenceId(pipelineId, jobId);
+    appMeta.setRecurrenceId(recurrenceId);
+    final ResourceSkyline resourceSkyline = appMeta.getResourceSkyline();
+    final TreeMap<Long, Resource> resourceOverTime = new TreeMap<>();
+    final RLESparseResourceAllocation skylineList =
+        new RLESparseResourceAllocation(resourceOverTime,
+            new DefaultResourceCalculator());
+    resourceSkyline.setSkylineList(skylineList);
+    for (final String elem : skylineUnits) {
+      final String[] unit = elem.split("\\:");
+      if (unit.length < 2) {
+        throw new DataFieldNotFoundException(logLine);
+      }
+      final int numContainers;
+      final long startTime;
+      try {
+        numContainers = Integer.parseInt(unit[0]);
+        startTime = Long.parseLong(unit[1]);
+      } catch (NumberFormatException e) {
+        throw malformed(logLine, e);
+      }
+      final Resource containerAlloc = Resource.newInstance(
+          CONTAINER_MEM_ALLOC * numContainers,
+          CONTAINER_CPU_ALLOC * numContainers);
+      final ReservationInterval riAdd =
+          new ReservationInterval(startTime, startTime + interval);
+      resourceSkyline.getSkylineList().addInterval(riAdd, containerAlloc);
+    }
+    resourceSkyline.setContainerSpec(
+        Resource.newInstance(CONTAINER_MEM_ALLOC, CONTAINER_CPU_ALLOC));
+    appMeta.setJobFinishTime(
+        resourceSkyline.getSkylineList().getLatestNonNullTime());
+    resourceSkyline.setJobInputDataSize(0);
+    resourceSkyline.setJobId(jobId);
+    aggregateSkyline(resourceSkyline, recurrenceId, skylineRecords);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/625039ef/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/impl/RmSingleLineParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/impl/RmSingleLineParser.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/impl/RmSingleLineParser.java
new file mode 100644
index 0000000..1904934
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/impl/RmSingleLineParser.java
@@ -0,0 +1,203 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.translator.impl;
+
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.resourceestimator.common.api.RecurrenceId;
+import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline;
+import org.apache.hadoop.resourceestimator.translator.api.JobMetaData;
+import org.apache.hadoop.resourceestimator.translator.api.SingleLineParser;
+import org.apache.hadoop.resourceestimator.translator.exceptions.DataFieldNotFoundException;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * {@link SingleLineParser} for Hadoop Resource Manager logs.
+ *
+ * <p>Each call to {@link #parseLine} inspects a single log line. If the line
+ * mentions one of the tracked events (application submission, AM container
+ * allocation or release, container resource spec, failed application attempt,
+ * or application summary), the per-application {@link JobMetaData} held in
+ * {@code jobMetas} is updated. When an application summary is seen, the
+ * application's {@link ResourceSkyline} is built and appended to
+ * {@code skylineRecords} under the application's {@link RecurrenceId}.
+ */
+public class RmSingleLineParser implements SingleLineParser {
+  private static final LogParserUtil PARSERUTIL = new LogParserUtil();
+  private static final Pattern FILTER_PATTERN = Pattern.compile(
+      "(Submit Application Request|AM Allocated Container|"
+          + "AM Released Container|finalState=FAILED|"
+          + "ApplicationSummary|, Resource:)");
+  private static final Pattern SUBMISSION_PATTERN =
+      Pattern.compile("APPID=(\\w+)");
+  private static final Pattern FAIL_PATTERN =
+      Pattern.compile("appattempt_(\\d+_\\d+)_\\d+");
+  /** Expects the application name in the form {@code pipelineId-runId}. */
+  private static final Pattern FINISH_PATTERN =
+      Pattern.compile("appId=(\\w+).*?name=(\\w+)\\-(\\w+)");
+  private static final Pattern CONTAINER_EVENT_PATTERN =
+      Pattern.compile("APPID=(\\w+).*?CONTAINERID=(\\w+)");
+  private static final Pattern CONTAINER_SPEC_PATTERN = Pattern.compile(
+      "(container_[^_]+|appattempt)_(\\d+_\\d+).*?memory:(\\d+),"
+          + "\\svCores:(\\d+)");
+
+  /**
+   * Aggregates different jobs' {@link ResourceSkyline}s within the same
+   * pipeline together.
+   *
+   * @param resourceSkyline newly extracted {@link ResourceSkyline}.
+   * @param recurrenceId    the {@link RecurrenceId} which the resourceSkyline
+   *                        belongs to.
+   * @param skylineRecords  a {@link Map} which stores the
+   *     {@link ResourceSkyline}s for all pipelines during this parsing; a new
+   *     list is created the first time a {@code recurrenceId} is seen.
+   */
+  private void aggregateSkyline(final ResourceSkyline resourceSkyline,
+      final RecurrenceId recurrenceId,
+      final Map<RecurrenceId, List<ResourceSkyline>> skylineRecords) {
+    skylineRecords.computeIfAbsent(recurrenceId, key -> new ArrayList<>())
+        .add(resourceSkyline);
+  }
+
+  /**
+   * Parses one Resource Manager log line and updates the parsing state.
+   *
+   * <p>The line is split on commas into at most five fields. Lines with fewer
+   * than five fields, or whose last field mentions none of the tracked events,
+   * are ignored. For a matching line the second field is parsed as the event
+   * timestamp. The last field is then split on whitespace into at most four
+   * pieces, and the fourth piece holds the event-specific data.
+   *
+   * @param logLine        the log line to parse.
+   * @param jobMetas       {@link JobMetaData} of the applications currently
+   *                       being tracked, keyed by application id.
+   * @param skylineRecords {@link ResourceSkyline}s of finished applications,
+   *                       grouped by {@link RecurrenceId}.
+   * @throws DataFieldNotFoundException if a line mentioning a tracked event
+   *     lacks the fields needed to process that event.
+   * @throws ParseException propagated from parsing the timestamp field.
+   */
+  @Override public final void parseLine(final String logLine,
+      final Map<String, JobMetaData> jobMetas,
+      final Map<RecurrenceId, List<ResourceSkyline>> skylineRecords)
+      throws DataFieldNotFoundException, ParseException {
+    // At most 5 fields: everything after the 4th comma stays in splits[4].
+    final String[] splits = logLine.split(",", 5);
+    if (splits.length < 5) {
+      return;
+    }
+    // Only the last field is searched for the events of interest.
+    final Matcher jobEventMatcher = FILTER_PATTERN.matcher(splits[4]);
+    if (!jobEventMatcher.find()) {
+      return;
+    }
+
+    final long date = PARSERUTIL.stringToUnixTimestamp(splits[1]);
+    // Report a malformed line through the declared exception instead of an
+    // ArrayIndexOutOfBoundsException when the event data piece is missing.
+    final String[] tailPieces = splits[4].split("\\s+", 4);
+    if (tailPieces.length < 4) {
+      throw new DataFieldNotFoundException(splits[4]);
+    }
+    final String tail = tailPieces[3];
+
+    switch (jobEventMatcher.group(1)) {
+    case "Submit Application Request":
+      onSubmit(tail, date, jobMetas);
+      break;
+    case "AM Allocated Container":
+      onContainerAllocated(tail, date, jobMetas);
+      break;
+    case ", Resource:":
+      onContainerSpec(tail, jobMetas);
+      break;
+    case "AM Released Container":
+      onContainerReleased(tail, date, jobMetas);
+      break;
+    case "finalState=FAILED":
+      onAttemptFailed(tail, date, jobMetas);
+      break;
+    case "ApplicationSummary":
+      onApplicationFinished(tail, date, jobMetas, skylineRecords);
+      break;
+    default:
+      break;
+    }
+  }
+
+  /**
+   * Starts tracking a newly submitted application.
+   *
+   * @param tail     event data of the log line.
+   * @param date     timestamp of the log line.
+   * @param jobMetas tracked applications, keyed by application id.
+   * @throws DataFieldNotFoundException if no application id is found.
+   */
+  private void onSubmit(final String tail, final long date,
+      final Map<String, JobMetaData> jobMetas)
+      throws DataFieldNotFoundException {
+    final Matcher appIdMatch = SUBMISSION_PATTERN.matcher(tail);
+    if (!appIdMatch.find()) {
+      throw new DataFieldNotFoundException(tail);
+    }
+    jobMetas.put(appIdMatch.group(1), new JobMetaData(date));
+  }
+
+  /**
+   * Records the start time of a container of a tracked application.
+   *
+   * @param tail     event data of the log line.
+   * @param date     timestamp of the log line.
+   * @param jobMetas tracked applications, keyed by application id.
+   * @throws DataFieldNotFoundException if the application or container id is
+   *     not found.
+   */
+  private void onContainerAllocated(final String tail, final long date,
+      final Map<String, JobMetaData> jobMetas)
+      throws DataFieldNotFoundException {
+    final Matcher containerEventMatcher =
+        CONTAINER_EVENT_PATTERN.matcher(tail);
+    if (!containerEventMatcher.find()) {
+      throw new DataFieldNotFoundException(tail);
+    }
+    final JobMetaData appMeta = jobMetas.get(containerEventMatcher.group(1));
+    if (appMeta != null) {
+      appMeta.setContainerStart(containerEventMatcher.group(2), date);
+    }
+  }
+
+  /**
+   * Records the container resource spec (memory, vCores) of a tracked
+   * application.
+   *
+   * @param tail     event data of the log line.
+   * @param jobMetas tracked applications, keyed by application id.
+   * @throws DataFieldNotFoundException if the container spec is not found.
+   */
+  private void onContainerSpec(final String tail,
+      final Map<String, JobMetaData> jobMetas)
+      throws DataFieldNotFoundException {
+    final Matcher containerSpecMatcher = CONTAINER_SPEC_PATTERN.matcher(tail);
+    if (!containerSpecMatcher.find()) {
+      throw new DataFieldNotFoundException(tail);
+    }
+    final String appId = "application_" + containerSpecMatcher.group(2);
+    final JobMetaData appMeta = jobMetas.get(appId);
+    if (appMeta != null) {
+      final long memAlloc = Long.parseLong(containerSpecMatcher.group(3));
+      final int cpuAlloc = Integer.parseInt(containerSpecMatcher.group(4));
+      appMeta.getResourceSkyline()
+          .setContainerSpec(Resource.newInstance(memAlloc, cpuAlloc));
+    }
+  }
+
+  /**
+   * Records the end time of a container of a tracked application.
+   *
+   * @param tail     event data of the log line.
+   * @param date     timestamp of the log line.
+   * @param jobMetas tracked applications, keyed by application id.
+   * @throws DataFieldNotFoundException if the application or container id is
+   *     not found.
+   */
+  private void onContainerReleased(final String tail, final long date,
+      final Map<String, JobMetaData> jobMetas)
+      throws DataFieldNotFoundException {
+    final Matcher containerEventMatcher =
+        CONTAINER_EVENT_PATTERN.matcher(tail);
+    if (!containerEventMatcher.find()) {
+      throw new DataFieldNotFoundException(tail);
+    }
+    final JobMetaData appMeta = jobMetas.get(containerEventMatcher.group(1));
+    if (appMeta != null) {
+      appMeta.setContainerEnd(containerEventMatcher.group(2), date);
+    }
+  }
+
+  /**
+   * Handles a failed application attempt: the records collected so far for a
+   * tracked application are discarded, and tracking restarts at the failure
+   * time.
+   *
+   * @param tail     event data of the log line.
+   * @param date     timestamp of the log line.
+   * @param jobMetas tracked applications, keyed by application id.
+   * @throws DataFieldNotFoundException if no attempt id is found.
+   */
+  private void onAttemptFailed(final String tail, final long date,
+      final Map<String, JobMetaData> jobMetas)
+      throws DataFieldNotFoundException {
+    final Matcher failMatcher = FAIL_PATTERN.matcher(tail);
+    if (!failMatcher.find()) {
+      throw new DataFieldNotFoundException(tail);
+    }
+    final String appId = "application_" + failMatcher.group(1);
+    if (jobMetas.containsKey(appId)) {
+      jobMetas.put(appId, new JobMetaData(date));
+    }
+  }
+
+  /**
+   * Finishes a tracked application. It is removed from {@code jobMetas}, its
+   * {@link ResourceSkyline} is built, and the skyline is added to
+   * {@code skylineRecords}.
+   *
+   * @param tail           event data of the log line.
+   * @param date           timestamp of the log line.
+   * @param jobMetas       tracked applications, keyed by application id.
+   * @param skylineRecords skylines of finished applications, grouped by
+   *                       {@link RecurrenceId}.
+   * @throws DataFieldNotFoundException if the application id or a
+   *     {@code pipelineId-runId} name is not found.
+   */
+  private void onApplicationFinished(final String tail, final long date,
+      final Map<String, JobMetaData> jobMetas,
+      final Map<RecurrenceId, List<ResourceSkyline>> skylineRecords)
+      throws DataFieldNotFoundException {
+    final Matcher finishMatcher = FINISH_PATTERN.matcher(tail);
+    if (!finishMatcher.find()) {
+      throw new DataFieldNotFoundException(tail);
+    }
+    final String appId = finishMatcher.group(1);
+    final String pipelineId = finishMatcher.group(2);
+    final String runId = finishMatcher.group(3);
+    final RecurrenceId recurrenceId = new RecurrenceId(pipelineId, runId);
+    final JobMetaData appMeta = jobMetas.remove(appId);
+    if (appMeta == null) {
+      return;
+    }
+    // TODO: read the job input data size from the logs; 0 is a placeholder.
+    appMeta.setRecurrenceId(recurrenceId).setJobFinishTime(date)
+        .getResourceSkyline().setJobInputDataSize(0);
+    appMeta.createSkyline();
+    final ResourceSkyline resourceSkyline = appMeta.getResourceSkyline();
+    resourceSkyline.setJobId(appId);
+    aggregateSkyline(resourceSkyline, recurrenceId, skylineRecords);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/625039ef/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/impl/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/impl/package-info.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/impl/package-info.java
new file mode 100644
index 0000000..13b5138
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/impl/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation for {@code Translator}.
+ */
+
+package org.apache.hadoop.resourceestimator.translator.impl;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/625039ef/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/validator/ParserValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/validator/ParserValidator.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/validator/ParserValidator.java
new file mode 100644
index 0000000..ebf50ba
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/validator/ParserValidator.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.translator.validator;
+
+import java.io.InputStream;
+
+import org.apache.hadoop.resourceestimator.translator.api.LogParser;
+
+/**
+ * Validates the input parameters for the {@link LogParser}.
+ */
+public class ParserValidator {
+  /**
+   * Validates the input parameters for the {@link LogParser}.
+   *
+   * <p>Currently only the presence of the stream is checked; the log content
+   * itself is not inspected.
+   *
+   * @param logs input log streams to the {@link LogParser}.
+   * @return {@code false} if {@code logs} is {@code null}, {@code true}
+   *     otherwise.
+   */
+  public final boolean validate(final InputStream logs) {
+    // TODO: also validate the content of the log stream.
+    return logs != null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/625039ef/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/validator/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/validator/package-info.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/validator/package-info.java
new file mode 100644
index 0000000..c48b262
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/translator/validator/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Validator for {@code Translator}.
+ */
+
+package org.apache.hadoop.resourceestimator.translator.validator;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/625039ef/hadoop-tools/hadoop-resourceestimator/src/main/resources/webapps/ResourceEstimatorServer/.gitignore
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/resources/webapps/ResourceEstimatorServer/.gitignore b/hadoop-tools/hadoop-resourceestimator/src/main/resources/webapps/ResourceEstimatorServer/.gitignore
new file mode 100644
index 0000000..ae1e83e
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/resources/webapps/ResourceEstimatorServer/.gitignore
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message