hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From varunsax...@apache.org
Subject [44/50] [abbrv] hadoop git commit: YARN-6734. Ensure sub-application user is extracted & sent to timeline service (Rohith Sharma K S via Varun Saxena)
Date Thu, 03 Aug 2017 06:58:21 GMT
YARN-6734. Ensure sub-application user is extracted & sent to timeline service (Rohith Sharma K S via Varun Saxena)

(cherry picked from commit 0443928d771e3e21825b4f487e8c0865ea641970)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/69d2c1e4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/69d2c1e4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/69d2c1e4

Branch: refs/heads/YARN-5355_branch2
Commit: 69d2c1e46eb414ddbb84b1981ad89d0179ee3a69
Parents: aeecf69
Author: Varun Saxena <varunsaxena@apache.org>
Authored: Fri Jul 28 22:02:19 2017 +0530
Committer: Varun Saxena <varunsaxena@apache.org>
Committed: Fri Jul 28 23:02:34 2017 +0530

----------------------------------------------------------------------
 ...stTimelineReaderWebServicesHBaseStorage.java |  26 ++-
 .../storage/DataGeneratorForTest.java           |  49 ++--
 .../storage/TestHBaseTimelineStorageApps.java   |  22 +-
 .../TestHBaseTimelineStorageEntities.java       | 128 +++++++++-
 .../flow/TestHBaseStorageFlowActivity.java      |  33 ++-
 .../storage/flow/TestHBaseStorageFlowRun.java   |  84 +++++--
 .../flow/TestHBaseStorageFlowRunCompaction.java |  14 +-
 .../storage/HBaseTimelineWriterImpl.java        | 232 +++++++++++--------
 .../SubApplicationRowKeyPrefix.java             |  20 --
 .../collector/TimelineCollector.java            |  11 +-
 .../storage/FileSystemTimelineWriterImpl.java   |  15 +-
 .../timelineservice/storage/TimelineWriter.java |  28 +--
 .../collector/TestTimelineCollector.java        |  12 +-
 .../TestFileSystemTimelineWriterImpl.java       |   8 +-
 14 files changed, 465 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d2c1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
index b36eb9d..4f6ba03 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
@@ -36,6 +36,7 @@ import java.util.Set;
 import javax.ws.rs.core.MediaType;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
@@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -334,16 +336,21 @@ public class TestTimelineReaderWebServicesHBaseStorage
 
     HBaseTimelineWriterImpl hbi = null;
     Configuration c1 = getHBaseTestingUtility().getConfiguration();
