ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From swa...@apache.org
Subject [08/22] ambari git commit: AMBARI-5707. Renaming a module. (swagle)
Date Mon, 01 Dec 2014 20:03:36 GMT
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java
new file mode 100644
index 0000000..4c8d745
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java
@@ -0,0 +1,784 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.file.tfile.TFile;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptFinishDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptStartDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationFinishDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationStartDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerFinishDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerStartDataProto;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptFinishDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptStartDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationFinishDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationStartDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerFinishDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerStartDataPBImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * File system implementation of {@link ApplicationHistoryStore}. In this
+ * implementation, one application will have just one file in the file system,
+ * which contains all the history data of one application, and its attempts and
+ * containers. {@link #applicationStarted(ApplicationStartData)} is supposed to
+ * be invoked first when writing any history data of one application and it will
+ * open a file, while {@link #applicationFinished(ApplicationFinishData)} is
+ * supposed to be last writing operation and will close the file.
+ */
+@Public
+@Unstable
+public class FileSystemApplicationHistoryStore extends AbstractService
+    implements ApplicationHistoryStore {
+
+  private static final Log LOG = LogFactory
+    .getLog(FileSystemApplicationHistoryStore.class);
+
+  private static final String ROOT_DIR_NAME = "ApplicationHistoryDataRoot";
+  private static final int MIN_BLOCK_SIZE = 256 * 1024;
+  private static final String START_DATA_SUFFIX = "_start";
+  private static final String FINISH_DATA_SUFFIX = "_finish";
+  private static final FsPermission ROOT_DIR_UMASK = FsPermission
+    .createImmutable((short) 0740);
+  private static final FsPermission HISTORY_FILE_UMASK = FsPermission
+    .createImmutable((short) 0640);
+
+  private FileSystem fs;
+  private Path rootDirPath;
+
+  private ConcurrentMap<ApplicationId, HistoryFileWriter> outstandingWriters =
+      new ConcurrentHashMap<ApplicationId, HistoryFileWriter>();
+
+  public FileSystemApplicationHistoryStore() {
+    super(FileSystemApplicationHistoryStore.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    Path fsWorkingPath =
+        new Path(conf.get(YarnConfiguration.FS_APPLICATION_HISTORY_STORE_URI));
+    rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
+    try {
+      fs = fsWorkingPath.getFileSystem(conf);
+      fs.mkdirs(rootDirPath);
+      fs.setPermission(rootDirPath, ROOT_DIR_UMASK);
+    } catch (IOException e) {
+      LOG.error("Error when initializing FileSystemHistoryStorage", e);
+      throw e;
+    }
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    try {
+      for (Entry<ApplicationId, HistoryFileWriter> entry : outstandingWriters
+        .entrySet()) {
+        entry.getValue().close();
+      }
+      outstandingWriters.clear();
+    } finally {
+      IOUtils.cleanup(LOG, fs);
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public ApplicationHistoryData getApplication(ApplicationId appId)
+      throws IOException {
+    HistoryFileReader hfReader = getHistoryFileReader(appId);
+    try {
+      boolean readStartData = false;
+      boolean readFinishData = false;
+      ApplicationHistoryData historyData =
+          ApplicationHistoryData.newInstance(appId, null, null, null, null,
+            Long.MIN_VALUE, Long.MIN_VALUE, Long.MAX_VALUE, null,
+            FinalApplicationStatus.UNDEFINED, null);
+      while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
+        HistoryFileReader.Entry entry = hfReader.next();
+        if (entry.key.id.equals(appId.toString())) {
+          if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+            ApplicationStartData startData =
+                parseApplicationStartData(entry.value);
+            mergeApplicationHistoryData(historyData, startData);
+            readStartData = true;
+          } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+            ApplicationFinishData finishData =
+                parseApplicationFinishData(entry.value);
+            mergeApplicationHistoryData(historyData, finishData);
+            readFinishData = true;
+          }
+        }
+      }
+      if (!readStartData && !readFinishData) {
+        return null;
+      }
+      if (!readStartData) {
+        LOG.warn("Start information is missing for application " + appId);
+      }
+      if (!readFinishData) {
+        LOG.warn("Finish information is missing for application " + appId);
+      }
+      LOG.info("Completed reading history information of application " + appId);
+      return historyData;
+    } catch (IOException e) {
+      LOG.error("Error when reading history file of application " + appId);
+      throw e;
+    } finally {
+      hfReader.close();
+    }
+  }
+
+  @Override
+  public Map<ApplicationId, ApplicationHistoryData> getAllApplications()
+      throws IOException {
+    Map<ApplicationId, ApplicationHistoryData> historyDataMap =
+        new HashMap<ApplicationId, ApplicationHistoryData>();
+    FileStatus[] files = fs.listStatus(rootDirPath);
+    for (FileStatus file : files) {
+      ApplicationId appId =
+          ConverterUtils.toApplicationId(file.getPath().getName());
+      try {
+        ApplicationHistoryData historyData = getApplication(appId);
+        if (historyData != null) {
+          historyDataMap.put(appId, historyData);
+        }
+      } catch (IOException e) {
+        // Eat the exception not to disturb the getting the next
+        // ApplicationHistoryData
+        LOG.error("History information of application " + appId
+            + " is not included into the result due to the exception", e);
+      }
+    }
+    return historyDataMap;
+  }
+
+  @Override
+  public Map<ApplicationAttemptId, ApplicationAttemptHistoryData>
+      getApplicationAttempts(ApplicationId appId) throws IOException {
+    Map<ApplicationAttemptId, ApplicationAttemptHistoryData> historyDataMap =
+        new HashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>();
+    HistoryFileReader hfReader = getHistoryFileReader(appId);
+    try {
+      while (hfReader.hasNext()) {
+        HistoryFileReader.Entry entry = hfReader.next();
+        if (entry.key.id.startsWith(
+            ConverterUtils.APPLICATION_ATTEMPT_PREFIX)) {
+          ApplicationAttemptId appAttemptId = 
+              ConverterUtils.toApplicationAttemptId(entry.key.id);
+          if (appAttemptId.getApplicationId().equals(appId)) {
+            ApplicationAttemptHistoryData historyData = 
+                historyDataMap.get(appAttemptId);
+            if (historyData == null) {
+              historyData = ApplicationAttemptHistoryData.newInstance(
+                  appAttemptId, null, -1, null, null, null,
+                  FinalApplicationStatus.UNDEFINED, null);
+              historyDataMap.put(appAttemptId, historyData);
+            }
+            if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+              mergeApplicationAttemptHistoryData(historyData,
+                  parseApplicationAttemptStartData(entry.value));
+            } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+              mergeApplicationAttemptHistoryData(historyData,
+                  parseApplicationAttemptFinishData(entry.value));
+            }
+          }
+        }
+      }
+      LOG.info("Completed reading history information of all application"
+          + " attempts of application " + appId);
+    } catch (IOException e) {
+      LOG.info("Error when reading history information of some application"
+          + " attempts of application " + appId);
+    } finally {
+      hfReader.close();
+    }
+    return historyDataMap;
+  }
+
+  @Override
+  public ApplicationAttemptHistoryData getApplicationAttempt(
+      ApplicationAttemptId appAttemptId) throws IOException {
+    HistoryFileReader hfReader =
+        getHistoryFileReader(appAttemptId.getApplicationId());
+    try {
+      boolean readStartData = false;
+      boolean readFinishData = false;
+      ApplicationAttemptHistoryData historyData =
+          ApplicationAttemptHistoryData.newInstance(appAttemptId, null, -1,
+            null, null, null, FinalApplicationStatus.UNDEFINED, null);
+      while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
+        HistoryFileReader.Entry entry = hfReader.next();
+        if (entry.key.id.equals(appAttemptId.toString())) {
+          if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+            ApplicationAttemptStartData startData =
+                parseApplicationAttemptStartData(entry.value);
+            mergeApplicationAttemptHistoryData(historyData, startData);
+            readStartData = true;
+          } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+            ApplicationAttemptFinishData finishData =
+                parseApplicationAttemptFinishData(entry.value);
+            mergeApplicationAttemptHistoryData(historyData, finishData);
+            readFinishData = true;
+          }
+        }
+      }
+      if (!readStartData && !readFinishData) {
+        return null;
+      }
+      if (!readStartData) {
+        LOG.warn("Start information is missing for application attempt "
+            + appAttemptId);
+      }
+      if (!readFinishData) {
+        LOG.warn("Finish information is missing for application attempt "
+            + appAttemptId);
+      }
+      LOG.info("Completed reading history information of application attempt "
+          + appAttemptId);
+      return historyData;
+    } catch (IOException e) {
+      LOG.error("Error when reading history file of application attempt"
+          + appAttemptId);
+      throw e;
+    } finally {
+      hfReader.close();
+    }
+  }
+
+  @Override
+  public ContainerHistoryData getContainer(ContainerId containerId)
+      throws IOException {
+    HistoryFileReader hfReader =
+        getHistoryFileReader(containerId.getApplicationAttemptId()
+          .getApplicationId());
+    try {
+      boolean readStartData = false;
+      boolean readFinishData = false;
+      ContainerHistoryData historyData =
+          ContainerHistoryData
+            .newInstance(containerId, null, null, null, Long.MIN_VALUE,
+              Long.MAX_VALUE, null, Integer.MAX_VALUE, null);
+      while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
+        HistoryFileReader.Entry entry = hfReader.next();
+        if (entry.key.id.equals(containerId.toString())) {
+          if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+            ContainerStartData startData = parseContainerStartData(entry.value);
+            mergeContainerHistoryData(historyData, startData);
+            readStartData = true;
+          } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+            ContainerFinishData finishData =
+                parseContainerFinishData(entry.value);
+            mergeContainerHistoryData(historyData, finishData);
+            readFinishData = true;
+          }
+        }
+      }
+      if (!readStartData && !readFinishData) {
+        return null;
+      }
+      if (!readStartData) {
+        LOG.warn("Start information is missing for container " + containerId);
+      }
+      if (!readFinishData) {
+        LOG.warn("Finish information is missing for container " + containerId);
+      }
+      LOG.info("Completed reading history information of container "
+          + containerId);
+      return historyData;
+    } catch (IOException e) {
+      LOG.error("Error when reading history file of container " + containerId);
+      throw e;
+    } finally {
+      hfReader.close();
+    }
+  }
+
+  @Override
+  public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId)
+      throws IOException {
+    ApplicationAttemptHistoryData attemptHistoryData =
+        getApplicationAttempt(appAttemptId);
+    if (attemptHistoryData == null
+        || attemptHistoryData.getMasterContainerId() == null) {
+      return null;
+    }
+    return getContainer(attemptHistoryData.getMasterContainerId());
+  }
+
+  @Override
+  public Map<ContainerId, ContainerHistoryData> getContainers(
+      ApplicationAttemptId appAttemptId) throws IOException {
+    Map<ContainerId, ContainerHistoryData> historyDataMap =
+        new HashMap<ContainerId, ContainerHistoryData>();
+    HistoryFileReader hfReader =
+        getHistoryFileReader(appAttemptId.getApplicationId());
+    try {
+      while (hfReader.hasNext()) {
+        HistoryFileReader.Entry entry = hfReader.next();
+        if (entry.key.id.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
+          ContainerId containerId =
+              ConverterUtils.toContainerId(entry.key.id);
+          if (containerId.getApplicationAttemptId().equals(appAttemptId)) {
+            ContainerHistoryData historyData =
+                historyDataMap.get(containerId);
+            if (historyData == null) {
+              historyData = ContainerHistoryData.newInstance(
+                  containerId, null, null, null, Long.MIN_VALUE,
+                  Long.MAX_VALUE, null, Integer.MAX_VALUE, null);
+              historyDataMap.put(containerId, historyData);
+            }
+            if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+              mergeContainerHistoryData(historyData,
+                  parseContainerStartData(entry.value));
+            } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+              mergeContainerHistoryData(historyData,
+                  parseContainerFinishData(entry.value));
+            }
+          }
+        }
+      }
+      LOG.info("Completed reading history information of all conatiners"
+          + " of application attempt " + appAttemptId);
+    } catch (IOException e) {
+      LOG.info("Error when reading history information of some containers"
+          + " of application attempt " + appAttemptId);
+    } finally {
+      hfReader.close();
+    }
+    return historyDataMap;
+  }
+
+  @Override
+  public void applicationStarted(ApplicationStartData appStart)
+      throws IOException {
+    HistoryFileWriter hfWriter =
+        outstandingWriters.get(appStart.getApplicationId());
+    if (hfWriter == null) {
+      Path applicationHistoryFile =
+          new Path(rootDirPath, appStart.getApplicationId().toString());
+      try {
+        hfWriter = new HistoryFileWriter(applicationHistoryFile);
+        LOG.info("Opened history file of application "
+            + appStart.getApplicationId());
+      } catch (IOException e) {
+        LOG.error("Error when openning history file of application "
+            + appStart.getApplicationId());
+        throw e;
+      }
+      outstandingWriters.put(appStart.getApplicationId(), hfWriter);
+    } else {
+      throw new IOException("History file of application "
+          + appStart.getApplicationId() + " is already opened");
+    }
+    assert appStart instanceof ApplicationStartDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(new HistoryDataKey(appStart.getApplicationId()
+        .toString(), START_DATA_SUFFIX),
+        ((ApplicationStartDataPBImpl) appStart).getProto().toByteArray());
+      LOG.info("Start information of application "
+          + appStart.getApplicationId() + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing start information of application "
+          + appStart.getApplicationId());
+      throw e;
+    }
+  }
+
+  @Override
+  public void applicationFinished(ApplicationFinishData appFinish)
+      throws IOException {
+    HistoryFileWriter hfWriter =
+        getHistoryFileWriter(appFinish.getApplicationId());
+    assert appFinish instanceof ApplicationFinishDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(new HistoryDataKey(appFinish.getApplicationId()
+        .toString(), FINISH_DATA_SUFFIX),
+        ((ApplicationFinishDataPBImpl) appFinish).getProto().toByteArray());
+      LOG.info("Finish information of application "
+          + appFinish.getApplicationId() + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing finish information of application "
+          + appFinish.getApplicationId());
+      throw e;
+    } finally {
+      hfWriter.close();
+      outstandingWriters.remove(appFinish.getApplicationId());
+    }
+  }
+
+  @Override
+  public void applicationAttemptStarted(
+      ApplicationAttemptStartData appAttemptStart) throws IOException {
+    HistoryFileWriter hfWriter =
+        getHistoryFileWriter(appAttemptStart.getApplicationAttemptId()
+          .getApplicationId());
+    assert appAttemptStart instanceof ApplicationAttemptStartDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(new HistoryDataKey(appAttemptStart
+        .getApplicationAttemptId().toString(), START_DATA_SUFFIX),
+        ((ApplicationAttemptStartDataPBImpl) appAttemptStart).getProto()
+          .toByteArray());
+      LOG.info("Start information of application attempt "
+          + appAttemptStart.getApplicationAttemptId() + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing start information of application attempt "
+          + appAttemptStart.getApplicationAttemptId());
+      throw e;
+    }
+  }
+
+  @Override
+  public void applicationAttemptFinished(
+      ApplicationAttemptFinishData appAttemptFinish) throws IOException {
+    HistoryFileWriter hfWriter =
+        getHistoryFileWriter(appAttemptFinish.getApplicationAttemptId()
+          .getApplicationId());
+    assert appAttemptFinish instanceof ApplicationAttemptFinishDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(new HistoryDataKey(appAttemptFinish
+        .getApplicationAttemptId().toString(), FINISH_DATA_SUFFIX),
+        ((ApplicationAttemptFinishDataPBImpl) appAttemptFinish).getProto()
+          .toByteArray());
+      LOG.info("Finish information of application attempt "
+          + appAttemptFinish.getApplicationAttemptId() + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing finish information of application attempt "
+          + appAttemptFinish.getApplicationAttemptId());
+      throw e;
+    }
+  }
+
+  @Override
+  public void containerStarted(ContainerStartData containerStart)
+      throws IOException {
+    HistoryFileWriter hfWriter =
+        getHistoryFileWriter(containerStart.getContainerId()
+          .getApplicationAttemptId().getApplicationId());
+    assert containerStart instanceof ContainerStartDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(new HistoryDataKey(containerStart
+        .getContainerId().toString(), START_DATA_SUFFIX),
+        ((ContainerStartDataPBImpl) containerStart).getProto().toByteArray());
+      LOG.info("Start information of container "
+          + containerStart.getContainerId() + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing start information of container "
+          + containerStart.getContainerId());
+      throw e;
+    }
+  }
+
+  @Override
+  public void containerFinished(ContainerFinishData containerFinish)
+      throws IOException {
+    HistoryFileWriter hfWriter =
+        getHistoryFileWriter(containerFinish.getContainerId()
+          .getApplicationAttemptId().getApplicationId());
+    assert containerFinish instanceof ContainerFinishDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(new HistoryDataKey(containerFinish
+        .getContainerId().toString(), FINISH_DATA_SUFFIX),
+        ((ContainerFinishDataPBImpl) containerFinish).getProto().toByteArray());
+      LOG.info("Finish information of container "
+          + containerFinish.getContainerId() + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing finish information of container "
+          + containerFinish.getContainerId());
+    }
+  }
+
+  private static ApplicationStartData parseApplicationStartData(byte[] value)
+      throws InvalidProtocolBufferException {
+    return new ApplicationStartDataPBImpl(
+      ApplicationStartDataProto.parseFrom(value));
+  }
+
+  private static ApplicationFinishData parseApplicationFinishData(byte[] value)
+      throws InvalidProtocolBufferException {
+    return new ApplicationFinishDataPBImpl(
+      ApplicationFinishDataProto.parseFrom(value));
+  }
+
+  private static ApplicationAttemptStartData parseApplicationAttemptStartData(
+      byte[] value) throws InvalidProtocolBufferException {
+    return new ApplicationAttemptStartDataPBImpl(
+      ApplicationAttemptStartDataProto.parseFrom(value));
+  }
+
+  private static ApplicationAttemptFinishData
+      parseApplicationAttemptFinishData(byte[] value)
+          throws InvalidProtocolBufferException {
+    return new ApplicationAttemptFinishDataPBImpl(
+      ApplicationAttemptFinishDataProto.parseFrom(value));
+  }
+
+  private static ContainerStartData parseContainerStartData(byte[] value)
+      throws InvalidProtocolBufferException {
+    return new ContainerStartDataPBImpl(
+      ContainerStartDataProto.parseFrom(value));
+  }
+
+  private static ContainerFinishData parseContainerFinishData(byte[] value)
+      throws InvalidProtocolBufferException {
+    return new ContainerFinishDataPBImpl(
+      ContainerFinishDataProto.parseFrom(value));
+  }
+
+  private static void mergeApplicationHistoryData(
+      ApplicationHistoryData historyData, ApplicationStartData startData) {
+    historyData.setApplicationName(startData.getApplicationName());
+    historyData.setApplicationType(startData.getApplicationType());
+    historyData.setQueue(startData.getQueue());
+    historyData.setUser(startData.getUser());
+    historyData.setSubmitTime(startData.getSubmitTime());
+    historyData.setStartTime(startData.getStartTime());
+  }
+
+  private static void mergeApplicationHistoryData(
+      ApplicationHistoryData historyData, ApplicationFinishData finishData) {
+    historyData.setFinishTime(finishData.getFinishTime());
+    historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
+    historyData.setFinalApplicationStatus(finishData
+      .getFinalApplicationStatus());
+    historyData.setYarnApplicationState(finishData.getYarnApplicationState());
+  }
+
+  private static void mergeApplicationAttemptHistoryData(
+      ApplicationAttemptHistoryData historyData,
+      ApplicationAttemptStartData startData) {
+    historyData.setHost(startData.getHost());
+    historyData.setRPCPort(startData.getRPCPort());
+    historyData.setMasterContainerId(startData.getMasterContainerId());
+  }
+
+  private static void mergeApplicationAttemptHistoryData(
+      ApplicationAttemptHistoryData historyData,
+      ApplicationAttemptFinishData finishData) {
+    historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
+    historyData.setTrackingURL(finishData.getTrackingURL());
+    historyData.setFinalApplicationStatus(finishData
+      .getFinalApplicationStatus());
+    historyData.setYarnApplicationAttemptState(finishData
+      .getYarnApplicationAttemptState());
+  }
+
+  private static void mergeContainerHistoryData(
+      ContainerHistoryData historyData, ContainerStartData startData) {
+    historyData.setAllocatedResource(startData.getAllocatedResource());
+    historyData.setAssignedNode(startData.getAssignedNode());
+    historyData.setPriority(startData.getPriority());
+    historyData.setStartTime(startData.getStartTime());
+  }
+
+  private static void mergeContainerHistoryData(
+      ContainerHistoryData historyData, ContainerFinishData finishData) {
+    historyData.setFinishTime(finishData.getFinishTime());
+    historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
+    historyData.setContainerExitStatus(finishData.getContainerExitStatus());
+    historyData.setContainerState(finishData.getContainerState());
+  }
+
+  private HistoryFileWriter getHistoryFileWriter(ApplicationId appId)
+      throws IOException {
+    HistoryFileWriter hfWriter = outstandingWriters.get(appId);
+    if (hfWriter == null) {
+      throw new IOException("History file of application " + appId
+          + " is not opened");
+    }
+    return hfWriter;
+  }
+
+  private HistoryFileReader getHistoryFileReader(ApplicationId appId)
+      throws IOException {
+    Path applicationHistoryFile = new Path(rootDirPath, appId.toString());
+    if (!fs.exists(applicationHistoryFile)) {
+      throw new IOException("History file for application " + appId
+          + " is not found");
+    }
+    // The history file is still under writing
+    if (outstandingWriters.containsKey(appId)) {
+      throw new IOException("History file for application " + appId
+          + " is under writing");
+    }
+    return new HistoryFileReader(applicationHistoryFile);
+  }
+
+  private class HistoryFileReader {
+
+    private class Entry {
+
+      private HistoryDataKey key;
+      private byte[] value;
+
+      public Entry(HistoryDataKey key, byte[] value) {
+        this.key = key;
+        this.value = value;
+      }
+    }
+
+    private TFile.Reader reader;
+    private TFile.Reader.Scanner scanner;
+
+    public HistoryFileReader(Path historyFile) throws IOException {
+      FSDataInputStream fsdis = fs.open(historyFile);
+      reader =
+          new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(),
+            getConfig());
+      reset();
+    }
+
+    public boolean hasNext() {
+      return !scanner.atEnd();
+    }
+
+    public Entry next() throws IOException {
+      TFile.Reader.Scanner.Entry entry = scanner.entry();
+      DataInputStream dis = entry.getKeyStream();
+      HistoryDataKey key = new HistoryDataKey();
+      key.readFields(dis);
+      dis = entry.getValueStream();
+      byte[] value = new byte[entry.getValueLength()];
+      dis.read(value);
+      scanner.advance();
+      return new Entry(key, value);
+    }
+
+    public void reset() throws IOException {
+      IOUtils.cleanup(LOG, scanner);
+      scanner = reader.createScanner();
+    }
+
+    public void close() {
+      IOUtils.cleanup(LOG, scanner, reader);
+    }
+
+  }
+
+  private class HistoryFileWriter {
+
+    private FSDataOutputStream fsdos;
+    private TFile.Writer writer;
+
+    public HistoryFileWriter(Path historyFile) throws IOException {
+      if (fs.exists(historyFile)) {
+        fsdos = fs.append(historyFile);
+      } else {
+        fsdos = fs.create(historyFile);
+      }
+      fs.setPermission(historyFile, HISTORY_FILE_UMASK);
+      writer =
+          new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
+            YarnConfiguration.FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE,
+            YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE), null,
+            getConfig());
+    }
+
+    public synchronized void close() {
+      IOUtils.cleanup(LOG, writer, fsdos);
+    }
+
+    public synchronized void writeHistoryData(HistoryDataKey key, byte[] value)
+        throws IOException {
+      DataOutputStream dos = null;
+      try {
+        dos = writer.prepareAppendKey(-1);
+        key.write(dos);
+      } finally {
+        IOUtils.cleanup(LOG, dos);
+      }
+      try {
+        dos = writer.prepareAppendValue(value.length);
+        dos.write(value);
+      } finally {
+        IOUtils.cleanup(LOG, dos);
+      }
+    }
+
+  }
+
+  private static class HistoryDataKey implements Writable {
+
+    private String id;
+
+    private String suffix;
+
+    public HistoryDataKey() {
+      this(null, null);
+    }
+
+    public HistoryDataKey(String id, String suffix) {
+      this.id = id;
+      this.suffix = suffix;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeUTF(id);
+      out.writeUTF(suffix);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      id = in.readUTF();
+      suffix = in.readUTF();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java
new file mode 100644
index 0000000..c226ad3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
+
+/**
+ * In-memory implementation of {@link ApplicationHistoryStore}. This
+ * implementation is for test purpose only. If users improperly instantiate it,
+ * they may encounter reading and writing history data in different memory
+ * store.
+ * 
+ */
+@Private
+@Unstable
+public class MemoryApplicationHistoryStore extends AbstractService implements
+    ApplicationHistoryStore {
+
+  private final ConcurrentMap<ApplicationId, ApplicationHistoryData> applicationData =
+      new ConcurrentHashMap<ApplicationId, ApplicationHistoryData>();
+  private final ConcurrentMap<ApplicationId, ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData>> applicationAttemptData =
+      new ConcurrentHashMap<ApplicationId, ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData>>();
+  private final ConcurrentMap<ApplicationAttemptId, ConcurrentMap<ContainerId, ContainerHistoryData>> containerData =
+      new ConcurrentHashMap<ApplicationAttemptId, ConcurrentMap<ContainerId, ContainerHistoryData>>();
+
+  public MemoryApplicationHistoryStore() {
+    super(MemoryApplicationHistoryStore.class.getName());
+  }
+
+  @Override
+  public Map<ApplicationId, ApplicationHistoryData> getAllApplications() {
+    return new HashMap<ApplicationId, ApplicationHistoryData>(applicationData);
+  }
+
+  @Override
+  public ApplicationHistoryData getApplication(ApplicationId appId) {
+    return applicationData.get(appId);
+  }
+
+  @Override
+  public Map<ApplicationAttemptId, ApplicationAttemptHistoryData>
+      getApplicationAttempts(ApplicationId appId) {
+    ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap =
+        applicationAttemptData.get(appId);
+    if (subMap == null) {
+      return Collections
+        .<ApplicationAttemptId, ApplicationAttemptHistoryData> emptyMap();
+    } else {
+      return new HashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>(
+        subMap);
+    }
+  }
+
+  @Override
+  public ApplicationAttemptHistoryData getApplicationAttempt(
+      ApplicationAttemptId appAttemptId) {
+    ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap =
+        applicationAttemptData.get(appAttemptId.getApplicationId());
+    if (subMap == null) {
+      return null;
+    } else {
+      return subMap.get(appAttemptId);
+    }
+  }
+
+  @Override
+  public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId) {
+    ApplicationAttemptHistoryData appAttempt =
+        getApplicationAttempt(appAttemptId);
+    if (appAttempt == null || appAttempt.getMasterContainerId() == null) {
+      return null;
+    } else {
+      return getContainer(appAttempt.getMasterContainerId());
+    }
+  }
+
+  @Override
+  public ContainerHistoryData getContainer(ContainerId containerId) {
+    Map<ContainerId, ContainerHistoryData> subMap =
+        containerData.get(containerId.getApplicationAttemptId());
+    if (subMap == null) {
+      return null;
+    } else {
+      return subMap.get(containerId);
+    }
+  }
+
+  @Override
+  public Map<ContainerId, ContainerHistoryData> getContainers(
+      ApplicationAttemptId appAttemptId) throws IOException {
+    ConcurrentMap<ContainerId, ContainerHistoryData> subMap =
+        containerData.get(appAttemptId);
+    if (subMap == null) {
+      return Collections.<ContainerId, ContainerHistoryData> emptyMap();
+    } else {
+      return new HashMap<ContainerId, ContainerHistoryData>(subMap);
+    }
+  }
+
+  @Override
+  public void applicationStarted(ApplicationStartData appStart)
+      throws IOException {
+    ApplicationHistoryData oldData =
+        applicationData.putIfAbsent(appStart.getApplicationId(),
+          ApplicationHistoryData.newInstance(appStart.getApplicationId(),
+            appStart.getApplicationName(), appStart.getApplicationType(),
+            appStart.getQueue(), appStart.getUser(), appStart.getSubmitTime(),
+            appStart.getStartTime(), Long.MAX_VALUE, null, null, null));
+    if (oldData != null) {
+      throw new IOException("The start information of application "
+          + appStart.getApplicationId() + " is already stored.");
+    }
+  }
+
+  @Override
+  public void applicationFinished(ApplicationFinishData appFinish)
+      throws IOException {
+    ApplicationHistoryData data =
+        applicationData.get(appFinish.getApplicationId());
+    if (data == null) {
+      throw new IOException("The finish information of application "
+          + appFinish.getApplicationId() + " is stored before the start"
+          + " information.");
+    }
+    // Make the assumption that YarnApplicationState should not be null if
+    // the finish information is already recorded
+    if (data.getYarnApplicationState() != null) {
+      throw new IOException("The finish information of application "
+          + appFinish.getApplicationId() + " is already stored.");
+    }
+    data.setFinishTime(appFinish.getFinishTime());
+    data.setDiagnosticsInfo(appFinish.getDiagnosticsInfo());
+    data.setFinalApplicationStatus(appFinish.getFinalApplicationStatus());
+    data.setYarnApplicationState(appFinish.getYarnApplicationState());
+  }
+
+  @Override
+  public void applicationAttemptStarted(
+      ApplicationAttemptStartData appAttemptStart) throws IOException {
+    ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap =
+        getSubMap(appAttemptStart.getApplicationAttemptId().getApplicationId());
+    ApplicationAttemptHistoryData oldData =
+        subMap.putIfAbsent(appAttemptStart.getApplicationAttemptId(),
+          ApplicationAttemptHistoryData.newInstance(
+            appAttemptStart.getApplicationAttemptId(),
+            appAttemptStart.getHost(), appAttemptStart.getRPCPort(),
+            appAttemptStart.getMasterContainerId(), null, null, null, null));
+    if (oldData != null) {
+      throw new IOException("The start information of application attempt "
+          + appAttemptStart.getApplicationAttemptId() + " is already stored.");
+    }
+  }
+
+  @Override
+  public void applicationAttemptFinished(
+      ApplicationAttemptFinishData appAttemptFinish) throws IOException {
+    ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap =
+        getSubMap(appAttemptFinish.getApplicationAttemptId().getApplicationId());
+    ApplicationAttemptHistoryData data =
+        subMap.get(appAttemptFinish.getApplicationAttemptId());
+    if (data == null) {
+      throw new IOException("The finish information of application attempt "
+          + appAttemptFinish.getApplicationAttemptId() + " is stored before"
+          + " the start information.");
+    }
+    // Make the assumption that YarnApplicationAttemptState should not be null
+    // if the finish information is already recorded
+    if (data.getYarnApplicationAttemptState() != null) {
+      throw new IOException("The finish information of application attempt "
+          + appAttemptFinish.getApplicationAttemptId() + " is already stored.");
+    }
+    data.setTrackingURL(appAttemptFinish.getTrackingURL());
+    data.setDiagnosticsInfo(appAttemptFinish.getDiagnosticsInfo());
+    data
+      .setFinalApplicationStatus(appAttemptFinish.getFinalApplicationStatus());
+    data.setYarnApplicationAttemptState(appAttemptFinish
+      .getYarnApplicationAttemptState());
+  }
+
+  private ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData>
+      getSubMap(ApplicationId appId) {
+    applicationAttemptData
+      .putIfAbsent(
+        appId,
+        new ConcurrentHashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>());
+    return applicationAttemptData.get(appId);
+  }
+
+  @Override
+  public void containerStarted(ContainerStartData containerStart)
+      throws IOException {
+    ConcurrentMap<ContainerId, ContainerHistoryData> subMap =
+        getSubMap(containerStart.getContainerId().getApplicationAttemptId());
+    ContainerHistoryData oldData =
+        subMap.putIfAbsent(containerStart.getContainerId(),
+          ContainerHistoryData.newInstance(containerStart.getContainerId(),
+            containerStart.getAllocatedResource(),
+            containerStart.getAssignedNode(), containerStart.getPriority(),
+            containerStart.getStartTime(), Long.MAX_VALUE, null,
+            Integer.MAX_VALUE, null));
+    if (oldData != null) {
+      throw new IOException("The start information of container "
+          + containerStart.getContainerId() + " is already stored.");
+    }
+  }
+
+  @Override
+  public void containerFinished(ContainerFinishData containerFinish)
+      throws IOException {
+    ConcurrentMap<ContainerId, ContainerHistoryData> subMap =
+        getSubMap(containerFinish.getContainerId().getApplicationAttemptId());
+    ContainerHistoryData data = subMap.get(containerFinish.getContainerId());
+    if (data == null) {
+      throw new IOException("The finish information of container "
+          + containerFinish.getContainerId() + " is stored before"
+          + " the start information.");
+    }
+    // Make the assumption that ContainerState should not be null if
+    // the finish information is already recorded
+    if (data.getContainerState() != null) {
+      throw new IOException("The finish information of container "
+          + containerFinish.getContainerId() + " is already stored.");
+    }
+    data.setFinishTime(containerFinish.getFinishTime());
+    data.setDiagnosticsInfo(containerFinish.getDiagnosticsInfo());
+    data.setContainerExitStatus(containerFinish.getContainerExitStatus());
+    data.setContainerState(containerFinish.getContainerState());
+  }
+
+  private ConcurrentMap<ContainerId, ContainerHistoryData> getSubMap(
+      ApplicationAttemptId appAttemptId) {
+    containerData.putIfAbsent(appAttemptId,
+      new ConcurrentHashMap<ContainerId, ContainerHistoryData>());
+    return containerData.get(appAttemptId);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java
new file mode 100644
index 0000000..3660c10
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
+
+/**
+ * Dummy implementation of {@link ApplicationHistoryStore}. If this
+ * implementation is used, no history data will be persisted.
+ * 
+ */
+@Unstable
+@Private
+public class NullApplicationHistoryStore extends AbstractService implements
+    ApplicationHistoryStore {
+
+  public NullApplicationHistoryStore() {
+    super(NullApplicationHistoryStore.class.getName());
+  }
+
+  @Override
+  public void applicationStarted(ApplicationStartData appStart)
+      throws IOException {
+  }
+
+  @Override
+  public void applicationFinished(ApplicationFinishData appFinish)
+      throws IOException {
+  }
+
+  @Override
+  public void applicationAttemptStarted(
+      ApplicationAttemptStartData appAttemptStart) throws IOException {
+  }
+
+  @Override
+  public void applicationAttemptFinished(
+      ApplicationAttemptFinishData appAttemptFinish) throws IOException {
+  }
+
+  @Override
+  public void containerStarted(ContainerStartData containerStart)
+      throws IOException {
+  }
+
+  @Override
+  public void containerFinished(ContainerFinishData containerFinish)
+      throws IOException {
+  }
+
+  @Override
+  public ApplicationHistoryData getApplication(ApplicationId appId)
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  public Map<ApplicationId, ApplicationHistoryData> getAllApplications()
+      throws IOException {
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public Map<ApplicationAttemptId, ApplicationAttemptHistoryData>
+      getApplicationAttempts(ApplicationId appId) throws IOException {
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public ApplicationAttemptHistoryData getApplicationAttempt(
+      ApplicationAttemptId appAttemptId) throws IOException {
+    return null;
+  }
+
+  @Override
+  public ContainerHistoryData getContainer(ContainerId containerId)
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId)
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  public Map<ContainerId, ContainerHistoryData> getContainers(
+      ApplicationAttemptId appAttemptId) throws IOException {
+    return Collections.emptyMap();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/LoadRunner.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/LoadRunner.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/LoadRunner.java
new file mode 100644
index 0000000..7974a5f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/LoadRunner.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator;
+
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.data.AppID;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.data.ApplicationInstance;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.data.HostMetricsGenerator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.data.MetricsGeneratorConfigurer;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.net.MetricsSender;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.net.RestMetricsSender;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.net.StdOutMetricsSender;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.util.TimeStampProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.*;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.data.AppID.MASTER_APPS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.data.AppID.SLAVE_APPS;
+
+/**
+ *
+ */
+public class LoadRunner {
+  private final static Logger LOG = LoggerFactory.getLogger(LoadRunner.class);
+
+  private final ScheduledExecutorService timer;
+  private final ExecutorService workersPool;
+  private final Collection<Callable<String>> workers;
+  private final long startTime = new Date().getTime();
+  private final int collectIntervalMillis;
+  private final int sendIntervalMillis;
+
+  public LoadRunner(String hostName,
+                    int threadCount,
+                    String metricsHostName,
+                    int collectIntervalMillis,
+                    int sendIntervalMillis,
+                    boolean createMaster) {
+    this.collectIntervalMillis = collectIntervalMillis;
+    this.workersPool = Executors.newFixedThreadPool(threadCount);
+    this.timer = Executors.newScheduledThreadPool(1);
+    this.sendIntervalMillis = sendIntervalMillis;
+
+    workers = prepareWorkers(hostName, threadCount, metricsHostName, createMaster);
+  }
+
+  private Collection<Callable<String>> prepareWorkers(String hostName,
+                                                      int threadCount,
+                                                      String metricsHost,
+                                                      Boolean createMaster) {
+    Collection<Callable<String>> senderWorkers =
+      new ArrayList<Callable<String>>(threadCount);
+
+    int startIndex = 0;
+    if (createMaster) {
+      String simHost = hostName + ".0";
+      addMetricsWorkers(senderWorkers, simHost, metricsHost, MASTER_APPS);
+      startIndex++;
+    }
+
+    for (int i = startIndex; i < threadCount; i++) {
+      String simHost = hostName + "." + i;
+      addMetricsWorkers(senderWorkers, simHost, metricsHost, SLAVE_APPS);
+    }
+
+    return senderWorkers;
+  }
+
+  private void addMetricsWorkers(Collection<Callable<String>> senderWorkers,
+                                 String specificHostName,
+                                 String metricsHostName,
+                                 AppID[] apps) {
+    for (AppID app : apps) {
+      HostMetricsGenerator metricsGenerator =
+        createApplicationMetrics(specificHostName, app);
+      MetricsSender sender = new RestMetricsSender(metricsHostName);
+      senderWorkers.add(new MetricsSenderWorker(sender, metricsGenerator));
+    }
+  }
+
+  private HostMetricsGenerator createApplicationMetrics(String simHost, AppID host) {
+    ApplicationInstance appInstance = new ApplicationInstance(simHost, host, "");
+    TimeStampProvider timeStampProvider = new TimeStampProvider(startTime,
+      collectIntervalMillis, sendIntervalMillis);
+
+    return MetricsGeneratorConfigurer
+      .createMetricsForHost(appInstance, timeStampProvider);
+  }
+
+  public void start() {
+    timer.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          runOnce();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }, 0, sendIntervalMillis, TimeUnit.MILLISECONDS);
+  }
+
+  public void runOnce() throws InterruptedException {
+    List<Future<String>> futures = workersPool.invokeAll(workers,
+      sendIntervalMillis / 2,
+      TimeUnit.MILLISECONDS);
+    int done = 0;
+
+    // TODO: correctly count the failed tasks
+    for (Future<String> future : futures) {
+      done += future.isDone() ? 1 : 0;
+    }
+
+    LOG.info("Finished successfully " + done + " tasks ");
+  }
+
+  public void shutdown() {
+    timer.shutdownNow();
+    workersPool.shutdownNow();
+  }
+
+  public static void main(String[] args) {
+    LoadRunner runner =
+      new LoadRunner("local", 2, "metrics", 10000, 20000, false);
+
+    runner.start();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/MetricsLoadSimulator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/MetricsLoadSimulator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/MetricsLoadSimulator.java
new file mode 100644
index 0000000..a0c1bd2
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/MetricsLoadSimulator.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Sample Usage:
+ * <pre>
+ * $ java -cp "dependency/*":LoadSimulator-1.0-SNAPSHOT.jar \
+ * org.apache.ambari.metrics.MetricsLoadSimulator \
+ * -h "bartosz.laptop" -n 2 -m "162.216.148.45" -c 10000 -s 30000</pre>
+ */
+public class MetricsLoadSimulator {
+  private final static Logger LOG = LoggerFactory.getLogger(MetricsLoadSimulator
+    .class);
+
+  public static void main(String[] args) throws IOException, InterruptedException {
+    Map<String, String> mapArgs = parseArgs(args);
+
+    LoadRunner loadRunner = new LoadRunner(
+      mapArgs.get("hostName"),
+      Integer.valueOf(mapArgs.get("numberOfHosts")),
+      mapArgs.get("metricsHostName"),
+      Integer.valueOf(mapArgs.get("collectInterval")),
+      Integer.valueOf(mapArgs.get("sendInterval")),
+      Boolean.valueOf(mapArgs.get("master"))
+    );
+
+    loadRunner.start();
+  }
+
+  private static Map<String, String> parseArgs(String[] args) {
+    Map<String, String> mapProps = new HashMap<String, String>();
+    mapProps.put("hostName", "host");
+    mapProps.put("numberOfHosts", "20");
+    mapProps.put("trafficType", "burst");
+    mapProps.put("metricsHostName", "localhost");
+    mapProps.put("collectInterval", "10000");
+    mapProps.put("sendInterval", "60000");
+
+    if (args.length == 0) {
+      printUsage();
+      throw new RuntimeException("Unexpected argument, See usage message.");
+    } else {
+      for (int i = 0; i < args.length; i += 2) {
+        String arg = args[i];
+        if (arg.equals("-h")) {
+          mapProps.put("hostName", args[i + 1]);
+        } else if (arg.equals("-n")) {
+          mapProps.put("numberOfHosts", args[i + 1]);
+        } else if (arg.equals("-t")) {
+          mapProps.put("trafficType", args[i + 1]);
+        } else if (arg.equals("-m")) {
+          mapProps.put("metricsHostName", args[i + 1]);
+        } else if (arg.equals("-c")) {
+          mapProps.put("collectInterval", args[i + 1]);
+        } else if (arg.equals("-s")) {
+          mapProps.put("sendInterval", args[i + 1]);
+        } else if (arg.equals("-M")) {
+          mapProps.put("master", args[i + 1]);
+        } else if (arg.equals("-d")) {
+          // a dummy switch - it says that we agree with defaults
+        } else {
+          printUsage();
+          throw new RuntimeException("Unexpected argument, See usage message.");
+        }
+      }
+    }
+
+    LOG.info("Recognized options: baseHostName={} hosts#={} trafficMode={} " +
+        "metricsHostName={} collectIntervalMillis={} sendIntervalMillis={} " +
+        "simulateMaster={}",
+      mapProps.get("hostName"),
+      Integer.valueOf(mapProps.get("numberOfHosts")),
+      mapProps.get("trafficType"),
+      mapProps.get("metricsHostName"),
+      Integer.valueOf(mapProps.get("collectInterval")),
+      Integer.valueOf(mapProps.get("sendInterval")),
+      Boolean.valueOf(mapProps.get("master"))
+    );
+
+    return mapProps;
+  }
+
+  public static void printUsage() {
+    System.err.println("Usage: java MetricsLoadSimulator [OPTIONS]");
+    System.err.println("Options: ");
+    System.err.println("[-h hostName] [-n numberOfHosts] "
+      + "[-t trafficMode {burst, staggered}] [-m metricsHostName] "
+      + "[-c collectIntervalMillis {10 sec}] [-s sendIntervalMillis {60 sec}]"
+      + "[-M simulateMaster {true, false}] ");
+    System.err.println();
+    System.err.println("When you select a master, then one simulated host will play");
+    System.err.println("a role of a master, and the rest will be slaves. Otherwise");
+    System.err.println("all simulation threads (single thread is for single host)");
+    System.err.println("will be slave hosts");
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/MetricsSenderWorker.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/MetricsSenderWorker.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/MetricsSenderWorker.java
new file mode 100644
index 0000000..c027933
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/MetricsSenderWorker.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimulator;
+
+
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.data.AppMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.data.HostMetricsGenerator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.net.MetricsSender;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.net.RestMetricsSender;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.util.Json;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+/**
+ */
+public class MetricsSenderWorker implements Callable<String> {
+  private final static Logger LOG = LoggerFactory.getLogger(RestMetricsSender.class);
+
+  MetricsSender sender;
+  HostMetricsGenerator hmg;
+
+  public MetricsSenderWorker(MetricsSender sender, HostMetricsGenerator metricsGenerator) {
+    this.sender = sender;
+    hmg = metricsGenerator;
+  }
+
+  @Override
+  public String call() throws Exception {
+    AppMetrics hostMetrics = hmg.createMetrics();
+
+    try {
+      String request = new Json().serialize(hostMetrics);
+      String response = sender.pushMetrics(request);
+
+      return response;
+    } catch (IOException e) {
+      LOG.error("Error while pushing metrics: ", e);
+      throw e;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/AppID.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/AppID.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/AppID.java
new file mode 100644
index 0000000..4f58dc5
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/AppID.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.data;
+
+public enum AppID {
+  HOST("HOST"),
+  NAMENODE("namenode"),
+  RESOURCEMANAGER("resourcemanager"),
+  DATANODE("datanode"),
+  NODEMANAGER("nodemanager"),
+  MASTER_HBASE("hbase"),
+  SLAVE_HBASE("hbase");
+
+  public static final AppID[] MASTER_APPS = {HOST, NAMENODE, RESOURCEMANAGER, MASTER_HBASE};
+  public static final AppID[] SLAVE_APPS = {HOST, DATANODE, NODEMANAGER, SLAVE_HBASE};
+
+  private String id;
+
+  private AppID(String id) {
+    this.id = id;
+  }
+
+  public String getId() {
+    return id;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/AppMetrics.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/AppMetrics.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/AppMetrics.java
new file mode 100644
index 0000000..d9cec2b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/AppMetrics.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.data;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * AppMetrics is a class that helps to create properly initialized metrics for
+ * current app. It also holds the
+ * metrics and can be serialized to json.
+ */
+public class AppMetrics {
+
+  private final Collection<Metric> metrics = new ArrayList<Metric>();
+  private final transient ApplicationInstance applicationId;
+  private final transient long startTime;
+
+  public AppMetrics(ApplicationInstance applicationId, long startTime) {
+    this.applicationId = applicationId;
+    this.startTime = startTime;
+  }
+
+  public Metric createMetric(String metricName) {
+    return new Metric(applicationId, metricName, startTime);
+  }
+
+  public void addMetric(Metric metric) {
+    metrics.add(metric);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/ApplicationInstance.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/ApplicationInstance.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/ApplicationInstance.java
new file mode 100644
index 0000000..d99ecc9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/ApplicationInstance.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.data;
+
+/**
+ * AppId is a helper class that encapsulates the common part of metrics message.
+ * It contains hostName, appId and instanceId. It is immutable,
+ * and it can not hold null values.
+ */
+public final class ApplicationInstance {
+
+  private final transient String hostName;
+  private final transient AppID appId;
+  private final transient String instanceId;
+
+  /**
+   * @param hostname
+   * @param appId
+   * @param instanceId
+   */
+  public ApplicationInstance(String hostname, AppID appId, String instanceId) {
+    if (hostname == null || appId == null || instanceId == null)
+      throw new IllegalArgumentException("ApplicationInstance can not be " +
+        "instantiated with null values");
+
+    this.hostName = hostname;
+    this.appId = appId;
+    this.instanceId = instanceId;
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public AppID getAppId() {
+    return appId;
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/HostMetricsGenerator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/HostMetricsGenerator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/HostMetricsGenerator.java
new file mode 100644
index 0000000..61a6624
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/HostMetricsGenerator.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimulator.data;
+
+
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.util.RandomMetricsProvider;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.util.TimeStampProvider;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ */
+public class HostMetricsGenerator {
+
+  private Map<String, RandomMetricsProvider> metricDataProviders = new HashMap<String, RandomMetricsProvider>();
+  private final TimeStampProvider tsp;
+  private final ApplicationInstance id;
+
+  public HostMetricsGenerator(ApplicationInstance id,
+                              TimeStampProvider timeStamps,
+                              Map<String, RandomMetricsProvider> metricDataProviders) {
+    this.id = id;
+    this.tsp = timeStamps;
+    this.metricDataProviders = metricDataProviders;
+  }
+
+  public AppMetrics createMetrics() {
+    long[] timestamps = tsp.timestampsForNextInterval();
+    AppMetrics appMetrics = new AppMetrics(id, timestamps[0]);
+
+    for (Map.Entry<String, RandomMetricsProvider> entry : metricDataProviders.entrySet()) {
+      String metricName = entry.getKey();
+      RandomMetricsProvider metricData = entry.getValue();
+
+      Metric metric = appMetrics.createMetric(metricName);
+      for (long timestamp : timestamps) {
+        metric.putMetric(timestamp, String.valueOf(metricData.next()));
+      }
+      appMetrics.addMetric(metric);
+    }
+
+    return appMetrics;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/Metric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/Metric.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/Metric.java
new file mode 100644
index 0000000..f274263
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/Metric.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimulator.data;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class Metric {
+
+  private String instanceid;
+  private String hostname;
+  private Map<String, String> metrics = new LinkedHashMap<String, String>();
+  private String starttime;
+  private String appid;
+  private String metricname;
+
+  // i don't like this ctor, but it has to be public for json deserialization
+  public Metric() {
+  }
+
+  public Metric(ApplicationInstance app, String metricName, long startTime) {
+    this.hostname = app.getHostName();
+    this.appid = app.getAppId().getId();
+    this.instanceid = app.getInstanceId();
+    this.metricname = metricName;
+    this.starttime = Long.toString(startTime);
+  }
+
+  public void putMetric(long timestamp, String value) {
+    metrics.put(Long.toString(timestamp), value);
+  }
+
+  public String getInstanceid() {
+    return instanceid;
+  }
+
+  public String getHostname() {
+    return hostname;
+  }
+
+  public Map<String, String> getMetrics() {
+    return metrics;
+  }
+
+  public String getStarttime() {
+    return starttime;
+  }
+
+  public String getAppid() {
+    return appid;
+  }
+
+  public String getMetricname() {
+    return metricname;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/MetricsGeneratorConfigurer.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/MetricsGeneratorConfigurer.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/MetricsGeneratorConfigurer.java
new file mode 100644
index 0000000..b3401b2
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/MetricsGeneratorConfigurer.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimulator.data;
+
+
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.util.RandomMetricsProvider;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.util.TimeStampProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * MetricsGeneratorConfigurer is a factory that reads metrics definition from a file,
+ * and returns an Single HostMetricsGenerator. Check createMetricsForHost method
+ * for details.
+ */
+public class MetricsGeneratorConfigurer {
+
+  private final static Logger LOG = LoggerFactory.getLogger
+    (MetricsGeneratorConfigurer.class);
+
+  /**
+   * Creates HostMetricsGenerator configured with metric names loaded from file.
+   *
+   * @param id         ApplicationInstance descriptor, will be used to create
+   *                   HostMetricsGenerator, cannot be null
+   * @param timeStamps configured TimeStampProvider that can provide next
+   *                   timestamp, cannot be null
+   * @return HostMetricsGenerator with given ApplicationInstance id and configured
+   * mapping of
+   * metric names to data providers
+   */
+  public static HostMetricsGenerator createMetricsForHost(
+    ApplicationInstance id,
+    TimeStampProvider timeStamps) {
+    return new HostMetricsGenerator(id, timeStamps, readMetrics(id.getAppId()));
+  }
+
+  private static Map<String, RandomMetricsProvider> readMetrics(AppID type) {
+    InputStream input = null;
+    Map<String, RandomMetricsProvider> metrics =
+      new HashMap<String, RandomMetricsProvider>();
+    String fileName = "metrics_def/" + type.toString() + ".dat";
+
+    try {
+      LOG.info("Loading " + fileName);
+
+      input = MetricsGeneratorConfigurer.class.getClassLoader()
+        .getResourceAsStream(fileName);
+
+      BufferedReader reader = new BufferedReader(new InputStreamReader(input));
+
+      String line;
+      while ((line = reader.readLine()) != null) {
+        metrics.put(line.trim(), new RandomMetricsProvider(100, 200));
+      }
+
+    } catch (IOException e) {
+      LOG.error("Cannot read file " + fileName + " for appID " + type.toString(), e);
+    } finally {
+      if (input != null) {
+        try {
+          input.close();
+        } catch (IOException ex) {
+          // intentionally left blank, here we cannot do anything
+        }
+      }
+    }
+
+    return metrics;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/MetricsSender.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/MetricsSender.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/MetricsSender.java
new file mode 100644
index 0000000..35c0fc3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/MetricsSender.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimulator.net;
+
+/**
+ * MetricSender should provides a simple way of pushing metrics to some service.
+ */
+public interface MetricsSender {
+  /**
+   * Push metrics to the metric service (e.g. a metrics storage system).
+   *
+   * @param payload the payload to be sent to metrics service
+   * @return response message either acknowledgement or error
+   */
+  String pushMetrics(String payload);
+}


Mime
View raw message