hadoop-common-commits mailing list archives

From: varunsax...@apache.org
Subject: [22/50] [abbrv] hadoop git commit: YARN-5355. Backported YARN-2928 into our branch-2 feature branch.
Date: Sun, 06 Nov 2016 16:31:39 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
new file mode 100644
index 0000000..3094088
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -0,0 +1,831 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests compaction of the FlowRun table and the FlowScanner cell summation.
+ */
+public class TestHBaseStorageFlowRunCompaction {
+
+  private static HBaseTestingUtility util;
+
+  private static final String METRIC_1 = "MAP_SLOT_MILLIS";
+  private static final String METRIC_2 = "HDFS_BYTES_READ";
+
+  private final byte[] aRowKey = Bytes.toBytes("a");
+  private final byte[] aFamily = Bytes.toBytes("family");
+  private final byte[] aQualifier = Bytes.toBytes("qualifier");
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    util = new HBaseTestingUtility();
+    Configuration conf = util.getConfiguration();
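+    // HFile format v3 is needed so that the cell tags used by these tests
+    // (aggregation attributes such as SUM/SUM_FINAL) are persisted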
+    conf.setInt("hfile.format.version", 3);
+    util.startMiniCluster();
+    createSchema();
+  }
+
+  private static void createSchema() throws IOException {
+    TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
+  }
+
+  /** Writes non-numeric data into the flow run table
+   * and reads it back.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testWriteNonNumericData() throws Exception {
+    String rowKey = "nonNumericRowKey";
+    String column = "nonNumericColumnName";
+    String value = "nonNumericValue";
+    byte[] rowKeyBytes = Bytes.toBytes(rowKey);
+    byte[] columnNameBytes = Bytes.toBytes(column);
+    byte[] valueBytes = Bytes.toBytes(value);
+    Put p = new Put(rowKeyBytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes,
+        valueBytes);
+    Configuration hbaseConf = util.getConfiguration();
+    TableName table = TableName.valueOf(hbaseConf.get(
+        FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
+    Connection conn = ConnectionFactory.createConnection(hbaseConf);
+    Table flowRunTable = conn.getTable(table);
+    flowRunTable.put(p);
+
+    Get g = new Get(rowKeyBytes);
+    Result r = flowRunTable.get(g);
+    assertNotNull(r);
+    assertTrue(r.size() >= 1);
+    Cell actualValue = r.getColumnLatestCell(
+        FlowRunColumnFamily.INFO.getBytes(), columnNameBytes);
+    assertNotNull(CellUtil.cloneValue(actualValue));
+    assertEquals(value, Bytes.toString(CellUtil.cloneValue(actualValue)));
+  }
+
+  @Test
+  public void testWriteScanBatchLimit() throws Exception {
+    String rowKey = "nonNumericRowKey";
+    String column = "nonNumericColumnName";
+    String value = "nonNumericValue";
+    String column2 = "nonNumericColumnName2";
+    String value2 = "nonNumericValue2";
+    String column3 = "nonNumericColumnName3";
+    String value3 = "nonNumericValue3";
+    String column4 = "nonNumericColumnName4";
+    String value4 = "nonNumericValue4";
+
+    byte[] rowKeyBytes = Bytes.toBytes(rowKey);
+    byte[] columnNameBytes = Bytes.toBytes(column);
+    byte[] valueBytes = Bytes.toBytes(value);
+    byte[] columnName2Bytes = Bytes.toBytes(column2);
+    byte[] value2Bytes = Bytes.toBytes(value2);
+    byte[] columnName3Bytes = Bytes.toBytes(column3);
+    byte[] value3Bytes = Bytes.toBytes(value3);
+    byte[] columnName4Bytes = Bytes.toBytes(column4);
+    byte[] value4Bytes = Bytes.toBytes(value4);
+
+    Put p = new Put(rowKeyBytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes,
+        valueBytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes,
+        value2Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes,
+        value3Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes,
+        value4Bytes);
+
+    Configuration hbaseConf = util.getConfiguration();
+    TableName table = TableName.valueOf(hbaseConf.get(
+        FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
+    Connection conn = ConnectionFactory.createConnection(hbaseConf);
+    Table flowRunTable = conn.getTable(table);
+    flowRunTable.put(p);
+
+    String rowKey2 = "nonNumericRowKey2";
+    byte[] rowKey2Bytes = Bytes.toBytes(rowKey2);
+    p = new Put(rowKey2Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes,
+        valueBytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes,
+        value2Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes,
+        value3Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes,
+        value4Bytes);
+    flowRunTable.put(p);
+
+    String rowKey3 = "nonNumericRowKey3";
+    byte[] rowKey3Bytes = Bytes.toBytes(rowKey3);
+    p = new Put(rowKey3Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes,
+        valueBytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes,
+        value2Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes,
+        value3Bytes);
+    p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes,
+        value4Bytes);
+    flowRunTable.put(p);
+
+    Scan s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    s.setStartRow(rowKeyBytes);
+    // set number of cells to fetch per scanner next invocation
+    int batchLimit = 2;
+    s.setBatch(batchLimit);
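+    // each row has 4 columns, so with a batch of 2 a single row is returned
+    // across multiple Result objects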
+    ResultScanner scanner = flowRunTable.getScanner(s);
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertTrue(result.rawCells().length <= batchLimit);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      assertTrue(values.size() <= batchLimit);
+    }
+
+    s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    s.setStartRow(rowKeyBytes);
+    // set number of cells to fetch per scanner next invocation
+    batchLimit = 3;
+    s.setBatch(batchLimit);
+    scanner = flowRunTable.getScanner(s);
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertTrue(result.rawCells().length <= batchLimit);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      assertTrue(values.size() <= batchLimit);
+    }
+
+    s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    s.setStartRow(rowKeyBytes);
+    // set number of cells to fetch per scanner next invocation
+    batchLimit = 1000;
+    s.setBatch(batchLimit);
+    scanner = flowRunTable.getScanner(s);
+    int rowCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertTrue(result.rawCells().length <= batchLimit);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      assertTrue(values.size() <= batchLimit);
+      // we expect all back in one next call
+      assertEquals(4, values.size());
+      rowCount++;
+    }
+    // should get back 1 row with each invocation
+    // if scan batch is set sufficiently high
+    assertEquals(3, rowCount);
+
+    // test with a negative number
+    // should have same effect as setting it to a high number
+    s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    s.setStartRow(rowKeyBytes);
+    // set number of cells to fetch per scanner next invocation
+    batchLimit = -2992;
+    s.setBatch(batchLimit);
+    scanner = flowRunTable.getScanner(s);
+    rowCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      assertEquals(4, result.rawCells().length);
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
+      // we expect all back in one next call
+      assertEquals(4, values.size());
+      System.out.println(" values size " + values.size() +  " " + batchLimit);
+      rowCount++;
+    }
+    // should get back 1 row with each invocation
+    // if scan batch is set sufficiently high
+    assertEquals(3, rowCount);
+  }
+
+  @Test
+  public void testWriteFlowRunCompaction() throws Exception {
+    String cluster = "kompaction_cluster1";
+    String user = "kompaction_FlowRun__user1";
+    String flow = "kompaction_flowRun_flow_name";
+    String flowVersion = "AF1021C19F1351";
+    long runid = 1449526652000L;
+
+    int start = 10;
+    int count = 2000;
+    int appIdSuffix = 1;
+    HBaseTimelineWriterImpl hbi = null;
+    long insertTs = System.currentTimeMillis() - count;
+    Configuration c1 = util.getConfiguration();
+    TimelineEntities te1 = null;
+    TimelineEntity entityApp1 = null;
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      // now insert count * ( 100 + 100) metrics
+      // each call to getEntityMetricsApp1 brings back 100 values
+      // of metric1 and 100 of metric2
+      for (int i = start; i < start + count; i++) {
+        String appName = "application_10240000000000_" + appIdSuffix;
+        insertTs++;
+        te1 = new TimelineEntities();
+        entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(insertTs, c1);
+        te1.addEntity(entityApp1);
+        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+
+        appName = "application_2048000000000_7" + appIdSuffix;
+        insertTs++;
+        te1 = new TimelineEntities();
+        entityApp1 = TestFlowDataGenerator.getEntityMetricsApp2(insertTs);
+        te1.addEntity(entityApp1);
+        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+      }
+    } finally {
+      String appName = "application_10240000000000_" + appIdSuffix;
+      te1 = new TimelineEntities();
+      entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1Complete(
+          insertTs + 1, c1);
+      te1.addEntity(entityApp1);
+      if (hbi != null) {
+        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+        hbi.flush();
+        hbi.close();
+      }
+    }
+
+    // check in flow run table
+    HRegionServer server = util.getRSForFirstRegionInTable(TableName
+        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+    List<Region> regions = server.getOnlineRegions(TableName
+        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+    assertTrue("Didn't find any regions for primary table!",
+        regions.size() > 0);
+    // flush and compact all the regions of the primary table
+    for (Region region : regions) {
+      region.flush(true);
+      region.compact(true);
+    }
+
+    // check flow run for one flow many apps
+    checkFlowRunTable(cluster, user, flow, runid, c1, 4);
+  }
+
+
+  private void checkFlowRunTable(String cluster, String user, String flow,
+      long runid, Configuration c1, int valueCount) throws IOException {
+    Scan s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
+    s.setStartRow(startRow);
+    String clusterStop = cluster + "1";
+    byte[] stopRow =
+        new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
+    s.setStopRow(stopRow);
+    Connection conn = ConnectionFactory.createConnection(c1);
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+    ResultScanner scanner = table1.getScanner(s);
+
+    int rowCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO
+          .getBytes());
+      assertEquals(valueCount, values.size());
+
+      rowCount++;
+      // check metric1
+      byte[] q = ColumnHelper.getColumnQualifier(
+          FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC_1);
+      assertTrue(values.containsKey(q));
+      assertEquals(141, Bytes.toLong(values.get(q)));
+
+      // check metric2
+      q = ColumnHelper.getColumnQualifier(
+          FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC_2);
+      assertTrue(values.containsKey(q));
+      assertEquals(57, Bytes.toLong(values.get(q)));
+    }
+    assertEquals(1, rowCount);
+  }
+
+
+  private FlowScanner getFlowScannerForTestingCompaction() {
+    // create a FlowScanner object with the sole purpose of invoking the
+    // process summation
+    CompactionRequest request = new CompactionRequest();
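+    // mark this request as a major compaction so the scanner below takes the
+    // MAJOR_COMPACTION path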
+    request.setIsMajor(true, true);
+    // okay to pass in nulls for the constructor arguments
+    // because all we want to do is invoke the process summation
+    FlowScanner fs = new FlowScanner(null, null,
+        (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
+            : FlowScannerOperation.MINOR_COMPACTION));
+    assertNotNull(fs);
+    return fs;
+  }
+
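+  // The compaction tests below rely on the following behaviour of
+  // processSummationMajorCompaction: expired SUM_FINAL cells are folded into
+  // a single flow sum cell stamped with a fresh timestamp, while SUM cells
+  // and not-yet-expired SUM_FINAL cells are passed through unchanged.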
+  @Test
+  public void checkProcessSummationMoreCellsSumFinal2()
+      throws IOException {
+    long cellValue1 = 1236L;
+    long cellValue2 = 28L;
+    long cellValue3 = 1236L;
+    long cellValue4 = 1236L;
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+
+    // note down the current timestamp
+    long currentTimestamp = System.currentTimeMillis();
+    long cell1Ts = 1200120L;
+    long cell2Ts = TimestampGenerator.getSupplementedTimestamp(
+        System.currentTimeMillis(), "application_123746661110_11202");
+    long cell3Ts = 1277719L;
+    long cell4Ts = currentTimestamp - 10;
+
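+    // keep the cells in HBase's native KeyValue order, the order in which a
+    // compaction scanner would hand them to the flow scanner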
+    SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+
+    List<Tag> tags = new ArrayList<>();
+    Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+        "application_1234588888_91188");
+    tags.add(t);
+    byte[] tagByteArray = Tag.fromList(tags);
+    // create a cell with a VERY old timestamp and attribute SUM_FINAL
+    Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        cell1Ts, Bytes.toBytes(cellValue1), tagByteArray);
+    currentColumnCells.add(c1);
+
+    tags = new ArrayList<>();
+    t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+        "application_12700000001_29102");
+    tags.add(t);
+    tagByteArray = Tag.fromList(tags);
+    // create a cell with a recent timestamp and attribute SUM_FINAL
+    Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        cell2Ts, Bytes.toBytes(cellValue2), tagByteArray);
+    currentColumnCells.add(c2);
+
+    tags = new ArrayList<>();
+    t = new Tag(AggregationOperation.SUM.getTagType(),
+        "application_191780000000001_8195");
+    tags.add(t);
+    tagByteArray = Tag.fromList(tags);
+    // create a cell with a VERY old timestamp but has attribute SUM
+    Cell c3 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        cell3Ts, Bytes.toBytes(cellValue3), tagByteArray);
+    currentColumnCells.add(c3);
+
+    tags = new ArrayList<>();
+    t = new Tag(AggregationOperation.SUM.getTagType(),
+        "application_191780000000001_98104");
+    tags.add(t);
+    tagByteArray = Tag.fromList(tags);
+    // create a cell with a VERY old timestamp but has attribute SUM
+    Cell c4 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
+    currentColumnCells.add(c4);
+
+    List<Cell> cells =
+        fs.processSummationMajorCompaction(currentColumnCells,
+            new LongConverter(), currentTimestamp);
+    assertNotNull(cells);
+
+    // we should be getting back 4 cells
+    // one is the flow sum cell
+    // two are the cells with SUM attribute
+    // one cell with SUM_FINAL
+    assertEquals(4, cells.size());
+
+    for (int i = 0; i < cells.size(); i++) {
+      Cell returnedCell = cells.get(i);
+      assertNotNull(returnedCell);
+
+      long returnTs = returnedCell.getTimestamp();
+      long returnValue = Bytes.toLong(CellUtil
+          .cloneValue(returnedCell));
+      // cellValue1, cellValue3 and cellValue4 are all equal, so distinguish
+      // the retained cells by their timestamps and the folded SUM_FINAL cell
+      // by its fresh timestamp
+      if (returnValue == cellValue2) {
+        assertTrue(returnTs == cell2Ts);
+      } else if (returnTs == cell3Ts) {
+        assertEquals(cellValue3, returnValue);
+      } else if (returnTs == cell4Ts) {
+        assertEquals(cellValue4, returnValue);
+      } else {
+        // the remaining cell must be the flow sum cell that folded the
+        // expired SUM_FINAL cell; it carries cellValue1 with a new timestamp
+        assertEquals(cellValue1, returnValue);
+        assertTrue(returnTs != cell1Ts);
+        assertTrue(returnTs > cell1Ts);
+        assertTrue(returnTs >= currentTimestamp);
+      }
+    }
+  }
+
+  // tests with many cells
+  // of type SUM and SUM_FINAL
+  // all cells of SUM_FINAL will expire
+  @Test
+  public void checkProcessSummationMoreCellsSumFinalMany() throws IOException {
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+    int count = 200000;
+
+    long cellValueFinal = 1000L;
+    long cellValueNotFinal = 28L;
+
+    // note down the current timestamp
+    long currentTimestamp = System.currentTimeMillis();
+    long cellTsFinalStart = 10001120L;
+    long cellTsFinal = cellTsFinalStart;
+    long cellTsNotFinalStart = currentTimestamp - 5;
+    long cellTsNotFinal = cellTsNotFinalStart;
+
+    SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+    List<Tag> tags = null;
+    Tag t = null;
+    Cell c1 = null;
+
+    // insert SUM_FINAL cells
+    for (int i = 0; i < count; i++) {
+      tags = new ArrayList<>();
+      t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+          "application_123450000" + i + "01_19" + i);
+      tags.add(t);
+      byte[] tagByteArray = Tag.fromList(tags);
+      // create a cell with a VERY old timestamp and attribute SUM_FINAL
+      c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+          cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
+      currentColumnCells.add(c1);
+      cellTsFinal++;
+    }
+
+    // add SUM cells
+    for (int i = 0; i < count; i++) {
+      tags = new ArrayList<>();
+      t = new Tag(AggregationOperation.SUM.getTagType(),
+          "application_1987650000" + i + "83_911" + i);
+      tags.add(t);
+      byte[] tagByteArray = Tag.fromList(tags);
+      // create a cell with attribute SUM
+      c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+          cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
+      currentColumnCells.add(c1);
+      cellTsNotFinal++;
+    }
+
+    List<Cell> cells =
+        fs.processSummationMajorCompaction(currentColumnCells,
+            new LongConverter(), currentTimestamp);
+    assertNotNull(cells);
+
+    // we should be getting back count + 1 cells
+    // one is the flow sum cell
+    // others are the cells with SUM attribute
+    assertEquals(count + 1, cells.size());
+
+    for (int i = 0; i < cells.size(); i++) {
+      Cell returnedCell = cells.get(i);
+      assertNotNull(returnedCell);
+
+      long returnTs = returnedCell.getTimestamp();
+      long returnValue = Bytes.toLong(CellUtil
+          .cloneValue(returnedCell));
+      if (returnValue == (count * cellValueFinal)) {
+        assertTrue(returnTs > (cellTsFinalStart + count));
+        assertTrue(returnTs >= currentTimestamp);
+      } else if ((returnValue >= cellValueNotFinal)
+          && (returnValue <= cellValueNotFinal * count)) {
+        assertTrue(returnTs >= cellTsNotFinalStart);
+        assertTrue(returnTs <= cellTsNotFinalStart + count);
+      } else {
+        // raise a failure since we expect only these values back
+        Assert.fail();
+      }
+    }
+  }
+
+  // tests with many cells
+  // of type SUM and SUM_FINAL
+  // NOT all cells of SUM_FINAL will expire
+  @Test
+  public void checkProcessSummationMoreCellsSumFinalVariedTags()
+      throws IOException {
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+    int countFinal = 20100;
+    int countNotFinal = 1000;
+    int countFinalNotExpire = 7009;
+
+    long cellValueFinal = 1000L;
+    long cellValueNotFinal = 28L;
+
+    // note down the current timestamp
+    long currentTimestamp = System.currentTimeMillis();
+    long cellTsFinalStart = 10001120L;
+    long cellTsFinal = cellTsFinalStart;
+
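+    // a timestamp supplemented from the current time keeps these SUM_FINAL
+    // cells recent enough that they will NOT be treated as expired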
+    long cellTsFinalStartNotExpire =
+        TimestampGenerator.getSupplementedTimestamp(
+        System.currentTimeMillis(), "application_10266666661166_118821");
+    long cellTsFinalNotExpire = cellTsFinalStartNotExpire;
+
+    long cellTsNotFinalStart = currentTimestamp - 5;
+    long cellTsNotFinal = cellTsNotFinalStart;
+
+    SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+    List<Tag> tags = null;
+    Tag t = null;
+    Cell c1 = null;
+
+    // insert SUM_FINAL cells which will expire
+    for (int i = 0; i < countFinal; i++) {
+      tags = new ArrayList<>();
+      t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+          "application_123450000" + i + "01_19" + i);
+      tags.add(t);
+      byte[] tagByteArray = Tag.fromList(tags);
+      // create a cell with a VERY old timestamp and attribute SUM_FINAL
+      c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+          cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
+      currentColumnCells.add(c1);
+      cellTsFinal++;
+    }
+
+    // insert SUM_FINAL cells which will NOT expire
+    for (int i = 0; i < countFinalNotExpire; i++) {
+      tags = new ArrayList<>();
+      t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+          "application_123450000" + i + "01_19" + i);
+      tags.add(t);
+      byte[] tagByteArray = Tag.fromList(tags);
+      // create a cell with a VERY old timestamp and attribute SUM_FINAL
+      c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+          cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray);
+      currentColumnCells.add(c1);
+      cellTsFinalNotExpire++;
+    }
+
+    // add SUM cells
+    for (int i = 0; i < countNotFinal; i++) {
+      tags = new ArrayList<>();
+      t = new Tag(AggregationOperation.SUM.getTagType(),
+          "application_1987650000" + i + "83_911" + i);
+      tags.add(t);
+      byte[] tagByteArray = Tag.fromList(tags);
+      // create a cell with attribute SUM
+      c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+          cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
+      currentColumnCells.add(c1);
+      cellTsNotFinal++;
+    }
+
+    List<Cell> cells =
+        fs.processSummationMajorCompaction(currentColumnCells,
+            new LongConverter(), currentTimestamp);
+    assertNotNull(cells);
+
+    // we should be getting back
+    // countNotFinal + countFinalNotExpire + 1 cells
+    // one is the flow sum cell
+    // count = the cells with SUM attribute
+    // count = the cells with SUM_FINAL attribute but not expired
+    assertEquals(countFinalNotExpire + countNotFinal + 1, cells.size());
+
+    for (int i = 0; i < cells.size(); i++) {
+      Cell returnedCell = cells.get(i);
+      assertNotNull(returnedCell);
+
+      long returnTs = returnedCell.getTimestamp();
+      long returnValue = Bytes.toLong(CellUtil
+          .cloneValue(returnedCell));
+      if (returnValue == (countFinal * cellValueFinal)) {
+        assertTrue(returnTs > (cellTsFinalStart + countFinal));
+        assertTrue(returnTs >= currentTimestamp);
+      } else if (returnValue == cellValueNotFinal) {
+        assertTrue(returnTs >= cellTsNotFinalStart);
+        assertTrue(returnTs <= cellTsNotFinalStart + countNotFinal);
+      } else if (returnValue == cellValueFinal){
+        assertTrue(returnTs >= cellTsFinalStartNotExpire);
+        assertTrue(returnTs <= cellTsFinalStartNotExpire + countFinalNotExpire);
+      } else {
+        // raise a failure since we expect only these values back
+        Assert.fail();
+      }
+    }
+  }
+
+  @Test
+  public void testProcessSummationMoreCellsSumFinal() throws IOException {
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+    // note down the current timestamp
+    long currentTimestamp = System.currentTimeMillis();
+    long cellValue1 = 1236L;
+    long cellValue2 = 28L;
+
+    List<Tag> tags = new ArrayList<>();
+    Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+        "application_1234588888_999888");
+    tags.add(t);
+    byte[] tagByteArray = Tag.fromList(tags);
+    SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+
+    // create a cell with a VERY old timestamp and attribute SUM_FINAL
+    Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        120L, Bytes.toBytes(cellValue1), tagByteArray);
+    currentColumnCells.add(c1);
+
+    tags = new ArrayList<>();
+    t = new Tag(AggregationOperation.SUM.getTagType(),
+        "application_100000000001_119101");
+    tags.add(t);
+    tagByteArray = Tag.fromList(tags);
+
+    // create a cell with a VERY old timestamp but has attribute SUM
+    Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        130L, Bytes.toBytes(cellValue2), tagByteArray);
+    currentColumnCells.add(c2);
+    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+        new LongConverter(), currentTimestamp);
+    assertNotNull(cells);
+
+    // we should be getting back two cells
+    // one is the flow sum cell
+    // another is the cell with SUM attribute
+    assertEquals(2, cells.size());
+
+    Cell returnedCell = cells.get(0);
+    assertNotNull(returnedCell);
+    long inputTs1 = c1.getTimestamp();
+    long inputTs2 = c2.getTimestamp();
+
+    long returnTs = returnedCell.getTimestamp();
+    long returnValue = Bytes.toLong(CellUtil
+        .cloneValue(returnedCell));
+    // the returned Ts will be far greater than input ts as well as the noted
+    // current timestamp
+    if (returnValue == cellValue2) {
+      assertTrue(returnTs == inputTs2);
+    } else if (returnValue == cellValue1) {
+      assertTrue(returnTs >= currentTimestamp);
+      assertTrue(returnTs != inputTs1);
+    } else {
+      // raise a failure since we expect only these two values back
+      Assert.fail();
+    }
+  }
+
+  @Test
+  public void testProcessSummationOneCellSumFinal() throws IOException {
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+
+    // note down the current timestamp
+    long currentTimestamp = System.currentTimeMillis();
+    List<Tag> tags = new ArrayList<>();
+    Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+        "application_123458888888_999888");
+    tags.add(t);
+    byte[] tagByteArray = Tag.fromList(tags);
+    SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+
+    // create a cell with a VERY old timestamp
+    Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        120L, Bytes.toBytes(1110L), tagByteArray);
+    currentColumnCells.add(c1);
+
+    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+        new LongConverter(), currentTimestamp);
+    assertNotNull(cells);
+    // we should not get the same cell back
+    // but we get back the flow cell
+    assertEquals(1, cells.size());
+
+    Cell returnedCell = cells.get(0);
+    // it's NOT the same cell
+    assertNotEquals(c1, returnedCell);
+    long inputTs = c1.getTimestamp();
+    long returnTs = returnedCell.getTimestamp();
+    // the returned Ts will be far greater than input ts as well as the noted
+    // current timestamp
+    assertTrue(returnTs > inputTs);
+    assertTrue(returnTs >= currentTimestamp);
+  }
+
+  @Test
+  public void testProcessSummationOneCell() throws IOException {
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+
+    // note down the current timestamp
+    long currentTimestamp = System.currentTimeMillis();
+
+    // try for 1 cell with tag SUM
+    List<Tag> tags = new ArrayList<>();
+    Tag t = new Tag(AggregationOperation.SUM.getTagType(),
+        "application_123458888888_999888");
+    tags.add(t);
+    byte[] tagByteArray = Tag.fromList(tags);
+
+    SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+
+    Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        currentTimestamp, Bytes.toBytes(1110L), tagByteArray);
+    currentColumnCells.add(c1);
+    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+        new LongConverter(), currentTimestamp);
+    assertNotNull(cells);
+    // we expect the same cell back
+    assertEquals(1, cells.size());
+    Cell c2 = cells.get(0);
+    assertEquals(c1, c2);
+    assertEquals(currentTimestamp, c2.getTimestamp());
+  }
+
+  @Test
+  public void testProcessSummationEmpty() throws IOException {
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+    long currentTimestamp = System.currentTimeMillis();
+
+    LongConverter longConverter = new LongConverter();
+
+    SortedSet<Cell> currentColumnCells = null;
+    List<Cell> cells =
+        fs.processSummationMajorCompaction(currentColumnCells, longConverter,
+            currentTimestamp);
+    assertNotNull(cells);
+    assertEquals(0, cells.size());
+
+    currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+    cells =
+        fs.processSummationMajorCompaction(currentColumnCells, longConverter,
+            currentTimestamp);
+    assertNotNull(cells);
+    assertEquals(0, cells.size());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/resources/log4j.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/resources/log4j.properties
new file mode 100644
index 0000000..81a3f6a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/resources/log4j.properties
@@ -0,0 +1,19 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
new file mode 100644
index 0000000..64a79aa
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -0,0 +1,203 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                      http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>hadoop-yarn-server</artifactId>
+    <groupId>org.apache.hadoop</groupId>
+    <version>2.9.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+  <version>2.9.0-SNAPSHOT</version>
+  <name>Apache Hadoop YARN Timeline Service</name>
+
+  <properties>
+    <!-- Needed for generating FindBugs warnings using parent pom -->
+    <yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.inject</groupId>
+      <artifactId>guice</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>javax.servlet</groupId>
+      <artifactId>servlet-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>javax.xml.bind</groupId>
+      <artifactId>jaxb-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-csv</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-core-asl</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.phoenix</groupId>
+      <artifactId>phoenix-core</artifactId>
+    </dependency>
+
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>test-compile</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <configuration>
+          <additionnalDependencies>
+            <additionnalDependency>
+              <groupId>junit</groupId>
+              <artifactId>junit</artifactId>
+              <version>4.11</version>
+            </additionnalDependency>
+          </additionnalDependencies>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineContext.java
new file mode 100644
index 0000000..694b709
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineContext.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice;
+
+/**
+ * Encapsulates timeline context information.
+ */
+public class TimelineContext {
+
+  private String clusterId;
+  private String userId;
+  private String flowName;
+  private Long flowRunId;
+  private String appId;
+
+  public TimelineContext() {
+    this(null, null, null, 0L, null);
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((appId == null) ? 0 : appId.hashCode());
+    result = prime * result + ((clusterId == null) ? 0 : clusterId.hashCode());
+    result = prime * result + ((flowName == null) ? 0 : flowName.hashCode());
+    result = prime * result + ((flowRunId == null) ? 0 : flowRunId.hashCode());
+    result = prime * result + ((userId == null) ? 0 : userId.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    TimelineContext other = (TimelineContext) obj;
+    if (appId == null) {
+      if (other.appId != null) {
+        return false;
+      }
+    } else if (!appId.equals(other.appId)) {
+      return false;
+    }
+    if (clusterId == null) {
+      if (other.clusterId != null) {
+        return false;
+      }
+    } else if (!clusterId.equals(other.clusterId)) {
+      return false;
+    }
+    if (flowName == null) {
+      if (other.flowName != null) {
+        return false;
+      }
+    } else if (!flowName.equals(other.flowName)) {
+      return false;
+    }
+    if (flowRunId == null) {
+      if (other.flowRunId != null) {
+        return false;
+      }
+    } else if (!flowRunId.equals(other.flowRunId)) {
+      return false;
+    }
+    if (userId == null) {
+      if (other.userId != null) {
+        return false;
+      }
+    } else if (!userId.equals(other.userId)) {
+      return false;
+    }
+    return true;
+  }
+
+  public TimelineContext(String clusterId, String userId, String flowName,
+      Long flowRunId, String appId) {
+    this.clusterId = clusterId;
+    this.userId = userId;
+    this.flowName = flowName;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public void setClusterId(String cluster) {
+    this.clusterId = cluster;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public void setUserId(String user) {
+    this.userId = user;
+  }
+
+  public String getFlowName() {
+    return flowName;
+  }
+
+  public void setFlowName(String flow) {
+    this.flowName = flow;
+  }
+
+  public Long getFlowRunId() {
+    return flowRunId;
+  }
+
+  public void setFlowRunId(long runId) {
+    this.flowRunId = runId;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public void setAppId(String app) {
+    this.appId = app;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
new file mode 100644
index 0000000..d276269
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Preconditions;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Service that handles writes to the timeline service and writes them to the
+ * backing storage for a given YARN application.
+ *
+ * App-related lifecycle management is handled by this service.
+ */
+@Private
+@Unstable
+public class AppLevelTimelineCollector extends TimelineCollector {
+  private static final Log LOG = LogFactory.getLog(TimelineCollector.class);
+
+  private final static int AGGREGATION_EXECUTOR_NUM_THREADS = 1;
+  private final static int AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS = 15;
+  private static Set<String> entityTypesSkipAggregation
+      = initializeSkipSet();
+
+  private final ApplicationId appId;
+  private final TimelineCollectorContext context;
+  private ScheduledThreadPoolExecutor appAggregationExecutor;
+
+  public AppLevelTimelineCollector(ApplicationId appId) {
+    super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString());
+    Preconditions.checkNotNull(appId, "AppId shouldn't be null");
+    this.appId = appId;
+    context = new TimelineCollectorContext();
+  }
+
+  private static Set<String> initializeSkipSet() {
+    Set<String> result = new HashSet<>();
+    result.add(TimelineEntityType.YARN_APPLICATION.toString());
+    result.add(TimelineEntityType.YARN_FLOW_RUN.toString());
+    result.add(TimelineEntityType.YARN_FLOW_ACTIVITY.toString());
+    return result;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    context.setClusterId(conf.get(YarnConfiguration.RM_CLUSTER_ID,
+        YarnConfiguration.DEFAULT_RM_CLUSTER_ID));
+    // Set the default values, which will be updated with an RPC call to get the
+    // context info from NM.
+    // Current user usually is not the app user, but keep this field non-null
+    context.setUserId(UserGroupInformation.getCurrentUser().getShortUserName());
+    context.setAppId(appId.toString());
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    // Launch the aggregation thread
+    appAggregationExecutor = new ScheduledThreadPoolExecutor(
+        AppLevelTimelineCollector.AGGREGATION_EXECUTOR_NUM_THREADS,
+        new ThreadFactoryBuilder()
+            .setNameFormat("TimelineCollector Aggregation thread #%d")
+            .build());
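+    // run the aggregator every AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS seconds
+    // after an equal initial delay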
+    appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(),
+        AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
+        AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
+        TimeUnit.SECONDS);
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    appAggregationExecutor.shutdown();
+    if (!appAggregationExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+      LOG.info("App-level aggregator shutdown timed out, shutdown now. ");
+      appAggregationExecutor.shutdownNow();
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public TimelineCollectorContext getTimelineEntityContext() {
+    return context;
+  }
+
+  @Override
+  protected Set<String> getEntityTypesSkipAggregation() {
+    return entityTypesSkipAggregation;
+  }
+
+  private class AppLevelAggregator implements Runnable {
+
+    @Override
+    public void run() {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("App-level real-time aggregating");
+      }
+      if (!isReadyToAggregate()) {
+        LOG.warn("App-level collector is not ready, skip aggregation. ");
+        return;
+      }
+      try {
+        TimelineCollectorContext currContext = getTimelineEntityContext();
+        Map<String, AggregationStatusTable> aggregationGroups
+            = getAggregationGroups();
+        if (aggregationGroups == null
+            || aggregationGroups.isEmpty()) {
+          LOG.debug("App-level collector is empty, skip aggregation. ");
+          return;
+        }
+        TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId(
+            aggregationGroups, currContext.getAppId(),
+            TimelineEntityType.YARN_APPLICATION.toString());
+        TimelineEntities entities = new TimelineEntities();
+        entities.addEntity(resultEntity);
+        getWriter().write(currContext.getClusterId(), currContext.getUserId(),
+            currContext.getFlowName(), currContext.getFlowVersion(),
+            currContext.getFlowRunId(), currContext.getAppId(), entities);
+      } catch (Exception e) {
+        LOG.error("Error aggregating timeline metrics", e);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("App-level real-time aggregation complete");
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
new file mode 100644
index 0000000..0323d7b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.http.lib.StaticUserWebFilter;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Class on the NodeManager side that manages adding and removing collectors and
+ * their lifecycle. Also instantiates the per-node collector webapp.
+ */
+@Private
+@Unstable
+public class NodeTimelineCollectorManager extends TimelineCollectorManager {
+  private static final Log LOG =
+      LogFactory.getLog(NodeTimelineCollectorManager.class);
+
+  // REST server for this collector manager.
+  private HttpServer2 timelineRestServer;
+
+  private String timelineRestServerBindAddress;
+
+  private volatile CollectorNodemanagerProtocol nmCollectorService;
+
+  static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
+
+  @VisibleForTesting
+  protected NodeTimelineCollectorManager() {
+    super(NodeTimelineCollectorManager.class.getName());
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    startWebApp();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (timelineRestServer != null) {
+      timelineRestServer.stop();
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
+    try {
+      // Get context info from NM
+      updateTimelineCollectorContext(appId, collector);
+      // Report to NM if a new collector is added.
+      reportNewCollectorToNM(appId);
+    } catch (YarnException | IOException e) {
+      // throw an exception here as the collector cannot be used if we fail
+      // to communicate with the NM
+      LOG.error("Failed to communicate with NM Collector Service for " + appId);
+      throw new YarnRuntimeException(e);
+    }
+  }
+
+  /**
+   * Launch the REST web server for this collector manager.
+   */
+  private void startWebApp() {
+    Configuration conf = getConfig();
+    String bindAddress = conf.get(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_BIND_HOST) + ":0";
+    try {
+      HttpServer2.Builder builder = new HttpServer2.Builder()
+          .setName("timeline")
+          .setConf(conf)
+          .addEndpoint(URI.create(
+              (YarnConfiguration.useHttps(conf) ? "https://" : "http://") +
+                  bindAddress));
+      timelineRestServer = builder.build();
+      // TODO: replace this with an authentication filter in the future.
+      HashMap<String, String> options = new HashMap<>();
+      String username = conf.get(HADOOP_HTTP_STATIC_USER,
+          DEFAULT_HADOOP_HTTP_STATIC_USER);
+      options.put(HADOOP_HTTP_STATIC_USER, username);
+      HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
+          "static_user_filter_timeline",
+          StaticUserWebFilter.StaticUserFilter.class.getName(),
+          options, new String[] {"/*"});
+
+      timelineRestServer.addJerseyResourcePackage(
+          TimelineCollectorWebService.class.getPackage().getName() + ";"
+              + GenericExceptionHandler.class.getPackage().getName() + ";"
+              + YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
+          "/*");
+      timelineRestServer.setAttribute(COLLECTOR_MANAGER_ATTR_KEY, this);
+      timelineRestServer.start();
+    } catch (Exception e) {
+      String msg = "The per-node collector webapp failed to start.";
+      LOG.error(msg, e);
+      throw new YarnRuntimeException(msg, e);
+    }
+    // TODO: we need to think about the case of multiple interfaces
+    this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress(
+        timelineRestServer.getConnectorAddress(0));
+    LOG.info("Instantiated the per-node collector webapp at " +
+        timelineRestServerBindAddress);
+  }
+
+  private void reportNewCollectorToNM(ApplicationId appId)
+      throws YarnException, IOException {
+    ReportNewCollectorInfoRequest request =
+        ReportNewCollectorInfoRequest.newInstance(appId,
+            this.timelineRestServerBindAddress);
+    LOG.info("Report a new collector for application: " + appId +
+        " to the NM Collector Service.");
+    getNMCollectorService().reportNewCollectorInfo(request);
+  }
+
+  private void updateTimelineCollectorContext(
+      ApplicationId appId, TimelineCollector collector)
+      throws YarnException, IOException {
+    GetTimelineCollectorContextRequest request =
+        GetTimelineCollectorContextRequest.newInstance(appId);
+    LOG.info("Get timeline collector context for " + appId);
+    GetTimelineCollectorContextResponse response =
+        getNMCollectorService().getTimelineCollectorContext(request);
+    String userId = response.getUserId();
+    if (userId != null && !userId.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting the user in the context: " + userId);
+      }
+      collector.getTimelineEntityContext().setUserId(userId);
+    }
+    String flowName = response.getFlowName();
+    if (flowName != null && !flowName.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting the flow name: " + flowName);
+      }
+      collector.getTimelineEntityContext().setFlowName(flowName);
+    }
+    String flowVersion = response.getFlowVersion();
+    if (flowVersion != null && !flowVersion.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting the flow version: " + flowVersion);
+      }
+      collector.getTimelineEntityContext().setFlowVersion(flowVersion);
+    }
+    long flowRunId = response.getFlowRunId();
+    if (flowRunId != 0L) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting the flow run id: " + flowRunId);
+      }
+      collector.getTimelineEntityContext().setFlowRunId(flowRunId);
+    }
+  }
+
+  @VisibleForTesting
+  protected CollectorNodemanagerProtocol getNMCollectorService() {
+    if (nmCollectorService == null) {
+      synchronized (this) {
+        if (nmCollectorService == null) {
+          Configuration conf = getConfig();
+          InetSocketAddress nmCollectorServiceAddress = conf.getSocketAddr(
+              YarnConfiguration.NM_BIND_HOST,
+              YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
+              YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
+              YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT);
+          LOG.info("nmCollectorServiceAddress: " + nmCollectorServiceAddress);
+          final YarnRPC rpc = YarnRPC.create(conf);
+
+          // TODO Security settings.
+          nmCollectorService = (CollectorNodemanagerProtocol) rpc.getProxy(
+              CollectorNodemanagerProtocol.class,
+              nmCollectorServiceAddress, conf);
+        }
+      }
+    }
+    return nmCollectorService;
+  }
+
+  @VisibleForTesting
+  public String getRestServerBindAddress() {
+    return timelineRestServerBindAddress;
+  }
+}
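
For readers following the diff: getNMCollectorService() above builds the RPC
proxy lazily, using double-checked locking on the volatile nmCollectorService
field so that concurrent doPostPut() calls share a single proxy. A minimal,
self-contained sketch of the same idiom (the LazyProxy name and Supplier-based
factory are illustrative only and not part of this patch):

    import java.util.function.Supplier;

    public final class LazyProxy<T> {
      // volatile is required so a fully constructed proxy is safely published
      private volatile T proxy;
      private final Supplier<T> factory;

      public LazyProxy(Supplier<T> factory) {
        this.factory = factory;
      }

      public T get() {
        if (proxy == null) {               // first check, lock-free fast path
          synchronized (this) {
            if (proxy == null) {           // second check, under the lock
              proxy = factory.get();       // create the expensive proxy once
            }
          }
        }
        return proxy;
      }
    }

NodeTimelineCollectorManager applies the same pattern directly to its
nmCollectorService field rather than going through a helper class.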

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
new file mode 100644
index 0000000..041e7c2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+import org.apache.hadoop.yarn.server.api.ContainerType;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The top-level server for the per-node timeline collector manager. Currently
+ * it is defined as an auxiliary service to accommodate running within another
+ * daemon (e.g. node manager).
+ */
+@Private
+@Unstable
+public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
+  private static final Log LOG =
+      LogFactory.getLog(PerNodeTimelineCollectorsAuxService.class);
+  private static final int SHUTDOWN_HOOK_PRIORITY = 30;
+
+  private final NodeTimelineCollectorManager collectorManager;
+  private long collectorLingerPeriod;
+  private ScheduledExecutorService scheduler;
+
+  public PerNodeTimelineCollectorsAuxService() {
+    this(new NodeTimelineCollectorManager());
+  }
+
+  @VisibleForTesting PerNodeTimelineCollectorsAuxService(
+      NodeTimelineCollectorManager collectorsManager) {
+    super("timeline_collector");
+    this.collectorManager = collectorsManager;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    if (!YarnConfiguration.timelineServiceV2Enabled(conf)) {
+      throw new YarnException("Timeline service v2 is not enabled");
+    }
+    collectorLingerPeriod =
+        conf.getLong(YarnConfiguration.ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS,
+            YarnConfiguration.DEFAULT_ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS);
+    scheduler = Executors.newSingleThreadScheduledExecutor();
+    collectorManager.init(conf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    collectorManager.start();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    scheduler.shutdown();
+    if (!scheduler.awaitTermination(collectorLingerPeriod,
+        TimeUnit.MILLISECONDS)) {
+      LOG.warn(
+          "Scheduler terminated before removing the application collectors");
+    }
+    collectorManager.stop();
+    super.serviceStop();
+  }
+
+  // these methods can be used as the basis for future service methods if the
+  // per-node collector runs separately from the node manager
+  /**
+   * Creates and adds an app level collector for the specified application id.
+   * The collector is also initialized and started. If the collector already
+   * exists, no new collector is created.
+   *
+   * @param appId Application Id to be added.
+   * @return whether it was added successfully
+   */
+  public boolean addApplication(ApplicationId appId) {
+    AppLevelTimelineCollector collector =
+        new AppLevelTimelineCollector(appId);
+    return (collectorManager.putIfAbsent(appId, collector)
+        == collector);
+  }
+
+  /**
+   * Removes the app level collector for the specified application id. The
+   * collector is also stopped as a result. If the collector does not exist, no
+   * change is made.
+   *
+   * @param appId Application Id to be removed.
+   * @return whether it was removed successfully
+   */
+  public boolean removeApplication(ApplicationId appId) {
+    return collectorManager.remove(appId);
+  }
+
+  /**
+   * Creates and adds an app level collector for the specified application id.
+   * The collector is also initialized and started. If the collector already
+   * exists, no new collector is created.
+   */
+  @Override
+  public void initializeContainer(ContainerInitializationContext context) {
+    // intercept the event of the AM container being created and initialize the
+    // app level collector service
+    if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
+      ApplicationId appId = context.getContainerId().
+          getApplicationAttemptId().getApplicationId();
+      addApplication(appId);
+    }
+  }
+
+  /**
+   * Removes the app level collector for the specified application id. The
+   * collector is also stopped as a result. If the collector does not exist, no
+   * change is made.
+   */
+  @Override
+  public void stopContainer(ContainerTerminationContext context) {
+    // intercept the event of the AM container being stopped and remove the app
+    // level collector service
+    if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
+      final ApplicationId appId =
+          context.getContainerId().getApplicationAttemptId().getApplicationId();
+      scheduler.schedule(new Runnable() {
+        public void run() {
+          removeApplication(appId);
+        }
+      }, collectorLingerPeriod, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  @VisibleForTesting
+  boolean hasApplication(ApplicationId appId) {
+    return collectorManager.containsTimelineCollector(appId);
+  }
+
+  @Override
+  public void initializeApplication(ApplicationInitializationContext context) {
+  }
+
+  @Override
+  public void stopApplication(ApplicationTerminationContext context) {
+  }
+
+  @Override
+  public ByteBuffer getMetaData() {
+    // TODO currently it is not used; we can return more meaningful data when
+    // we connect it with an AM
+    return ByteBuffer.allocate(0);
+  }
+
+  @VisibleForTesting
+  public static PerNodeTimelineCollectorsAuxService
+      launchServer(String[] args, NodeTimelineCollectorManager collectorManager,
+      Configuration conf) {
+    Thread
+      .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+    StringUtils.startupShutdownMessage(
+        PerNodeTimelineCollectorsAuxService.class, args, LOG);
+    PerNodeTimelineCollectorsAuxService auxService = null;
+    try {
+      auxService = collectorManager == null ?
+          new PerNodeTimelineCollectorsAuxService() :
+          new PerNodeTimelineCollectorsAuxService(collectorManager);
+      ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService),
+          SHUTDOWN_HOOK_PRIORITY);
+      auxService.init(conf);
+      auxService.start();
+    } catch (Throwable t) {
+      LOG.fatal("Error starting PerNodeTimelineCollectorServer", t);
+      ExitUtil.terminate(-1, "Error starting PerNodeTimelineCollectorServer");
+    }
+    return auxService;
+  }
+
+  private static class ShutdownHook implements Runnable {
+    private final PerNodeTimelineCollectorsAuxService auxService;
+
+    public ShutdownHook(PerNodeTimelineCollectorsAuxService auxService) {
+      this.auxService = auxService;
+    }
+
+    public void run() {
+      auxService.stop();
+    }
+  }
+
+  public static void main(String[] args) {
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+    launchServer(args, null, conf);
+  }
+}
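
Note that the aux service does not drop an application's collector the moment
the AM container stops: stopContainer() schedules removeApplication() after
collectorLingerPeriod, so late writes from a finishing application can still
reach the collector. A minimal sketch of that deferred-cleanup pattern, with
hypothetical names and types standing in for the YARN classes above:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class DeferredCollectorCleanup {
      private final ConcurrentMap<String, Object> collectors =
          new ConcurrentHashMap<>();
      private final ScheduledExecutorService scheduler =
          Executors.newSingleThreadScheduledExecutor();
      private final long lingerMillis;

      public DeferredCollectorCleanup(long lingerMillis) {
        this.lingerMillis = lingerMillis;
      }

      public void onAppMasterStopped(final String appId) {
        // Keep the collector alive for the linger period before removing it,
        // mirroring stopContainer() above.
        scheduler.schedule(new Runnable() {
          @Override
          public void run() {
            collectors.remove(appId);
          }
        }, lingerMillis, TimeUnit.MILLISECONDS);
      }
    }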

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
new file mode 100644
index 0000000..2fc3033
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
+
+/**
+ * Service that accepts writes to the timeline service and persists them to
+ * the backing storage.
+ *
+ * Classes that extend this can add their own lifecycle management or
+ * customization of request handling.
+ */
+@Private
+@Unstable
+public abstract class TimelineCollector extends CompositeService {
+
+  private static final Log LOG = LogFactory.getLog(TimelineCollector.class);
+  public static final String SEPARATOR = "_";
+
+  private TimelineWriter writer;
+  private ConcurrentMap<String, AggregationStatusTable> aggregationGroups
+      = new ConcurrentHashMap<>();
+  private static Set<String> entityTypesSkipAggregation
+      = new HashSet<>();
+
+  private volatile boolean readyToAggregate = false;
+
+  public TimelineCollector(String name) {
+    super(name);
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  protected void setWriter(TimelineWriter w) {
+    this.writer = w;
+  }
+
+  protected TimelineWriter getWriter() {
+    return writer;
+  }
+
+  protected Map<String, AggregationStatusTable> getAggregationGroups() {
+    return aggregationGroups;
+  }
+
+  protected void setReadyToAggregate() {
+    readyToAggregate = true;
+  }
+
+  protected boolean isReadyToAggregate() {
+    return readyToAggregate;
+  }
+
+  /**
+   * Method to decide the set of timeline entity types the collector should
+   * skip during aggregation. Subclasses may override this method to
+   * customize their own behavior.
+   *
+   * @return A set of strings consisting of all types the collector should skip.
+   */
+  protected Set<String> getEntityTypesSkipAggregation() {
+    return entityTypesSkipAggregation;
+  }
+
+  public abstract TimelineCollectorContext getTimelineEntityContext();
+
+
+  /**
+   * Handles entity writes. These writes are synchronous and are written to the
+   * backing storage without buffering/batching. If any entity already exists,
+   * it results in an update of the entity.
+   *
+   * This method should be reserved for selected critical entities and events.
+   * For normal voluminous writes one should use the async method
+   * {@link #putEntitiesAsync(TimelineEntities, UserGroupInformation)}.
+   *
+   * @param entities entities to post
+   * @param callerUgi the caller UGI
+   * @return the response that contains the result of the post.
+   * @throws IOException if there is any exception encountered while putting
+   *     entities.
+   */
+  public TimelineWriteResponse putEntities(TimelineEntities entities,
+      UserGroupInformation callerUgi) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE");
+      LOG.debug("putEntities(entities=" + entities + ", callerUgi="
+          + callerUgi + ")");
+    }
+    TimelineCollectorContext context = getTimelineEntityContext();
+
+    // Update application metrics for aggregation
+    updateAggregateStatus(entities, aggregationGroups,
+        getEntityTypesSkipAggregation());
+
+    return writer.write(context.getClusterId(), context.getUserId(),
+        context.getFlowName(), context.getFlowVersion(), context.getFlowRunId(),
+        context.getAppId(), entities);
+  }
+
+  /**
+   * Handles entity writes in an asynchronous manner. The method returns as
+   * soon as validation is done. No promises are made on how quickly the
+   * entities will be written to the backing storage, or whether they will
+   * always be written. Multiple writes to the same entities may be batched,
+   * with the appropriate values updated, resulting in fewer writes to the
+   * backing storage.
+   *
+   * @param entities entities to post
+   * @param callerUgi the caller UGI
+   */
+  public void putEntitiesAsync(TimelineEntities entities,
+      UserGroupInformation callerUgi) {
+    // TODO implement
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("putEntitiesAsync(entities=" + entities + ", callerUgi=" +
+          callerUgi + ")");
+    }
+  }
+
+  /**
+   * Aggregate all metrics in the given timeline entities with no predefined state.
+   *
+   * @param entities Entities to aggregate
+   * @param resultEntityId Id of the result entity
+   * @param resultEntityType Type of the result entity
+   * @param needsGroupIdInResult Marks whether we want the aggregation group id
+   *                             in each aggregated metric.
+   * @return A timeline entity that contains all aggregated TimelineMetric.
+   */
+  public static TimelineEntity aggregateEntities(
+      TimelineEntities entities, String resultEntityId,
+      String resultEntityType, boolean needsGroupIdInResult) {
+    ConcurrentMap<String, AggregationStatusTable> aggregationGroups
+        = new ConcurrentHashMap<>();
+    updateAggregateStatus(entities, aggregationGroups, null);
+    if (needsGroupIdInResult) {
+      return aggregate(aggregationGroups, resultEntityId, resultEntityType);
+    } else {
+      return aggregateWithoutGroupId(
+          aggregationGroups, resultEntityId, resultEntityType);
+    }
+  }
+
+  /**
+   * Update the aggregation status table for a timeline collector.
+   *
+   * @param entities Entities to update
+   * @param aggregationGroups Aggregation status table
+   * @param typesToSkip Entity types that we can safely assume to skip updating
+   */
+  static void updateAggregateStatus(
+      TimelineEntities entities,
+      ConcurrentMap<String, AggregationStatusTable> aggregationGroups,
+      Set<String> typesToSkip) {
+    for (TimelineEntity e : entities.getEntities()) {
+      if ((typesToSkip != null && typesToSkip.contains(e.getType()))
+          || e.getMetrics().isEmpty()) {
+        continue;
+      }
+      AggregationStatusTable aggrTable = aggregationGroups.get(e.getType());
+      if (aggrTable == null) {
+        AggregationStatusTable table = new AggregationStatusTable();
+        aggrTable = aggregationGroups.putIfAbsent(e.getType(),
+            table);
+        if (aggrTable == null) {
+          aggrTable = table;
+        }
+      }
+      aggrTable.update(e);
+    }
+  }
+
+  /**
+   * Aggregate internal status and generate timeline entities for the
+   * aggregation results.
+   *
+   * @param aggregationGroups Aggregation status table
+   * @param resultEntityId Id of the result entity
+   * @param resultEntityType Type of the result entity
+   * @return A timeline entity that contains all aggregated TimelineMetric.
+   */
+  static TimelineEntity aggregate(
+      Map<String, AggregationStatusTable> aggregationGroups,
+      String resultEntityId, String resultEntityType) {
+    TimelineEntity result = new TimelineEntity();
+    result.setId(resultEntityId);
+    result.setType(resultEntityType);
+    for (Map.Entry<String, AggregationStatusTable> entry
+        : aggregationGroups.entrySet()) {
+      entry.getValue().aggregateAllTo(result, entry.getKey());
+    }
+    return result;
+  }
+
+  /**
+   * Aggregate internal status and generate timeline entities for the
+   * aggregation results. The result metrics will not have aggregation group
+   * information.
+   *
+   * @param aggregationGroups Aggregation status table
+   * @param resultEntityId Id of the result entity
+   * @param resultEntityType Type of the result entity
+   * @return A timeline entity that contains all aggregated TimelineMetric.
+   */
+  static TimelineEntity aggregateWithoutGroupId(
+      Map<String, AggregationStatusTable> aggregationGroups,
+      String resultEntityId, String resultEntityType) {
+    TimelineEntity result = new TimelineEntity();
+    result.setId(resultEntityId);
+    result.setType(resultEntityType);
+    for (Map.Entry<String, AggregationStatusTable> entry
+        : aggregationGroups.entrySet()) {
+      entry.getValue().aggregateAllTo(result, "");
+    }
+    return result;
+  }
+
+  // Note: in-memory aggregation is performed in an eventually consistent
+  // fashion.
+  protected static class AggregationStatusTable {
+    // On aggregation, for each metric, aggregate all per-entity accumulated
+    // metrics. We only use the id and type for TimelineMetrics in the key set
+    // of this table.
+    private ConcurrentMap<TimelineMetric, Map<String, TimelineMetric>>
+        aggregateTable;
+
+    public AggregationStatusTable() {
+      aggregateTable = new ConcurrentHashMap<>();
+    }
+
+    public void update(TimelineEntity incoming) {
+      String entityId = incoming.getId();
+      for (TimelineMetric m : incoming.getMetrics()) {
+        // Skip if the metric does not need aggregation
+        if (m.getRealtimeAggregationOp() == TimelineMetricOperation.NOP) {
+          continue;
+        }
+        // Update aggregateTable
+        Map<String, TimelineMetric> aggrRow = aggregateTable.get(m);
+        if (aggrRow == null) {
+          Map<String, TimelineMetric> tempRow = new ConcurrentHashMap<>();
+          aggrRow = aggregateTable.putIfAbsent(m, tempRow);
+          if (aggrRow == null) {
+            aggrRow = tempRow;
+          }
+        }
+        aggrRow.put(entityId, m);
+      }
+    }
+
+    public TimelineEntity aggregateTo(TimelineMetric metric, TimelineEntity e,
+        String aggregationGroupId) {
+      if (metric.getRealtimeAggregationOp() == TimelineMetricOperation.NOP) {
+        return e;
+      }
+      Map<String, TimelineMetric> aggrRow = aggregateTable.get(metric);
+      if (aggrRow != null) {
+        TimelineMetric aggrMetric = new TimelineMetric();
+        if (aggregationGroupId.length() > 0) {
+          aggrMetric.setId(metric.getId() + SEPARATOR + aggregationGroupId);
+        } else {
+          aggrMetric.setId(metric.getId());
+        }
+        aggrMetric.setRealtimeAggregationOp(TimelineMetricOperation.NOP);
+        Map<Object, Object> status = new HashMap<>();
+        for (TimelineMetric m : aggrRow.values()) {
+          TimelineMetric.aggregateTo(m, aggrMetric, status);
+          // getRealtimeAggregationOp returns an enum so we can directly
+          // compare with "!=".
+          if (m.getRealtimeAggregationOp()
+              != aggrMetric.getRealtimeAggregationOp()) {
+            aggrMetric.setRealtimeAggregationOp(m.getRealtimeAggregationOp());
+          }
+        }
+        Set<TimelineMetric> metrics = e.getMetrics();
+        metrics.remove(aggrMetric);
+        metrics.add(aggrMetric);
+      }
+      return e;
+    }
+
+    public TimelineEntity aggregateAllTo(TimelineEntity e,
+        String aggregationGroupId) {
+      for (TimelineMetric m : aggregateTable.keySet()) {
+        aggregateTo(m, e, aggregationGroupId);
+      }
+      return e;
+    }
+  }
+}
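
To illustrate what the static aggregation helpers in TimelineCollector.java
produce, the sketch below builds two container entities of the same type, each
carrying a metric marked for real-time aggregation, and rolls them up into a
single application-level entity. TimelineMetric.addValue() and
TimelineMetricOperation.SUM are assumed to exist in the timeline service v2
records API; they are not introduced by this patch.

    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
    import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;

    public class AggregationDemo {
      public static void main(String[] args) {
        TimelineEntities entities = new TimelineEntities();
        for (int i = 0; i < 2; i++) {
          TimelineEntity container = new TimelineEntity();
          container.setId("container_" + i);
          container.setType("YARN_CONTAINER");

          TimelineMetric memory = new TimelineMetric();
          memory.setId("MEMORY_MB");
          // Metrics with a NOP op are skipped; SUM and addValue are assumed API.
          memory.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
          memory.addValue(System.currentTimeMillis(), 1024L);
          // Attach the metric the same way aggregateTo() does above.
          container.getMetrics().add(memory);

          entities.addEntity(container);
        }
        // Aggregation is keyed by entity type; with needsGroupIdInResult=true
        // the result metric id becomes "MEMORY_MB" + SEPARATOR + "YARN_CONTAINER".
        TimelineEntity appLevel = TimelineCollector.aggregateEntities(
            entities, "application_0_0001", "YARN_APPLICATION", true);
        System.out.println(appLevel.getMetrics());
      }
    }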


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org