+    UserGroupInformation remoteUser =
+        UserGroupInformation.createRemoteUser(user);
     try {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(c1);
-      hbi.write(cluster, user, flow, flowVersion, runid, entity.getId(), te);
-      hbi.write(cluster, user, flow, flowVersion, runid, entity1.getId(), te1);
-      hbi.write(cluster, user, flow, flowVersion, runid1, entity4.getId(), te4);
-      hbi.write(cluster, user, flow2,
-          flowVersion2, runid2, entity3.getId(), te3);
-      hbi.write(cluster, user, flow, flowVersion, runid,
-          "application_1111111111_1111", userEntities);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, entity.getId()), te, remoteUser);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, entity1.getId()), te1, remoteUser);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid1, entity4.getId()), te4, remoteUser);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow2, flowVersion2,
+          runid2, entity3.getId()), te3, remoteUser);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, "application_1111111111_1111"), userEntities, remoteUser);
       writeApplicationEntities(hbi, ts);
       hbi.flush();
     } finally {
@@ -375,8 +382,9 @@ public class TestTimelineReaderWebServicesHBaseStorage
 
         appEntity.addEvent(finished);
         te.addEntity(appEntity);
-        hbi.write("cluster1", "user1", "flow1", "CF7022C10F1354", i,
-            appEntity.getId(), te);
+        hbi.write(new TimelineCollectorContext("cluster1", "user1", "flow1",
+            "CF7022C10F1354", i, appEntity.getId()), te,
+            UserGroupInformation.createRemoteUser("user1"));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d2c1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java
index 926d8bb..cf6a854 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java
@@ -26,6 +26,7 @@ import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
@@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
 
 /**
  * Utility class that creates the schema and generates test data.
@@ -155,17 +157,20 @@ public final class DataGeneratorForTest {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(util.getConfiguration());
       hbi.start();
-      String cluster = "cluster1";
-      String user = "user1";
-      String flow = "some_flow_name";
-      String flowVersion = "AB7822C10F1111";
-      long runid = 1002345678919L;
-      String appName = "application_1111111111_2222";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-      appName = "application_1111111111_3333";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
-      appName = "application_1111111111_4444";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te2);
+      UserGroupInformation remoteUser =
+          UserGroupInformation.createRemoteUser("user1");
+      hbi.write(
+          new TimelineCollectorContext("cluster1", "user1", "some_flow_name",
+              "AB7822C10F1111", 1002345678919L, "application_1111111111_2222"),
+          te, remoteUser);
+      hbi.write(
+          new TimelineCollectorContext("cluster1", "user1", "some_flow_name",
+              "AB7822C10F1111", 1002345678919L, "application_1111111111_3333"),
+          te1, remoteUser);
+      hbi.write(
+          new TimelineCollectorContext("cluster1", "user1", "some_flow_name",
+              "AB7822C10F1111", 1002345678919L, "application_1111111111_4444"),
+          te2, remoteUser);
       hbi.stop();
     } finally {
       if (hbi != null) {
@@ -433,15 +438,19 @@ public final class DataGeneratorForTest {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(util.getConfiguration());
       hbi.start();
-      String cluster = "cluster1";
-      String user = "user1";
-      String flow = "some_flow_name";
-      String flowVersion = "AB7822C10F1111";
-      long runid = 1002345678919L;
-      hbi.write(cluster, user, flow, flowVersion, runid, appName1, te);
-      hbi.write(cluster, user, flow, flowVersion, runid, appName2, te);
-      hbi.write(cluster, user, flow, flowVersion, runid, appName1, appTe1);
-      hbi.write(cluster, user, flow, flowVersion, runid, appName2, appTe2);
+
+      UserGroupInformation user =
+          UserGroupInformation.createRemoteUser("user1");
+      TimelineCollectorContext context =
+          new TimelineCollectorContext("cluster1", "user1", "some_flow_name",
+              "AB7822C10F1111", 1002345678919L, appName1);
+      hbi.write(context, te, user);
+      hbi.write(context, appTe1, user);
+
+      context = new TimelineCollectorContext("cluster1", "user1",
+          "some_flow_name", "AB7822C10F1111", 1002345678919L, appName2);
+      hbi.write(context, te, user);
+      hbi.write(context, appTe2, user);
       hbi.stop();
     } finally {
       if (hbi != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d2c1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageApps.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageApps.java
index d6b0370..65c7034 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageApps.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageApps.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
@@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
@@ -161,7 +163,8 @@ public class TestHBaseTimelineStorageApps {
       String flow = null;
       String flowVersion = "AB7822C10F1111";
       long runid = 1002345678919L;
-      hbi.write(cluster, user, flow, flowVersion, runid, appId, te);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, appId), te, UserGroupInformation.createRemoteUser(user));
       hbi.stop();
 
       // retrieve the row
@@ -279,7 +282,8 @@ public class TestHBaseTimelineStorageApps {
       String flow = "s!ome_f\tlow  _n am!e";
       String flowVersion = "AB7822C10F1111";
       long runid = 1002345678919L;
-      hbi.write(cluster, user, flow, flowVersion, runid, appId, te);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, appId), te, UserGroupInformation.createRemoteUser(user));
 
       // Write entity again, this time without created time.
       entity = new ApplicationEntity();
@@ -291,7 +295,8 @@ public class TestHBaseTimelineStorageApps {
       entity.addInfo(infoMap1);
       te = new TimelineEntities();
       te.addEntity(entity);
-      hbi.write(cluster, user, flow, flowVersion, runid, appId, te);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, appId), te, UserGroupInformation.createRemoteUser(user));
       hbi.stop();
 
       infoMap.putAll(infoMap1);
@@ -512,7 +517,9 @@ public class TestHBaseTimelineStorageApps {
       String flowVersion = "1111F01C2287BA";
       long runid = 1009876543218L;
       String appName = "application_123465899910_1001";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, appName), entities,
+          UserGroupInformation.createRemoteUser(user));
       hbi.stop();
 
       // retrieve the row
@@ -627,14 +634,17 @@ public class TestHBaseTimelineStorageApps {
       hbi.init(c1);
       hbi.start();
       // Writing application entity.
+      TimelineCollectorContext context = new TimelineCollectorContext("c1",
+          "u1", "f1", "v1", 1002345678919L, appId);
+      UserGroupInformation user = UserGroupInformation.createRemoteUser("u1");
       try {
-        hbi.write("c1", "u1", "f1", "v1", 1002345678919L, appId, teApp);
+        hbi.write(context, teApp, user);
         Assert.fail("Expected an exception as metric values are non integral");
       } catch (IOException e) {}
 
       // Writing generic entity.
       try {
-        hbi.write("c1", "u1", "f1", "v1", 1002345678919L, appId, teEntity);
+        hbi.write(context, teEntity, user);
         Assert.fail("Expected an exception as metric values are non integral");
       } catch (IOException e) {}
       hbi.stop();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d2c1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java
index 7ac5b36..422316b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
@@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
@@ -71,6 +73,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKeyPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -196,12 +203,15 @@ public class TestHBaseTimelineStorageEntities {
       hbi.start();
       String cluster = "cluster_test_write_entity";
       String user = "user1";
+      String subAppUser = "subAppUser1";
       String flow = "some_flow_name";
       String flowVersion = "AB7822C10F1111";
       long runid = 1002345678919L;
       String appName = ApplicationId.newInstance(System.currentTimeMillis() +
           9000000L, 1).toString();
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, appName), te,
+          UserGroupInformation.createRemoteUser(subAppUser));
       hbi.stop();
 
       // scan the table and see that entity exists
@@ -352,6 +362,11 @@ public class TestHBaseTimelineStorageEntities {
         assertEquals(metricValues.get(ts - 20000),
             metric.getValues().get(ts - 20000));
       }
+
+      // verify for sub application table entities.
+      verifySubApplicationTableEntities(cluster, user, flow, flowVersion, runid,
+          appName, subAppUser, c1, entity, id, type, infoMap, isRelatedTo,
+          relatesTo, conf, metricValues, metrics, cTime, m1);
     } finally {
       if (hbi != null) {
         hbi.stop();
@@ -360,6 +375,98 @@ public class TestHBaseTimelineStorageEntities {
     }
   }
 
+  private void verifySubApplicationTableEntities(String cluster, String user,
+      String flow, String flowVersion, Long runid, String appName,
+      String subAppUser, Configuration c1, TimelineEntity entity, String id,
+      String type, Map<String, Object> infoMap,
+      Map<String, Set<String>> isRelatedTo, Map<String, Set<String>> relatesTo,
+      Map<String, String> conf, Map<Long, Number> metricValues,
+      Set<TimelineMetric> metrics, Long cTime, TimelineMetric m1)
+      throws IOException {
+    Scan s = new Scan();
+    // read from SubApplicationTable
+    byte[] startRow = new SubApplicationRowKeyPrefix(cluster, subAppUser, null,
+        null, null, null).getRowKeyPrefix();
+    s.setStartRow(startRow);
+    s.setMaxVersions(Integer.MAX_VALUE);
+    Connection conn = ConnectionFactory.createConnection(c1);
+    ResultScanner scanner =
+        new SubApplicationTable().getResultScanner(c1, conn, s);
+
+    int rowCount = 0;
+    int colCount = 0;
+    KeyConverter<String> stringKeyConverter = new StringKeyConverter();
+    for (Result result : scanner) {
+      if (result != null && !result.isEmpty()) {
+        rowCount++;
+        colCount += result.size();
+        byte[] row1 = result.getRow();
+        assertTrue(verifyRowKeyForSubApplication(row1, subAppUser, cluster,
+            user, entity));
+
+        // check info column family
+        String id1 = SubApplicationColumn.ID.readResult(result).toString();
+        assertEquals(id, id1);
+
+        String type1 = SubApplicationColumn.TYPE.readResult(result).toString();
+        assertEquals(type, type1);
+
+        Long cTime1 =
+            (Long) SubApplicationColumn.CREATED_TIME.readResult(result);
+        assertEquals(cTime1, cTime);
+
+        Map<String, Object> infoColumns = SubApplicationColumnPrefix.INFO
+            .readResults(result, new StringKeyConverter());
+        assertEquals(infoMap, infoColumns);
+
+        // Remember isRelatedTo is of type Map<String, Set<String>>
+        for (Map.Entry<String, Set<String>> isRelatedToEntry : isRelatedTo
+            .entrySet()) {
+          Object isRelatedToValue = SubApplicationColumnPrefix.IS_RELATED_TO
+              .readResult(result, isRelatedToEntry.getKey());
+          String compoundValue = isRelatedToValue.toString();
+          // id7?id9?id6
+          Set<String> isRelatedToValues =
+              new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue));
+          assertEquals(isRelatedTo.get(isRelatedToEntry.getKey()).size(),
+              isRelatedToValues.size());
+          for (String v : isRelatedToEntry.getValue()) {
+            assertTrue(isRelatedToValues.contains(v));
+          }
+        }
+
+        // RelatesTo
+        for (Map.Entry<String, Set<String>> relatesToEntry : relatesTo
+            .entrySet()) {
+          String compoundValue = SubApplicationColumnPrefix.RELATES_TO
+              .readResult(result, relatesToEntry.getKey()).toString();
+          // id3?id4?id5
+          Set<String> relatesToValues =
+              new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue));
+          assertEquals(relatesTo.get(relatesToEntry.getKey()).size(),
+              relatesToValues.size());
+          for (String v : relatesToEntry.getValue()) {
+            assertTrue(relatesToValues.contains(v));
+          }
+        }
+
+        // Configuration
+        Map<String, Object> configColumns = SubApplicationColumnPrefix.CONFIG
+            .readResults(result, stringKeyConverter);
+        assertEquals(conf, configColumns);
+
+        NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+            SubApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result,
+                stringKeyConverter);
+
+        NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
+        matchMetrics(metricValues, metricMap);
+      }
+    }
+    assertEquals(1, rowCount);
+    assertEquals(16, colCount);
+  }
+
   private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
       String flow, Long runid, String appName, TimelineEntity te) {
 
@@ -407,7 +514,9 @@ public class TestHBaseTimelineStorageEntities {
       byte[] startRow =
           new EntityRowKeyPrefix(cluster, user, flow, runid, appName)
               .getRowKeyPrefix();
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, appName), entities,
+          UserGroupInformation.createRemoteUser(user));
       hbi.stop();
       // scan the table and see that entity exists
       Scan s = new Scan();
@@ -510,7 +619,9 @@ public class TestHBaseTimelineStorageEntities {
       String flowVersion = "1111F01C2287BA";
       long runid = 1009876543218L;
       String appName = "application_123465899910_2001";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, appName), entities,
+          UserGroupInformation.createRemoteUser(user));
       hbi.stop();
 
       // read the timeline entity using the reader this time
@@ -1758,4 +1869,15 @@ public class TestHBaseTimelineStorageEntities {
   public static void tearDownAfterClass() throws Exception {
     util.shutdownMiniCluster();
   }
+
+  private boolean verifyRowKeyForSubApplication(byte[] rowKey, String suAppUser,
+      String cluster, String user, TimelineEntity te) {
+    SubApplicationRowKey key = SubApplicationRowKey.parseRowKey(rowKey);
+    assertEquals(suAppUser, key.getSubAppUserId());
+    assertEquals(cluster, key.getClusterId());
+    assertEquals(te.getType(), key.getEntityType());
+    assertEquals(te.getId(), key.getEntityId());
+    assertEquals(user, key.getUserId());
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d2c1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
index 0923105..4bf221e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -38,12 +38,14 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
@@ -117,13 +119,18 @@ public class TestHBaseStorageFlowActivity {
     try {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(c1);
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      UserGroupInformation remoteUser =
+          UserGroupInformation.createRemoteUser(user);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, appName), te, remoteUser);
 
       // write another entity with the right min start time
       te = new TimelineEntities();
       te.addEntity(entityMinStartTime);
       appName = "application_100000000000_3333";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, appName), te, remoteUser);
 
       // writer another entity for max end time
       TimelineEntity entityMaxEndTime = TestFlowDataGenerator
@@ -131,7 +138,8 @@ public class TestHBaseStorageFlowActivity {
       te = new TimelineEntities();
       te.addEntity(entityMaxEndTime);
       appName = "application_100000000000_4444";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, appName), te, remoteUser);
 
       // writer another entity with greater start time
       TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
@@ -139,7 +147,8 @@ public class TestHBaseStorageFlowActivity {
       te = new TimelineEntities();
       te.addEntity(entityGreaterStartTime);
       appName = "application_1000000000000000_2222";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, appName), te, remoteUser);
 
       // flush everything to hbase
       hbi.flush();
@@ -227,7 +236,8 @@ public class TestHBaseStorageFlowActivity {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(c1);
       String appName = "application_1111999999_1234";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, appName), te, UserGroupInformation.createRemoteUser(user));
       hbi.flush();
     } finally {
       if (hbi != null) {
@@ -340,20 +350,27 @@ public class TestHBaseStorageFlowActivity {
     try {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(c1);
+
+      UserGroupInformation remoteUser =
+          UserGroupInformation.createRemoteUser(user);
+
       String appName = "application_11888888888_1111";
-      hbi.write(cluster, user, flow, flowVersion1, runid1, appName, te);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion1,
+          runid1, appName), te, remoteUser);
 
       // write an application with to this flow but a different runid/ version
       te = new TimelineEntities();
       te.addEntity(entityApp1);
       appName = "application_11888888888_2222";
-      hbi.write(cluster, user, flow, flowVersion2, runid2, appName, te);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion2,
+          runid2, appName), te, remoteUser);
 
       // write an application with to this flow but a different runid/ version
       te = new TimelineEntities();
       te.addEntity(entityApp1);
       appName = "application_11888888888_3333";
-      hbi.write(cluster, user, flow, flowVersion3, runid3, appName, te);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion3,
+          runid3, appName), te, remoteUser);
 
       hbi.flush();
     } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d2c1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index acfdc4d..1ad02e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -43,11 +43,13 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
@@ -181,13 +183,18 @@ public class TestHBaseStorageFlowRun {
     try {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(c1);
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      UserGroupInformation remoteUser =
+          UserGroupInformation.createRemoteUser(user);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, appName), te, remoteUser);
 
       // write another entity with the right min start time
       te = new TimelineEntities();
       te.addEntity(entityMinStartTime);
       appName = "application_100000000000_3333";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, appName), te, remoteUser);
 
       // writer another entity for max end time
       TimelineEntity entityMaxEndTime = TestFlowDataGenerator
@@ -195,7 +202,8 @@ public class TestHBaseStorageFlowRun {
       te = new TimelineEntities();
       te.addEntity(entityMaxEndTime);
       appName = "application_100000000000_4444";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, appName), te, remoteUser);
 
       // writer another entity with greater start time
       TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
@@ -203,7 +211,8 @@ public class TestHBaseStorageFlowRun {
       te = new TimelineEntities();
       te.addEntity(entityGreaterStartTime);
       appName = "application_1000000000000000_2222";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, appName), te, remoteUser);
 
       // flush everything to hbase
       hbi.flush();
@@ -287,15 +296,19 @@ public class TestHBaseStorageFlowRun {
     try {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(c1);
+      UserGroupInformation remoteUser =
+          UserGroupInformation.createRemoteUser(user);
       String appName = "application_11111111111111_1111";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, appName), te, remoteUser);
       // write another application with same metric to this flow
       te = new TimelineEntities();
       TimelineEntity entityApp2 = TestFlowDataGenerator
           .getEntityMetricsApp2(System.currentTimeMillis());
       te.addEntity(entityApp2);
       appName = "application_11111111111111_2222";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, appName), te, remoteUser);
       hbi.flush();
     } finally {
       if (hbi != null) {
@@ -556,15 +569,22 @@ public class TestHBaseStorageFlowRun {
     try {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(c1);
+      UserGroupInformation remoteUser =
+          UserGroupInformation.createRemoteUser(user);
+
       String appName = "application_11111111111111_1111";
-      hbi.write(cluster, user, flow, flowVersion, 1002345678919L, appName, te);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          1002345678919L, appName), te,
+          remoteUser);
       // write another application with same metric to this flow
       te = new TimelineEntities();
       TimelineEntity entityApp2 = TestFlowDataGenerator
           .getEntityMetricsApp2(System.currentTimeMillis());
       te.addEntity(entityApp2);
       appName = "application_11111111111111_2222";
-      hbi.write(cluster, user, flow, flowVersion, 1002345678918L, appName, te);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          1002345678918L, appName), te,
+          remoteUser);
       hbi.flush();
     } finally {
       if (hbi != null) {
@@ -643,15 +663,20 @@ public class TestHBaseStorageFlowRun {
     try {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(c1);
+      UserGroupInformation remoteUser =
+          UserGroupInformation.createRemoteUser(user);
+
       String appName = "application_11111111111111_1111";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, appName), te, remoteUser);
       // write another application with same metric to this flow
       te = new TimelineEntities();
       TimelineEntity entityApp2 = TestFlowDataGenerator
           .getEntityMetricsApp2(System.currentTimeMillis());
       te.addEntity(entityApp2);
       appName = "application_11111111111111_2222";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+          runid, appName), te, remoteUser);
       hbi.flush();
     } finally {
       if (hbi != null) {
@@ -737,6 +762,8 @@ public class TestHBaseStorageFlowRun {
     try {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(c1);
+      UserGroupInformation remoteUser =
+          UserGroupInformation.createRemoteUser(user);
 
       for (int i = start; i < count; i++) {
         String appName = "application_1060350000000_" + appIdSuffix;
@@ -746,7 +773,8 @@ public class TestHBaseStorageFlowRun {
         te1.addEntity(entityApp1);
         entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs);
         te1.addEntity(entityApp2);
-        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+        hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+            runid, appName), te1, remoteUser);
         Thread.sleep(1);
 
         appName = "application_1001199480000_7" + appIdSuffix;
@@ -758,7 +786,9 @@ public class TestHBaseStorageFlowRun {
         entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs);
         te1.addEntity(entityApp2);
 
-        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+        hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+            runid, appName), te1,
+            remoteUser);
         if (i % 1000 == 0) {
           hbi.flush();
           checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow,
@@ -826,16 +856,23 @@ public class TestHBaseStorageFlowRun {
     try {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(c1);
-      hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L,
-          "application_11111111111111_1111", te);
+      UserGroupInformation remoteUser =
+          UserGroupInformation.createRemoteUser(user);
+
+      hbi.write(
+          new TimelineCollectorContext(cluster, user, flow, "CF7022C10F1354",
+              1002345678919L, "application_11111111111111_1111"),
+          te, remoteUser);
       // write another application with same metric to this flow
       te = new TimelineEntities();
       TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(
           System.currentTimeMillis());
       entityApp2.setCreatedTime(1425016502000L);
       te.addEntity(entityApp2);
-      hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L,
-          "application_11111111111111_2222", te);
+      hbi.write(
+          new TimelineCollectorContext(cluster, user, flow, "CF7022C10F1354",
+              1002345678918L, "application_11111111111111_2222"),
+          te, remoteUser);
       hbi.flush();
     } finally {
       if (hbi != null) {
@@ -911,15 +948,22 @@ public class TestHBaseStorageFlowRun {
     try {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(c1);
-      hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L,
-          "application_11111111111111_1111", te);
+      UserGroupInformation remoteUser =
+          UserGroupInformation.createRemoteUser(user);
+
+      hbi.write(
+          new TimelineCollectorContext(cluster, user, flow, "CF7022C10F1354",
+              1002345678919L, "application_11111111111111_1111"),
+          te, remoteUser);
       // write another application with same metric to this flow
       te = new TimelineEntities();
       TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(
           System.currentTimeMillis());
       te.addEntity(entityApp2);
-      hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L,
-          "application_11111111111111_2222", te);
+      hbi.write(
+          new TimelineCollectorContext(cluster, user, flow, "CF7022C10F1354",
+              1002345678918L, "application_11111111111111_2222"),
+          te, remoteUser);
       hbi.flush();
     } finally {
       if (hbi != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d2c1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
index fa9d029..0ef8260 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -48,8 +48,10 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
 import org.apache.hadoop.yarn.server.timelineservice.storage.DataGeneratorForTest;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
@@ -280,9 +282,12 @@ public class TestHBaseStorageFlowRunCompaction {
     Configuration c1 = util.getConfiguration();
     TimelineEntities te1 = null;
     TimelineEntity entityApp1 = null;
+    UserGroupInformation remoteUser =
+        UserGroupInformation.createRemoteUser(user);
     try {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(c1);
+
       // now insert count * ( 100 + 100) metrics
       // each call to getEntityMetricsApp1 brings back 100 values
       // of metric1 and 100 of metric2
@@ -292,14 +297,16 @@ public class TestHBaseStorageFlowRunCompaction {
         te1 = new TimelineEntities();
         entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(insertTs, c1);
         te1.addEntity(entityApp1);
-        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+        hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+            runid, appName), te1, remoteUser);
 
         appName = "application_2048000000000_7" + appIdSuffix;
         insertTs++;
         te1 = new TimelineEntities();
         entityApp1 = TestFlowDataGenerator.getEntityMetricsApp2(insertTs);
         te1.addEntity(entityApp1);
-        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+        hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+            runid, appName), te1, remoteUser);
       }
     } finally {
       String appName = "application_10240000000000_" + appIdSuffix;
@@ -308,7 +315,8 @@ public class TestHBaseStorageFlowRunCompaction {
           insertTs + 1, c1);
       te1.addEntity(entityApp1);
       if (hbi != null) {
-        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+        hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion,
+            runid, appName), te1, remoteUser);
         hbi.flush();
         hbi.close();
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d2c1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index dfd63bf..b05bfd8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import  org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
@@ -36,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
@@ -65,6 +67,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable;
 
 /**
  * This implements a hbase based backend for storing the timeline entity
@@ -85,6 +91,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   private TypedBufferedMutator<ApplicationTable> applicationTable;
   private TypedBufferedMutator<FlowActivityTable> flowActivityTable;
   private TypedBufferedMutator<FlowRunTable> flowRunTable;
+  private TypedBufferedMutator<SubApplicationTable> subApplicationTable;
 
   /**
    * Used to convert strings key components to and from storage format.
@@ -97,6 +104,10 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
    */
   private final KeyConverter<Long> longKeyConverter = new LongKeyConverter();
 
+  private enum Tables {
+    APPLICATION_TABLE, ENTITY_TABLE, SUBAPPLICATION_TABLE
+  };
+
   public HBaseTimelineWriterImpl() {
     super(HBaseTimelineWriterImpl.class.getName());
   }
@@ -116,17 +127,28 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn);
     flowActivityTable =
         new FlowActivityTable().getTableMutator(hbaseConf, conn);
