hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From su...@apache.org
Subject [4/5] hadoop git commit: HADOOP-14840. Tool to estimate resource requirements of an application pipeline based on prior executions. (Rui Li via Subru).
Date Wed, 25 Oct 2017 23:01:45 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/ResourceEstimatorService.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/ResourceEstimatorService.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/ResourceEstimatorService.java
new file mode 100644
index 0000000..933332e
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/ResourceEstimatorService.java
@@ -0,0 +1,238 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.service;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Map;
+
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.resourceestimator.common.api.RecurrenceId;
+import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline;
+import org.apache.hadoop.resourceestimator.common.config.ResourceEstimatorConfiguration;
+import org.apache.hadoop.resourceestimator.common.config.ResourceEstimatorUtil;
+import org.apache.hadoop.resourceestimator.common.exception.ResourceEstimatorException;
+import org.apache.hadoop.resourceestimator.common.serialization.RLESparseResourceAllocationSerDe;
+import org.apache.hadoop.resourceestimator.common.serialization.ResourceSerDe;
+import org.apache.hadoop.resourceestimator.skylinestore.api.SkylineStore;
+import org.apache.hadoop.resourceestimator.skylinestore.exceptions.SkylineStoreException;
+import org.apache.hadoop.resourceestimator.solver.api.Solver;
+import org.apache.hadoop.resourceestimator.solver.exceptions.SolverException;
+import org.apache.hadoop.resourceestimator.translator.api.LogParser;
+import org.apache.hadoop.resourceestimator.translator.impl.LogParserUtil;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+import com.google.inject.Singleton;
+
+/**
+ * Resource Estimator Service which provides a set of REST APIs for users to
+ * use the estimation service.
+ */
+@Singleton @Path("/resourceestimator") public class ResourceEstimatorService {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(ResourceEstimatorService.class);
+  private static SkylineStore skylineStore;
+  private static Solver solver;
+  private static LogParser logParser;
+  private static LogParserUtil logParserUtil = new LogParserUtil();
+  private static Configuration config;
+  private static Gson gson;
+  private static Type rleType;
+  private static Type skylineStoreType;
+
+  /** Creates the service; shared static state is set up on first use only. */
+  public ResourceEstimatorService() throws ResourceEstimatorException {
+    if (skylineStore == null) {
+      try {
+        config = new Configuration();
+        config.addResource(ResourceEstimatorConfiguration.CONFIG_FILE);
+        skylineStore = ResourceEstimatorUtil.createProviderInstance(config,
+            ResourceEstimatorConfiguration.SKYLINESTORE_PROVIDER,
+            ResourceEstimatorConfiguration.DEFAULT_SKYLINESTORE_PROVIDER,
+            SkylineStore.class);
+        logParser = ResourceEstimatorUtil.createProviderInstance(config,
+            ResourceEstimatorConfiguration.TRANSLATOR_PROVIDER,
+            ResourceEstimatorConfiguration.DEFAULT_TRANSLATOR_PROVIDER,
+            LogParser.class);
+        logParser.init(config, skylineStore);
+        logParserUtil.setLogParser(logParser);
+        solver = ResourceEstimatorUtil.createProviderInstance(config,
+            ResourceEstimatorConfiguration.SOLVER_PROVIDER,
+            ResourceEstimatorConfiguration.DEFAULT_SOLVER_PROVIDER,
+            Solver.class);
+        solver.init(config, skylineStore);
+      } catch (Exception ex) {
+        LOGGER.error("Server initialization failed due to: {}",
+            ex.getMessage(), ex); // trailing Throwable logs the stack trace
+        throw new ResourceEstimatorException(ex.getMessage(), ex);
+      }
+      gson = new GsonBuilder()
+          .registerTypeAdapter(Resource.class, new ResourceSerDe())
+          .registerTypeAdapter(RLESparseResourceAllocation.class,
+              new RLESparseResourceAllocationSerDe())
+          .enableComplexMapKeySerialization().create();
+      rleType = new TypeToken<RLESparseResourceAllocation>() { }.getType();
+      skylineStoreType =
+          new TypeToken<Map<RecurrenceId, List<ResourceSkyline>>>() {
+          }.getType();
+    }
+  }
+
+  /**
+   * Parse the log file. See also {@link LogParser#parseStream(InputStream)}.
+   *
+   * @param logFile file/directory of the log to be parsed, taken from the
+   *     request path ({@code .+} lets it span several path segments).
+   * @throws IOException if fails to parse the log.
+   * @throws SkylineStoreException if fails to addHistory to
+   *     {@link SkylineStore}.
+   * @throws ResourceEstimatorException if {@link LogParser} is not initialized.
+   */
+  @POST @Path("/translator/{logFile : .+}") public void parseFile(
+      @PathParam("logFile") String logFile)
+      throws IOException, SkylineStoreException, ResourceEstimatorException {
+    logParserUtil.parseLog(logFile);
+    LOGGER.debug("Parse logFile: {}.", logFile);
+  }
+
+  /**
+   * Get predicted {@code Resource} allocation for the pipeline. If the
+   * prediction for the pipeline already exists in the {@link SkylineStore}, it
+   * will directly get the prediction from {@link SkylineStore}, otherwise it
+   * will call the {@link Solver} on all history runs of the pipeline. This
+   * method does not itself write the result back to the {@link SkylineStore}.
+   * Note that invoking {@link Solver} could be a time-consuming operation.
+   *
+   * @param pipelineId the id of the pipeline.
+   * @return Json format of {@link RLESparseResourceAllocation}.
+   * @throws SolverException       if {@link Solver} fails;
+   * @throws SkylineStoreException if fails to get history
+   *     {@link ResourceSkyline} or predicted {@code Resource} allocation
+   *     from {@link SkylineStore}.
+   */
+  @GET @Path("/estimator/{pipelineId}") @Produces(MediaType.APPLICATION_JSON)
+  public String getPrediction(
+      @PathParam(value = "pipelineId") String pipelineId)
+      throws SolverException, SkylineStoreException {
+    // first, try to grab the predicted resource allocation from the skyline
+    // store
+    RLESparseResourceAllocation result = skylineStore.getEstimation(pipelineId);
+    // no stored estimation: run the solver on every run ("*") of the pipeline
+    if (result == null) {
+      RecurrenceId recurrenceId = new RecurrenceId(pipelineId, "*");
+      Map<RecurrenceId, List<ResourceSkyline>> jobHistory =
+          skylineStore.getHistory(recurrenceId);
+      result = solver.solve(jobHistory);
+    }
+    final String prediction = gson.toJson(result, rleType);
+    LOGGER.debug("Predict resource requests for pipelineId: {}.", pipelineId);
+
+    return prediction;
+  }
+
+  /**
+   * Get history {@link ResourceSkyline} from {@link SkylineStore}. This
+   * function supports the following special wildcard operations regarding
+   * {@link RecurrenceId}: If the {@code pipelineId} is "*", it will return all
+   * entries in the store; else, if the {@code runId} is "*", it will return all
+   * {@link ResourceSkyline}s belonging to the {@code pipelineId}; else, it will
+   * return all {@link ResourceSkyline}s belonging to the {{@code pipelineId},
+   * {@code runId}}. Whatever the store returns, including its result for a
+   * {@link RecurrenceId} that does not exist, is serialized as is.
+   *
+   * @param pipelineId pipelineId of the history run.
+   * @param runId      runId of the history run.
+   * @return Json format of history {@link ResourceSkyline}s.
+   * @throws SkylineStoreException if fails to getHistory
+   *     {@link ResourceSkyline} from {@link SkylineStore}.
+   */
+  @GET @Path("/skylinestore/history/{pipelineId}/{runId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  public String getHistoryResourceSkyline(
+      @PathParam("pipelineId") String pipelineId,
+      @PathParam("runId") String runId) throws SkylineStoreException {
+    final RecurrenceId recurrenceId = new RecurrenceId(pipelineId, runId);
+    // The store resolves the "*" wildcards itself, so this single lookup
+    // serves the exact, per-pipeline and whole-store queries alike.
+    final Map<RecurrenceId, List<ResourceSkyline>> jobHistory =
+        skylineStore.getHistory(recurrenceId);
+    final String skyline = gson.toJson(jobHistory, skylineStoreType);
+    // recurrenceId is passed as an argument so it fills the {} placeholder.
+    LOGGER.debug("Query the skyline store for recurrenceId: {}.",
+        recurrenceId);
+
+    return skyline;
+  }
+
+  /**
+   * Get estimated {@code Resource} allocation for the pipeline.
+   *
+   * @param pipelineId id of the pipeline.
+   * @return Json format of {@link RLESparseResourceAllocation}.
+   * @throws SkylineStoreException if fails to get estimated {@code Resource}
+   *                               allocation from {@link SkylineStore}.
+   */
+  @GET @Path("/skylinestore/estimation/{pipelineId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  public String getEstimatedResourceAllocation(
+      @PathParam("pipelineId") String pipelineId) throws SkylineStoreException {
+    RLESparseResourceAllocation result = skylineStore.getEstimation(pipelineId);
+    final String skyline = gson.toJson(result, rleType);
+    LOGGER.debug("Query the skyline store for pipelineId: {}.", pipelineId);
+
+    return skyline;
+  }
+
+  /**
+   * Remove the history {@link ResourceSkyline}s of a job run from the
+   * {@link SkylineStore}. <p> Note that for safety considerations, only the
+   * history {@link ResourceSkyline}s of one job run may be deleted at a time.
+   *
+   * @param pipelineId pipelineId of the history run.
+   * @param runId      runId of the history run.
+   * @throws SkylineStoreException if fails to deleteHistory
+   *                               {@link ResourceSkyline}s.
+   */
+  @DELETE @Path("/skylinestore/history/{pipelineId}/{runId}")
+  public void deleteHistoryResourceSkyline(
+      @PathParam("pipelineId") String pipelineId,
+      @PathParam("runId") String runId) throws SkylineStoreException {
+    final RecurrenceId runToDelete = new RecurrenceId(pipelineId, runId);
+    skylineStore.deleteHistory(runToDelete);
+    LOGGER.info("Delete ResourceSkyline for recurrenceId: {}.", runToDelete);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/ShutdownHook.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/ShutdownHook.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/ShutdownHook.java
new file mode 100644
index 0000000..23e1413
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/ShutdownHook.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.service;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Simple shutdown hook for {@link ResourceEstimatorServer}. */
+public class ShutdownHook extends Thread {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(ShutdownHook.class);
+  private final ResourceEstimatorServer server;
+
+  /** Creates a hook that will call {@code shutdown()} on {@code server}. */
+  ShutdownHook(ResourceEstimatorServer server) {
+    this.server = server;
+  }
+
+  /** Shuts the server down; a failure is logged together with its cause. */
+  @Override public void run() {
+    try {
+      server.shutdown();
+    } catch (Exception e) {
+      LOGGER.error("HttpServer fails to shut down!", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/package-info.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/package-info.java
new file mode 100644
index 0000000..3571736
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Resource estimator service.
+ */
+
+package org.apache.hadoop.resourceestimator.service;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/HistorySkylineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/HistorySkylineStore.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/HistorySkylineStore.java
new file mode 100644
index 0000000..8fe4619
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/HistorySkylineStore.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.skylinestore.api;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.resourceestimator.common.api.RecurrenceId;
+import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline;
+import org.apache.hadoop.resourceestimator.skylinestore.exceptions.SkylineStoreException;
+
+/**
+ * HistorySkylineStore stores pipeline job's {@link ResourceSkyline}s in all
+ * runs. {@code Estimator} will query the {@link ResourceSkyline}s for pipeline
+ * jobs. {@code Parser} will parse various types of job logs, construct
+ * {@link ResourceSkyline}s out of the logs and store them in the SkylineStore.
+ */
+public interface HistorySkylineStore {
+  /**
+   * Add job's resource skyline to the <em>store</em> indexed by the job's
+   * {@link RecurrenceId}. {@link RecurrenceId} is used to identify recurring
+   * pipeline jobs, and we assume that {@code ResourceEstimatorServer} users
+   * will provide the correct {@link RecurrenceId}.
+   * <p> If {@link ResourceSkyline}s to be added contain <em>null</em>
+   * elements, the function will skip them.
+   *
+   * @param recurrenceId     the unique id of user's recurring pipeline jobs.
+   * @param resourceSkylines the list of {@link ResourceSkyline}s in one run.
+   * @throws SkylineStoreException if: (1) input parameters are invalid; (2)
+   *     {@link ResourceSkyline}s to be added contain some duplicate
+   *     {@link RecurrenceId}s which already exist in the
+   *     {@link HistorySkylineStore}.
+   */
+  void addHistory(RecurrenceId recurrenceId,
+      List<ResourceSkyline> resourceSkylines) throws SkylineStoreException;
+
+  /**
+   * Delete all {@link ResourceSkyline}s belonging to given
+   * {@link RecurrenceId}.
+   * <p> Note that for safety considerations, we only allow users to
+   * delete the {@link ResourceSkyline}s of one job run at a time.
+   *
+   * @param recurrenceId the unique id of user's recurring pipeline jobs.
+   * @throws SkylineStoreException if: (1) input parameters are invalid; (2)
+   *     recurrenceId does not exist in the {@link HistorySkylineStore}.
+   */
+  void deleteHistory(RecurrenceId recurrenceId) throws SkylineStoreException;
+
+  /**
+   * Update {@link RecurrenceId} with given {@link ResourceSkyline}s. This
+   * function will delete all the {@link ResourceSkyline}s belonging to
+   * the {@link RecurrenceId}, and re-insert the given {@link ResourceSkyline}s
+   * to the SkylineStore.
+   * <p> If {@link ResourceSkyline}s contain <em>null</em> elements,
+   * the function will skip them.
+   *
+   * @param recurrenceId     the unique id of the pipeline job.
+   * @param resourceSkylines the list of {@link ResourceSkyline}s in one run.
+   * @throws SkylineStoreException if: (1) input parameters are invalid; (2)
+   *     recurrenceId does not exist in the SkylineStore.
+   */
+  void updateHistory(RecurrenceId recurrenceId,
+      List<ResourceSkyline> resourceSkylines) throws SkylineStoreException;
+
+  /**
+   * Return all {@link ResourceSkyline}s belonging to {@link RecurrenceId}.
+   * <p> This function supports the following special wildcard operations
+   * regarding {@link RecurrenceId}: If the {@code pipelineId} is "*", it will
+   * return all entries in the store; else, if the {@code runId} is "*", it
+   * will return all {@link ResourceSkyline}s belonging to the
+   * {@code pipelineId}; else, it will return all {@link ResourceSkyline}s
+   * belonging to the {{@code pipelineId}, {@code runId}}. If the
+   * {@link RecurrenceId} does not exist, it will return <em>null</em>.
+   *
+   * @param recurrenceId the unique id of the pipeline job.
+   * @return all {@link ResourceSkyline}s belonging to the recurrenceId.
+   * @throws SkylineStoreException if recurrenceId is <em>null</em>.
+   */
+  Map<RecurrenceId, List<ResourceSkyline>> getHistory(RecurrenceId recurrenceId)
+      throws SkylineStoreException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/PredictionSkylineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/PredictionSkylineStore.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/PredictionSkylineStore.java
new file mode 100644
index 0000000..c3fedce
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/PredictionSkylineStore.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.skylinestore.api;
+
+import org.apache.hadoop.resourceestimator.skylinestore.exceptions.SkylineStoreException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+
+/**
+ * PredictionSkylineStore stores the predicted
+ * {@code RLESparseResourceAllocation} of a job as computed by the
+ * {@code Estimator} based on the {@code ResourceSkyline}s of past executions in
+ * the {@code HistorySkylineStore}.
+ */
+public interface PredictionSkylineStore {
+
+  /**
+   * Add job's predicted {@code Resource} allocation to the <em>store</em>
+   * indexed by the {@code pipelineId}.
+   * <p> Note that right now we only keep the latest copy of predicted
+   * {@code Resource} allocation for the recurring pipeline, so an earlier
+   * estimation stored under the same {@code pipelineId} is not retained.
+   *
+   * @param pipelineId       the id of the recurring pipeline.
+   * @param resourceOverTime the predicted {@code Resource} allocation for the
+   *                         pipeline.
+   * @throws SkylineStoreException if input parameters are invalid.
+   */
+  void addEstimation(String pipelineId,
+      RLESparseResourceAllocation resourceOverTime)
+      throws SkylineStoreException;
+
+  /**
+   * Return the predicted {@code Resource} allocation for the pipeline.
+   * <p> If the pipelineId does not exist, it will return <em>null</em>.
+   *
+   * @param pipelineId the unique id of the pipeline.
+   * @return the predicted {@code Resource} allocation for the pipeline.
+   * @throws SkylineStoreException if pipelineId is <em>null</em>.
+   */
+  RLESparseResourceAllocation getEstimation(String pipelineId)
+      throws SkylineStoreException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/SkylineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/SkylineStore.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/SkylineStore.java
new file mode 100644
index 0000000..f352ed4
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/SkylineStore.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.skylinestore.api;
+
+/**
+ * SkylineStore is a composable interface for storing the history
+ * {@code ResourceSkyline}s of past job runs and the predicted
+ * {@code RLESparseResourceAllocation} for future execution.
+ */
+public interface SkylineStore
+    extends HistorySkylineStore, PredictionSkylineStore {
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/package-info.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/package-info.java
new file mode 100644
index 0000000..e833486
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * APIs for the {@code SkylineStore}.
+ */
+
+package org.apache.hadoop.resourceestimator.skylinestore.api;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/DuplicateRecurrenceIdException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/DuplicateRecurrenceIdException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/DuplicateRecurrenceIdException.java
new file mode 100644
index 0000000..7c92480
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/DuplicateRecurrenceIdException.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.skylinestore.exceptions;
+
+/**
+ * Exception thrown if the {@code RecurrenceId} already exists in the
+ * {@code SkylineStore}.
+ */
+public class DuplicateRecurrenceIdException extends SkylineStoreException {
+  private static final long serialVersionUID = -684069387367879218L;
+
+  public DuplicateRecurrenceIdException(final String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/EmptyResourceSkylineException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/EmptyResourceSkylineException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/EmptyResourceSkylineException.java
new file mode 100644
index 0000000..55a8fa7
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/EmptyResourceSkylineException.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.skylinestore.exceptions;
+
+/**
+ * Exception thrown if the {@code ResourceSkyline}s to be added to the
+ * {@code SkylineStore} are empty.
+ */
+public class EmptyResourceSkylineException extends SkylineStoreException {
+  private static final long serialVersionUID = -684069387367879218L;
+
+  public EmptyResourceSkylineException(final String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullPipelineIdException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullPipelineIdException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullPipelineIdException.java
new file mode 100644
index 0000000..d48be7d
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullPipelineIdException.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.skylinestore.exceptions;
+
+/**
+ * Exception thrown when pipelineId to be added is <em>null</em>.
+ */
+public class NullPipelineIdException extends SkylineStoreException {
+  private static final long serialVersionUID = -684069387367879218L;
+
+  public NullPipelineIdException(final String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullRLESparseResourceAllocationException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullRLESparseResourceAllocationException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullRLESparseResourceAllocationException.java
new file mode 100644
index 0000000..9aee0b6
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullRLESparseResourceAllocationException.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.skylinestore.exceptions;
+
+/**
+ * Thrown if the {@code RLESparseResourceAllocation} to add is <em>null</em>.
+ */
+public class NullRLESparseResourceAllocationException
+    extends SkylineStoreException {
+  private static final long serialVersionUID = -684069387367879218L;
+
+  public NullRLESparseResourceAllocationException(final String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullRecurrenceIdException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullRecurrenceIdException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullRecurrenceIdException.java
new file mode 100644
index 0000000..518c065
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullRecurrenceIdException.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.skylinestore.exceptions;
+
+/**
+ * Exception thrown if the {@code RecurrenceId} to be added is <em>null</em>.
+ */
+public class NullRecurrenceIdException extends SkylineStoreException {
+  private static final long serialVersionUID = -684069387367879218L;
+
+  public NullRecurrenceIdException(final String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullResourceSkylineException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullResourceSkylineException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullResourceSkylineException.java
new file mode 100644
index 0000000..b70c764
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullResourceSkylineException.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.skylinestore.exceptions;
+
+/**
+ * Exception thrown if the {@code ResourceSkyline} to be added is <em>null</em>.
+ */
+public class NullResourceSkylineException extends SkylineStoreException {
+  private static final long serialVersionUID = -684069387367879218L;
+
+  public NullResourceSkylineException(final String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/RecurrenceIdNotFoundException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/RecurrenceIdNotFoundException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/RecurrenceIdNotFoundException.java
new file mode 100644
index 0000000..b5e734d
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/RecurrenceIdNotFoundException.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.skylinestore.exceptions;
+
+/**
+ * Exception thrown if {@code RecurrenceId} is not found in the
+ * {@code SkylineStore}.
+ */
+public class RecurrenceIdNotFoundException extends SkylineStoreException {
+  private static final long serialVersionUID = -684069387367879218L;
+
+  public RecurrenceIdNotFoundException(final String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/SkylineStoreException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/SkylineStoreException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/SkylineStoreException.java
new file mode 100644
index 0000000..751b5dd
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/SkylineStoreException.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.skylinestore.exceptions;
+
+/**
+ * Exception thrown when the {@code SkylineStore} or the {@code Estimator}
+ * tries to addHistory or query a pipeline job's resource skylines.
+ */
+public abstract class SkylineStoreException extends Exception {
+  private static final long serialVersionUID = -684069387367879218L;
+
+  public SkylineStoreException(final String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/package-info.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/package-info.java
new file mode 100644
index 0000000..716e090
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/package-info.java
@@ -0,0 +1,24 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * SkylineStore exception module.
+ */
+package org.apache.hadoop.resourceestimator.skylinestore.exceptions;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/impl/InMemoryStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/impl/InMemoryStore.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/impl/InMemoryStore.java
new file mode 100644
index 0000000..e00f3a0
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/impl/InMemoryStore.java
@@ -0,0 +1,256 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.skylinestore.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.resourceestimator.common.api.RecurrenceId;
+import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline;
+import org.apache.hadoop.resourceestimator.skylinestore.api.SkylineStore;
+import org.apache.hadoop.resourceestimator.skylinestore.exceptions.DuplicateRecurrenceIdException;
+import org.apache.hadoop.resourceestimator.skylinestore.exceptions.EmptyResourceSkylineException;
+import org.apache.hadoop.resourceestimator.skylinestore.exceptions.RecurrenceIdNotFoundException;
+import org.apache.hadoop.resourceestimator.skylinestore.exceptions.SkylineStoreException;
+import org.apache.hadoop.resourceestimator.skylinestore.validator.SkylineStoreValidator;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An in-memory implementation of {@link SkylineStore}.
+ */
+public class InMemoryStore implements SkylineStore {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(InMemoryStore.class);
+  private final ReentrantReadWriteLock readWriteLock =
+      new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+  private final SkylineStoreValidator inputValidator =
+      new SkylineStoreValidator();
+  /**
+   * A pipeline job's history {@link ResourceSkyline}s. TODO: we may flatten it
+   * out for quick access.
+   */
+  private final Map<RecurrenceId, List<ResourceSkyline>> skylineStore =
+      new HashMap<>(); // pipelineId, resource skyline
+  // Recurring pipelines' predicted resource allocations, keyed by pipelineId.
+  private final Map<String, RLESparseResourceAllocation> estimationStore =
+      new HashMap<>(); // pipelineId, ResourceSkyline
+
+  private List<ResourceSkyline> eliminateNull(
+      final List<ResourceSkyline> resourceSkylines) {
+    final List<ResourceSkyline> result = new ArrayList<>();
+    for (final ResourceSkyline resourceSkyline : resourceSkylines) {
+      if (resourceSkyline != null) {
+        result.add(resourceSkyline);
+      }
+    }
+    return result;
+  }
+
+  @Override public final void addHistory(final RecurrenceId recurrenceId,
+      final List<ResourceSkyline> resourceSkylines)
+      throws SkylineStoreException {
+    inputValidator.validate(recurrenceId, resourceSkylines);
+    writeLock.lock();
+    try {
+      // remove the null elements in the resourceSkylines
+      final List<ResourceSkyline> filteredInput =
+          eliminateNull(resourceSkylines);
+      if (filteredInput.size() > 0) {
+        if (skylineStore.containsKey(recurrenceId)) {
+          // if filteredInput has duplicate jobIds with existing skylines in the
+          // store,
+          // throw out an exception
+          final List<ResourceSkyline> jobHistory =
+              skylineStore.get(recurrenceId);
+          final List<String> oldJobIds = new ArrayList<>();
+          for (final ResourceSkyline resourceSkyline : jobHistory) {
+            oldJobIds.add(resourceSkyline.getJobId());
+          }
+          if (!oldJobIds.isEmpty()) {
+            for (ResourceSkyline elem : filteredInput) {
+              if (oldJobIds.contains(elem.getJobId())) {
+                StringBuilder errMsg = new StringBuilder();
+                errMsg.append(
+                    "Trying to addHistory duplicate resource skylines for "
+                        + recurrenceId
+                        + ". Use updateHistory function instead.");
+                LOGGER.error(errMsg.toString());
+                throw new DuplicateRecurrenceIdException(errMsg.toString());
+              }
+            }
+          }
+          skylineStore.get(recurrenceId).addAll(filteredInput);
+          LOGGER.info("Successfully addHistory new resource skylines for {}.",
+              recurrenceId);
+        } else {
+          skylineStore.put(recurrenceId, filteredInput);
+          LOGGER.info("Successfully addHistory new resource skylines for {}.",
+              recurrenceId);
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override public void addEstimation(String pipelineId,
+      RLESparseResourceAllocation resourceSkyline)
+      throws SkylineStoreException {
+    inputValidator.validate(pipelineId, resourceSkyline);
+    writeLock.lock();
+    try {
+      estimationStore.put(pipelineId, resourceSkyline);
+      LOGGER.info("Successfully add estimated resource allocation for {}.",
+          pipelineId);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override public final void deleteHistory(final RecurrenceId recurrenceId)
+      throws SkylineStoreException {
+    inputValidator.validate(recurrenceId);
+    writeLock.lock();
+    try {
+      if (skylineStore.containsKey(recurrenceId)) {
+        skylineStore.remove(recurrenceId);
+        LOGGER.warn("Delete resource skylines for {}.", recurrenceId);
+      } else {
+        StringBuilder errMsg = new StringBuilder();
+        errMsg.append(
+            "Trying to deleteHistory non-existing recurring pipeline  "
+                + recurrenceId + "\'s resource skylines");
+        LOGGER.error(errMsg.toString());
+        throw new RecurrenceIdNotFoundException(errMsg.toString());
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override public final void updateHistory(final RecurrenceId recurrenceId,
+      final List<ResourceSkyline> resourceSkylines)
+      throws SkylineStoreException {
+    inputValidator.validate(recurrenceId, resourceSkylines);
+    writeLock.lock();
+    try {
+      if (skylineStore.containsKey(recurrenceId)) {
+        // remove the null elements in the resourceSkylines
+        List<ResourceSkyline> filteredInput = eliminateNull(resourceSkylines);
+        if (filteredInput.size() > 0) {
+          skylineStore.put(recurrenceId, filteredInput);
+          LOGGER.info("Successfully updateHistory resource skylines for {}.",
+              recurrenceId);
+        } else {
+          StringBuilder errMsg = new StringBuilder();
+          errMsg.append("Trying to updateHistory " + recurrenceId
+              + " with empty resource skyline");
+          LOGGER.error(errMsg.toString());
+          throw new EmptyResourceSkylineException(errMsg.toString());
+        }
+      } else {
+        StringBuilder errMsg = new StringBuilder();
+        errMsg.append(
+            "Trying to updateHistory non-existing resource skylines for "
+                + recurrenceId);
+        LOGGER.error(errMsg.toString());
+        throw new RecurrenceIdNotFoundException(errMsg.toString());
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override public final Map<RecurrenceId, List<ResourceSkyline>> getHistory(
+      final RecurrenceId recurrenceId) throws SkylineStoreException {
+    inputValidator.validate(recurrenceId);
+    readLock.lock();
+    try {
+      String pipelineId = recurrenceId.getPipelineId();
+      // User tries to getHistory all resource skylines in the skylineStore
+      if (pipelineId.equals("*")) {
+        LOGGER
+            .info("Successfully query resource skylines for {}.", recurrenceId);
+        return Collections.unmodifiableMap(skylineStore);
+      }
+      String runId = recurrenceId.getRunId();
+      Map<RecurrenceId, List<ResourceSkyline>> result =
+          new HashMap<RecurrenceId, List<ResourceSkyline>>();
+      // User tries to getHistory pipelineId's all resource skylines in the
+      // skylineStore
+      if (runId.equals("*")) {
+        // TODO: this for loop is expensive, so we may change the type of
+        // skylineStore to
+        // speed up this loop.
+        for (Map.Entry<RecurrenceId, List<ResourceSkyline>> entry : skylineStore
+            .entrySet()) {
+          RecurrenceId index = entry.getKey();
+          if (index.getPipelineId().equals(pipelineId)) {
+            result.put(index, entry.getValue());
+          }
+        }
+        if (result.size() > 0) {
+          LOGGER.info("Successfully query resource skylines for {}.",
+              recurrenceId);
+          return Collections.unmodifiableMap(result);
+        } else {
+          LOGGER.warn(
+              "Trying to getHistory non-existing resource skylines for {}.",
+              recurrenceId);
+          return null;
+        }
+      }
+      // User tries to getHistory {pipelineId, runId}'s resource skylines
+      if (skylineStore.containsKey(recurrenceId)) {
+        result.put(recurrenceId, skylineStore.get(recurrenceId));
+      } else {
+        LOGGER
+            .warn("Trying to getHistory non-existing resource skylines for {}.",
+                recurrenceId);
+        return null;
+      }
+      LOGGER.info("Successfully query resource skylines for {}.", recurrenceId);
+      return Collections.unmodifiableMap(result);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override public final RLESparseResourceAllocation getEstimation(
+      String pipelineId) throws SkylineStoreException {
+    inputValidator.validate(pipelineId);
+    readLock.lock();
+    try {
+      return estimationStore.get(pipelineId);
+    } finally {
+      readLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/impl/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/impl/package-info.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/impl/package-info.java
new file mode 100644
index 0000000..ffccd5d
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/impl/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation for {@code SkylineStore}.
+ */
+
+package org.apache.hadoop.resourceestimator.skylinestore.impl;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/validator/SkylineStoreValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/validator/SkylineStoreValidator.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/validator/SkylineStoreValidator.java
new file mode 100644
index 0000000..f5f50f5
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/validator/SkylineStoreValidator.java
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.skylinestore.validator;
+
+import java.util.List;
+
+import org.apache.hadoop.resourceestimator.common.api.RecurrenceId;
+import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline;
+import org.apache.hadoop.resourceestimator.skylinestore.api.SkylineStore;
+import org.apache.hadoop.resourceestimator.skylinestore.exceptions.NullPipelineIdException;
+import org.apache.hadoop.resourceestimator.skylinestore.exceptions.NullRLESparseResourceAllocationException;
+import org.apache.hadoop.resourceestimator.skylinestore.exceptions.NullRecurrenceIdException;
+import org.apache.hadoop.resourceestimator.skylinestore.exceptions.NullResourceSkylineException;
+import org.apache.hadoop.resourceestimator.skylinestore.exceptions.SkylineStoreException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SkylineStoreValidator validates input parameters for {@link SkylineStore}.
+ */
+public class SkylineStoreValidator {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(SkylineStoreValidator.class);
+
+  /**
+   * Check if recurrenceId is <em>null</em>.
+   *
+   * @param recurrenceId the id of the recurring pipeline job.
+   * @throws SkylineStoreException if input parameters are invalid.
+   */
+  public final void validate(final RecurrenceId recurrenceId)
+      throws SkylineStoreException {
+    if (recurrenceId == null) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Recurrence id is null, please try again by specifying"
+          + " a valid Recurrence id.");
+      LOGGER.error(sb.toString());
+      throw new NullRecurrenceIdException(sb.toString());
+    }
+  }
+
+  /**
+   * Check if pipelineId is <em>null</em>.
+   *
+   * @param pipelineId the id of the recurring pipeline job.
+   * @throws SkylineStoreException if input parameters are invalid.
+   */
+  public final void validate(final String pipelineId)
+      throws SkylineStoreException {
+    if (pipelineId == null) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("pipelineId is null, please try again by specifying"
+          + " a valid pipelineId.");
+      LOGGER.error(sb.toString());
+      throw new NullPipelineIdException(sb.toString());
+    }
+  }
+
+  /**
+   * Check if recurrenceId is <em>null</em> or resourceSkylines is
+   * <em>null</em>.
+   *
+   * @param recurrenceId     the id of the recurring pipeline job.
+   * @param resourceSkylines the list of {@link ResourceSkyline}s to be added.
+   * @throws SkylineStoreException if input parameters are invalid.
+   */
+  public final void validate(final RecurrenceId recurrenceId,
+      final List<ResourceSkyline> resourceSkylines)
+      throws SkylineStoreException {
+    validate(recurrenceId);
+    if (resourceSkylines == null) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("ResourceSkylines for " + recurrenceId
+          + " is null, please try again by "
+          + "specifying valid ResourceSkylines.");
+      LOGGER.error(sb.toString());
+      throw new NullResourceSkylineException(sb.toString());
+    }
+  }
+
+  /**
+   * Check if pipelineId is <em>null</em> or resourceOverTime is <em>null</em>.
+   *
+   * @param pipelineId       the id of the recurring pipeline.
+   * @param resourceOverTime predicted {@code Resource} allocation to be added.
+   * @throws SkylineStoreException if input parameters are invalid.
+   */
+  public final void validate(final String pipelineId,
+      final RLESparseResourceAllocation resourceOverTime)
+      throws SkylineStoreException {
+    validate(pipelineId);
+    if (resourceOverTime == null) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Resource allocation for " + pipelineId + " is null.");
+      LOGGER.error(sb.toString());
+      throw new NullRLESparseResourceAllocationException(sb.toString());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/validator/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/validator/package-info.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/validator/package-info.java
new file mode 100644
index 0000000..23d67c5
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/validator/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Validator for {@code SkylineStore}.
+ */
+
+package org.apache.hadoop.resourceestimator.skylinestore.validator;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/api/Solver.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/api/Solver.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/api/Solver.java
new file mode 100644
index 0000000..7958a6f
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/api/Solver.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.solver.api;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.resourceestimator.common.api.RecurrenceId;
+import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline;
+import org.apache.hadoop.resourceestimator.skylinestore.api.PredictionSkylineStore;
+import org.apache.hadoop.resourceestimator.skylinestore.exceptions.SkylineStoreException;
+import org.apache.hadoop.resourceestimator.solver.exceptions.SolverException;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+
+/**
+ * Solver takes recurring pipeline's {@link ResourceSkyline} history as input,
+ * predicts its {@link Resource} requirement at each time t for the next run,
+ * and translates them into {@link ResourceSkyline} which will be used to make
+ * recurring resource reservations.
+ */
+public interface Solver {
+  /**
+   * Initializes the Solver, including loading solver parameters from the
+   * configuration file.
+   *
+   * @param config       {@link Configuration} for the Solver.
+   * @param skylineStore the {@link PredictionSkylineStore} which stores
+   *                     predicted {@code Resource} allocations.
+   */
+  void init(Configuration config, PredictionSkylineStore skylineStore);
+
+  /**
+   * The Solver reads recurring pipeline's {@link ResourceSkyline} history, and
+   * predicts its {@link ResourceSkyline} requirements for the next run.
+   *
+   * @param jobHistory the {@link ResourceSkyline}s of the recurring pipeline in
+   *     previous runs. The {@link RecurrenceId} identifies one run of the
+   *     recurring pipeline, and the list of {@link ResourceSkyline}s
+   *     records the {@link ResourceSkyline} of each job within the pipeline.
+   * @return the amount of {@link Resource} requested by the pipeline for the
+   * next run (discretized by timeInterval).
+   * @throws SolverException       if: (1) input is invalid; (2) the number of
+   *     instances in the jobHistory is smaller than the minimum
+   *     requirement; (3) solver runtime has unexpected behaviors;
+   * @throws SkylineStoreException if it fails to add predicted {@code Resource}
+   *     allocation to the {@link PredictionSkylineStore}.
+   */
+  RLESparseResourceAllocation solve(
+      Map<RecurrenceId, List<ResourceSkyline>> jobHistory)
+      throws SolverException, SkylineStoreException;
+
+  /**
+   * Releases the resources used by the Solver.
+   */
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/api/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/api/package-info.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/api/package-info.java
new file mode 100644
index 0000000..fc8363d
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/api/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * API for {@code Solver}.
+ */
+
+package org.apache.hadoop.resourceestimator.solver.api;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/InvalidInputException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/InvalidInputException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/InvalidInputException.java
new file mode 100644
index 0000000..ff51f5f
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/InvalidInputException.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.solver.exceptions;
+
+/**
+ * {@link SolverException} raised when an input handed to the {@code Solver}
+ * is invalid. The message names the rejected entity, states the reason, and
+ * asks the caller to try again with a valid value.
+ */
+public class InvalidInputException extends SolverException {
+
+  private static final long serialVersionUID = -684069387367879218L;
+
+  /**
+   * Builds the exception with the message
+   * "{@code entity} is {@code reason}, please try again with valid
+   * {@code entity}".
+   *
+   * @param entity the input being rejected, as it should read in the message.
+   * @param reason why the input is rejected, as it should read in the message.
+   */
+  public InvalidInputException(final String entity, final String reason) {
+    super(String.format("%s is %s, please try again with valid %s",
+        entity, reason, entity));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/InvalidSolverException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/InvalidSolverException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/InvalidSolverException.java
new file mode 100644
index 0000000..9b614b6
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/InvalidSolverException.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.solver.exceptions;
+
+/**
+ * {@link SolverException} subtype carrying a caller-supplied message.
+ * Presumably signals that the {@code Solver} itself (for example its
+ * configuration or runtime state) is invalid, as opposed to its input; the
+ * throw sites are not part of this class -- TODO confirm against callers.
+ */
+public class InvalidSolverException extends SolverException {
+
+  private static final long serialVersionUID = -684069387367879218L;
+
+  /**
+   * @param message the detail message, forwarded unchanged to
+   *                {@link SolverException}.
+   */
+  public InvalidSolverException(final String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/SolverException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/SolverException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/SolverException.java
new file mode 100644
index 0000000..57507ea
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/SolverException.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.solver.exceptions;
+
+/**
+ * Abstract base class of the checked exceptions declared by the
+ * {@code Solver}; {@code InvalidInputException} and
+ * {@code InvalidSolverException} extend it.
+ */
+public abstract class SolverException extends Exception {
+
+  private static final long serialVersionUID = -684069387367879218L;
+
+  /**
+   * @param message the detail message, forwarded unchanged to
+   *                {@link Exception}.
+   */
+  public SolverException(final String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/package-info.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/package-info.java
new file mode 100644
index 0000000..bd45324
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/package-info.java
@@ -0,0 +1,24 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Exceptions thrown by the {@code Solver}.
+ */
+package org.apache.hadoop.resourceestimator.solver.exceptions;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82c9b3bb/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/impl/BaseSolver.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/impl/BaseSolver.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/impl/BaseSolver.java
new file mode 100644
index 0000000..55abb1c
--- /dev/null
+++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/impl/BaseSolver.java
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.resourceestimator.solver.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.resourceestimator.common.config.ResourceEstimatorConfiguration;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
+
+/**
+ * Common functions shared by {@code Solver} implementations (translating the
+ * predicted resource allocation into Hadoop's
+ * {@link ReservationSubmissionRequest}).
+ */
+public abstract class BaseSolver {
+  /**
+   * Used to generate {@link ReservationId}.
+   */
+  private static final Random RAND = new Random();
+
+  /**
+   * Translate the estimated {@link Resource} requirements of the pipeline to
+   * Hadoop's {@link ReservationSubmissionRequest}.
+   * <p>
+   * The span between the earliest start time and the latest non-null time of
+   * {@code containerRequests} is cut into slots of {@code timeInterval}
+   * (read from {@code config}, 5 when unset). One {@link ReservationRequest}
+   * is created per slot, asking for as many containers as the memory
+   * predicted at the start of that slot divided by the memory of
+   * {@code containerSpec} (integer division).
+   *
+   * @param containerSpec     the {@link Resource} to be allocated to each
+   *                          container;
+   * @param containerRequests the predicted {@link Resource} to be allocated to
+   *                          the job in each discrete time intervals;
+   * @param config            configuration file for BaseSolver.
+   * @return {@link ReservationSubmissionRequest} to be submitted to Hadoop to
+   * make recurring resource reservation for the pipeline.
+   */
+  public final ReservationSubmissionRequest toRecurringRDL(
+      final Resource containerSpec,
+      final RLESparseResourceAllocation containerRequests,
+      final Configuration config) {
+    final int timeInterval =
+        config.getInt(ResourceEstimatorConfiguration.TIME_INTERVAL_KEY, 5);
+    final long pipelineSubmissionTime =
+        containerRequests.getEarliestStartTime();
+    final long pipelineFinishTime = containerRequests.getLatestNonNullTime();
+    final long containerMemAlloc = containerSpec.getMemorySize();
+    final long jobLen =
+        (pipelineFinishTime - pipelineSubmissionTime) / timeInterval;
+    final List<ReservationRequest> reservationRequestList = new ArrayList<>();
+    for (long i = 0; i < jobLen; i++) {
+      // Sample the allocation at the start of the i-th slot. Offsetting by
+      // pipelineSubmissionTime keeps the sampled window aligned with the
+      // [submission, finish) span that jobLen was derived from; the long
+      // arithmetic avoids int overflow for large ticks.
+      final long slotStart = pipelineSubmissionTime + i * timeInterval;
+      final int numContainers = (int) (
+          containerRequests.getCapacityAtTime(slotStart).getMemorySize()
+              / containerMemAlloc);
+      // container spec, # of containers, concurrency, duration
+      reservationRequestList.add(ReservationRequest
+          .newInstance(containerSpec, numContainers, 1, timeInterval));
+    }
+    final ReservationRequests reservationRequests = ReservationRequests
+        .newInstance(reservationRequestList,
+            ReservationRequestInterpreter.R_ALL);
+    final ReservationDefinition reservationDefinition = ReservationDefinition
+        .newInstance(pipelineSubmissionTime, pipelineFinishTime,
+            reservationRequests, "LpSolver#toRecurringRDL");
+    final ReservationId reservationId =
+        ReservationId.newInstance(RAND.nextLong(), RAND.nextLong());
+    return ReservationSubmissionRequest
+        .newInstance(reservationDefinition, "resourceestimator",
+            reservationId);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message