+    subApplicationTable =
+        new SubApplicationTable().getTableMutator(hbaseConf, conn);
   }
 
   /**
    * Stores the entire information in TimelineEntities to the timeline store.
    */
   @Override
-  public TimelineWriteResponse write(String clusterId, String userId,
-      String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntities data) throws IOException {
+  public TimelineWriteResponse write(TimelineCollectorContext context,
+      TimelineEntities data, UserGroupInformation callerUgi)
+      throws IOException {
 
     TimelineWriteResponse putStatus = new TimelineWriteResponse();
+
+    String clusterId = context.getClusterId();
+    String userId = context.getUserId();
+    String flowName = context.getFlowName();
+    String flowVersion = context.getFlowVersion();
+    long flowRunId = context.getFlowRunId();
+    String appId = context.getAppId();
+    String subApplicationUser = callerUgi.getShortUserName();
+
     // defensive coding to avoid NPE during row key construction
     if ((flowName == null) || (appId == null) || (clusterId == null)
         || (userId == null)) {
@@ -152,18 +174,22 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
             new ApplicationRowKey(clusterId, userId, flowName, flowRunId,
                 appId);
         rowKey = applicationRowKey.getRowKey();
+        store(rowKey, te, flowVersion, Tables.APPLICATION_TABLE);
       } else {
         EntityRowKey entityRowKey =
             new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
                 te.getType(), te.getIdPrefix(), te.getId());
         rowKey = entityRowKey.getRowKey();
+        store(rowKey, te, flowVersion, Tables.ENTITY_TABLE);
       }
 
-      storeInfo(rowKey, te, flowVersion, isApplication);
-      storeEvents(rowKey, te.getEvents(), isApplication);
-      storeConfig(rowKey, te.getConfigs(), isApplication);
-      storeMetrics(rowKey, te.getMetrics(), isApplication);
-      storeRelations(rowKey, te, isApplication);
+      if (!isApplication && !userId.equals(subApplicationUser)) {
+        SubApplicationRowKey subApplicationRowKey =
+            new SubApplicationRowKey(subApplicationUser, clusterId,
+                te.getType(), te.getIdPrefix(), te.getId(), userId);
+        rowKey = subApplicationRowKey.getRowKey();
+        store(rowKey, te, flowVersion, Tables.SUBAPPLICATION_TABLE);
+      }
 
       if (isApplication) {
         TimelineEvent event =
@@ -304,72 +330,108 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     }
   }
 
-  private void storeRelations(byte[] rowKey, TimelineEntity te,
-      boolean isApplication) throws IOException {
-    if (isApplication) {
-      storeRelations(rowKey, te.getIsRelatedToEntities(),
-          ApplicationColumnPrefix.IS_RELATED_TO, applicationTable);
-      storeRelations(rowKey, te.getRelatesToEntities(),
-          ApplicationColumnPrefix.RELATES_TO, applicationTable);
-    } else {
-      storeRelations(rowKey, te.getIsRelatedToEntities(),
-          EntityColumnPrefix.IS_RELATED_TO, entityTable);
-      storeRelations(rowKey, te.getRelatesToEntities(),
-          EntityColumnPrefix.RELATES_TO, entityTable);
-    }
-  }
-
   /**
    * Stores the Relations from the {@linkplain TimelineEntity} object.
    */
   private <T> void storeRelations(byte[] rowKey,
-      Map<String, Set<String>> connectedEntities,
-      ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
-          throws IOException {
-    for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities
-        .entrySet()) {
-      // id3?id4?id5
-      String compoundValue =
-          Separator.VALUES.joinEncoded(connectedEntity.getValue());
-      columnPrefix.store(rowKey, table,
-          stringKeyConverter.encode(connectedEntity.getKey()), null,
-          compoundValue);
+      Map<String, Set<String>> connectedEntities, ColumnPrefix<T> columnPrefix,
+      TypedBufferedMutator<T> table) throws IOException {
+    if (connectedEntities != null) {
+      for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities
+          .entrySet()) {
+        // id3?id4?id5
+        String compoundValue =
+            Separator.VALUES.joinEncoded(connectedEntity.getValue());
+        columnPrefix.store(rowKey, table,
+            stringKeyConverter.encode(connectedEntity.getKey()), null,
+            compoundValue);
+      }
     }
   }
 
   /**
    * Stores information from the {@linkplain TimelineEntity} object.
    */
-  private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion,
-      boolean isApplication) throws IOException {
-
-    if (isApplication) {
+  private void store(byte[] rowKey, TimelineEntity te,
+      String flowVersion,
+      Tables table) throws IOException {
+    switch (table) {
+    case APPLICATION_TABLE:
       ApplicationColumn.ID.store(rowKey, applicationTable, null, te.getId());
       ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null,
           te.getCreatedTime());
       ApplicationColumn.FLOW_VERSION.store(rowKey, applicationTable, null,
           flowVersion);
-      Map<String, Object> info = te.getInfo();
-      if (info != null) {
-        for (Map.Entry<String, Object> entry : info.entrySet()) {
-          ApplicationColumnPrefix.INFO.store(rowKey, applicationTable,
-              stringKeyConverter.encode(entry.getKey()), null,
-              entry.getValue());
-        }
-      }
-    } else {
+      storeInfo(rowKey, te.getInfo(), flowVersion, ApplicationColumnPrefix.INFO,
+          applicationTable);
+      storeMetrics(rowKey, te.getMetrics(), ApplicationColumnPrefix.METRIC,
+          applicationTable);
+      storeEvents(rowKey, te.getEvents(), ApplicationColumnPrefix.EVENT,
+          applicationTable);
+      storeConfig(rowKey, te.getConfigs(), ApplicationColumnPrefix.CONFIG,
+          applicationTable);
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          ApplicationColumnPrefix.IS_RELATED_TO, applicationTable);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          ApplicationColumnPrefix.RELATES_TO, applicationTable);
+      break;
+    case ENTITY_TABLE:
       EntityColumn.ID.store(rowKey, entityTable, null, te.getId());
       EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType());
       EntityColumn.CREATED_TIME.store(rowKey, entityTable, null,
           te.getCreatedTime());
       EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
-      Map<String, Object> info = te.getInfo();
-      if (info != null) {
-        for (Map.Entry<String, Object> entry : info.entrySet()) {
-          EntityColumnPrefix.INFO.store(rowKey, entityTable,
-              stringKeyConverter.encode(entry.getKey()), null,
-              entry.getValue());
-        }
+      storeInfo(rowKey, te.getInfo(), flowVersion, EntityColumnPrefix.INFO,
+          entityTable);
+      storeMetrics(rowKey, te.getMetrics(), EntityColumnPrefix.METRIC,
+          entityTable);
+      storeEvents(rowKey, te.getEvents(), EntityColumnPrefix.EVENT,
+          entityTable);
+      storeConfig(rowKey, te.getConfigs(), EntityColumnPrefix.CONFIG,
+          entityTable);
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          EntityColumnPrefix.IS_RELATED_TO, entityTable);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          EntityColumnPrefix.RELATES_TO, entityTable);
+      break;
+    case SUBAPPLICATION_TABLE:
+      SubApplicationColumn.ID.store(rowKey, subApplicationTable, null,
+          te.getId());
+      SubApplicationColumn.TYPE.store(rowKey, subApplicationTable, null,
+          te.getType());
+      SubApplicationColumn.CREATED_TIME.store(rowKey, subApplicationTable, null,
+          te.getCreatedTime());
+      SubApplicationColumn.FLOW_VERSION.store(rowKey, subApplicationTable, null,
+          flowVersion);
+      storeInfo(rowKey, te.getInfo(), flowVersion,
+          SubApplicationColumnPrefix.INFO, subApplicationTable);
+      storeMetrics(rowKey, te.getMetrics(), SubApplicationColumnPrefix.METRIC,
+          subApplicationTable);
+      storeEvents(rowKey, te.getEvents(), SubApplicationColumnPrefix.EVENT,
+          subApplicationTable);
+      storeConfig(rowKey, te.getConfigs(), SubApplicationColumnPrefix.CONFIG,
+          subApplicationTable);
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          SubApplicationColumnPrefix.IS_RELATED_TO, subApplicationTable);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          SubApplicationColumnPrefix.RELATES_TO, subApplicationTable);
+      break;
+    default:
+      LOG.info("Invalid table name provided.");
+      break;
+    }
+  }
+
+  /**
+   * stores the info information from {@linkplain TimelineEntity}.
+   */
+  private <T> void storeInfo(byte[] rowKey, Map<String, Object> info,
+      String flowVersion, ColumnPrefix<T> columnPrefix,
+      TypedBufferedMutator<T> table) throws IOException {
+    if (info != null) {
+      for (Map.Entry<String, Object> entry : info.entrySet()) {
+        columnPrefix.store(rowKey, table,
+            stringKeyConverter.encode(entry.getKey()), null, entry.getValue());
       }
     }
   }
@@ -377,19 +439,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   /**
    * stores the config information from {@linkplain TimelineEntity}.
    */
-  private void storeConfig(byte[] rowKey, Map<String, String> config,
-      boolean isApplication) throws IOException {
-    if (config == null) {
-      return;
-    }
-    for (Map.Entry<String, String> entry : config.entrySet()) {
-      byte[] configKey = stringKeyConverter.encode(entry.getKey());
-      if (isApplication) {
-        ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable,
-            configKey, null, entry.getValue());
-      } else {
-        EntityColumnPrefix.CONFIG.store(rowKey, entityTable, configKey,
-            null, entry.getValue());
+  private <T> void storeConfig(byte[] rowKey, Map<String, String> config,
+      ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
+      throws IOException {
+    if (config != null) {
+      for (Map.Entry<String, String> entry : config.entrySet()) {
+        byte[] configKey = stringKeyConverter.encode(entry.getKey());
+        columnPrefix.store(rowKey, table, configKey, null, entry.getValue());
       }
     }
   }
@@ -398,8 +454,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
    * stores the {@linkplain TimelineMetric} information from the
    * {@linkplain TimelineEvent} object.
    */
-  private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
-      boolean isApplication) throws IOException {
+  private <T> void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
+      ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
+      throws IOException {
     if (metrics != null) {
       for (TimelineMetric metric : metrics) {
         byte[] metricColumnQualifier =
@@ -407,13 +464,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
         Map<Long, Number> timeseries = metric.getValues();
         for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
           Long timestamp = timeseriesEntry.getKey();
-          if (isApplication) {
-            ApplicationColumnPrefix.METRIC.store(rowKey, applicationTable,
-                metricColumnQualifier, timestamp, timeseriesEntry.getValue());
-          } else {
-            EntityColumnPrefix.METRIC.store(rowKey, entityTable,
-                metricColumnQualifier, timestamp, timeseriesEntry.getValue());
-          }
+          columnPrefix.store(rowKey, table, metricColumnQualifier, timestamp,
+              timeseriesEntry.getValue());
         }
       }
     }
@@ -422,8 +474,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   /**
    * Stores the events from the {@linkplain TimelineEvent} object.
    */
-  private void storeEvents(byte[] rowKey, Set<TimelineEvent> events,
-      boolean isApplication) throws IOException {
+  private <T> void storeEvents(byte[] rowKey, Set<TimelineEvent> events,
+      ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
+      throws IOException {
     if (events != null) {
       for (TimelineEvent event : events) {
         if (event != null) {
@@ -441,26 +494,16 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
               byte[] columnQualifierBytes =
                   new EventColumnName(eventId, eventTimestamp, null)
                       .getColumnQualifier();
-              if (isApplication) {
-                ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
-                    columnQualifierBytes, null, Separator.EMPTY_BYTES);
-              } else {
-                EntityColumnPrefix.EVENT.store(rowKey, entityTable,
-                    columnQualifierBytes, null, Separator.EMPTY_BYTES);
-              }
+              columnPrefix.store(rowKey, table, columnQualifierBytes, null,
+                  Separator.EMPTY_BYTES);
             } else {
               for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
                 // eventId=infoKey
                 byte[] columnQualifierBytes =
                     new EventColumnName(eventId, eventTimestamp, info.getKey())
                         .getColumnQualifier();
-                if (isApplication) {
-                  ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
-                      columnQualifierBytes, null, info.getValue());
-                } else {
-                  EntityColumnPrefix.EVENT.store(rowKey, entityTable,
-                      columnQualifierBytes, null, info.getValue());
-                }
+                columnPrefix.store(rowKey, table, columnQualifierBytes, null,
+                    info.getValue());
               } // for info: eventInfo
             }
           }
@@ -500,6 +543,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     applicationTable.flush();
     flowRunTable.flush();
     flowActivityTable.flush();
+    subApplicationTable.flush();
   }
 
   /**
@@ -532,11 +576,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       // The close API performs flushing and releases any resources held
       flowActivityTable.close();
     }
+    if (subApplicationTable != null) {
+      subApplicationTable.close();
+    }
     if (conn != null) {
       LOG.info("closing the hbase Connection");
       conn.close();
     }
     super.serviceStop();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d2c1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationRowKeyPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationRowKeyPrefix.java
index e42c6cd..0c04959 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationRowKeyPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationRowKeyPrefix.java
@@ -56,26 +56,6 @@ public class SubApplicationRowKeyPrefix extends SubApplicationRowKey
         userId);
   }
 
-  /**
-   * Creates a prefix which generates the following rowKeyPrefixes for the sub
-   * application table:
-   * {@code subAppUserId!clusterId!entityType!entityPrefix!entityId!userId}.
-   *
-   * subAppUserId is usually the doAsUser.
-   * userId is the yarn user that the AM runs as.
-   *
-   * @param clusterId
-   *          identifying the cluster
-   * @param subAppUserId
-   *          identifying the sub app user
-   * @param userId
-   *          identifying the user who runs the AM
-   */
-  public SubApplicationRowKeyPrefix(String clusterId, String subAppUserId,
-      String userId) {
-    this(subAppUserId, clusterId, null, null, null, userId);
-  }
-
   /*
    * (non-Javadoc)
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d2c1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
index 5416b26..7cc4d3d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -138,7 +138,7 @@ public abstract class TimelineCollector extends CompositeService {
     // flush the writer buffer concurrently and swallow any exception
     // caused by the timeline enitites that are being put here.
     synchronized (writer) {
-      response = writeTimelineEntities(entities);
+      response = writeTimelineEntities(entities, callerUgi);
       flushBufferedTimelineEntities();
     }
 
@@ -146,15 +146,14 @@ public abstract class TimelineCollector extends CompositeService {
   }
 
   private TimelineWriteResponse writeTimelineEntities(
-      TimelineEntities entities) throws IOException {
+      TimelineEntities entities, UserGroupInformation callerUgi)
+      throws IOException {
     // Update application metrics for aggregation
     updateAggregateStatus(entities, aggregationGroups,
         getEntityTypesSkipAggregation());
 
     final TimelineCollectorContext context = getTimelineEntityContext();
-    return writer.write(context.getClusterId(), context.getUserId(),
-        context.getFlowName(), context.getFlowVersion(),
-        context.getFlowRunId(), context.getAppId(), entities);
+    return writer.write(context, entities, callerUgi);
   }
 
   /**
@@ -186,7 +185,7 @@ public abstract class TimelineCollector extends CompositeService {
           callerUgi + ")");
     }
 
-    writeTimelineEntities(entities);
+    writeTimelineEntities(entities, callerUgi);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d2c1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index 1f527f2..ee41970 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -28,12 +28,14 @@ import java.io.PrintWriter;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -68,10 +70,17 @@ public class FileSystemTimelineWriterImpl extends AbstractService
   }
 
   @Override
-  public TimelineWriteResponse write(String clusterId, String userId,
-      String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntities entities) throws IOException {
+  public TimelineWriteResponse write(TimelineCollectorContext context,
+      TimelineEntities entities, UserGroupInformation callerUgi)
+      throws IOException {
     TimelineWriteResponse response = new TimelineWriteResponse();
+    String clusterId = context.getClusterId();
+    String userId = context.getUserId();
+    String flowName = context.getFlowName();
+    String flowVersion = context.getFlowVersion();
+    long flowRunId = context.getFlowRunId();
+    String appId = context.getAppId();
+
     for (TimelineEntity entity : entities.getEntities()) {
       write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity,
           response);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d2c1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
index 663a18a..12bc1cb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
@@ -21,10 +21,12 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
 
 /**
  * This interface is for storing application timeline information.
@@ -34,25 +36,19 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
 public interface TimelineWriter extends Service {
 
   /**
-   * Stores the entire information in {@link TimelineEntities} to the
-   * timeline store. Any errors occurring for individual write request objects
-   * will be reported in the response.
+   * Stores the entire information in {@link TimelineEntities} to the timeline
+   * store. Any errors occurring for individual write request objects will be
+   * reported in the response.
    *
-   * @param clusterId context cluster ID
-   * @param userId context user ID
-   * @param flowName context flow name
-   * @param flowVersion context flow version
-   * @param flowRunId run id for the flow.
-   * @param appId context app ID.
-   * @param data
-   *          a {@link TimelineEntities} object.
+   * @param context a {@link TimelineCollectorContext}
+   * @param data a {@link TimelineEntities} object.
+   * @param callerUgi {@link UserGroupInformation}.
    * @return a {@link TimelineWriteResponse} object.
-   * @throws IOException if there is any exception encountered while storing
-   *     or writing entities to the backend storage.
+   * @throws IOException if there is any exception encountered while storing or
+   *           writing entities to the back end storage.
    */
-  TimelineWriteResponse write(String clusterId, String userId,
-      String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntities data) throws IOException;
+  TimelineWriteResponse write(TimelineCollectorContext context,
+      TimelineEntities data, UserGroupInformation callerUgi) throws IOException;
 
   /**
    * Aggregates the entity information to the timeline store based on which

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d2c1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
index 0f17553..ec45428 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
@@ -41,8 +41,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -156,9 +154,8 @@ public class TestTimelineCollector {
     collector.putEntities(
         entities, UserGroupInformation.createRemoteUser("test-user"));
 
-    verify(writer, times(1)).write(
-        anyString(), anyString(), anyString(), anyString(), anyLong(),
-        anyString(), any(TimelineEntities.class));
+    verify(writer, times(1)).write(any(TimelineCollectorContext.class),
+        any(TimelineEntities.class), any(UserGroupInformation.class));
     verify(writer, times(1)).flush();
   }
 
@@ -175,9 +172,8 @@ public class TestTimelineCollector {
     collector.putEntitiesAsync(
         entities, UserGroupInformation.createRemoteUser("test-user"));
 
-    verify(writer, times(1)).write(
-        anyString(), anyString(), anyString(), anyString(), anyLong(),
-        anyString(), any(TimelineEntities.class));
+    verify(writer, times(1)).write(any(TimelineCollectorContext.class),
+        any(TimelineEntities.class), any(UserGroupInformation.class));
     verify(writer, never()).flush();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69d2c1e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
index 4f12c57..bb9f54f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
@@ -30,11 +30,13 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.junit.Rule;
 import org.junit.Test;
@@ -89,8 +91,10 @@ public class TestFileSystemTimelineWriterImpl {
           outputRoot);
       fsi.init(conf);
       fsi.start();
-      fsi.write("cluster_id", "user_id", "flow_name", "flow_version", 12345678L,
-          "app_id", te);
+      fsi.write(
+          new TimelineCollectorContext("cluster_id", "user_id", "flow_name",
+              "flow_version", 12345678L, "app_id"),
+          te, UserGroupInformation.createRemoteUser("user_id"));
 
       String fileName = fsi.getOutputRoot() + File.separator + "entities" +
           File.separator + "cluster_id" + File.separator + "user_id" +


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message