hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sj...@apache.org
Subject [48/50] [abbrv] hadoop git commit: YARN-4064. build is broken at TestHBaseTimelineWriterImpl.java (sjlee)
Date Tue, 25 Aug 2015 18:09:17 GMT
YARN-4064. build is broken at TestHBaseTimelineWriterImpl.java (sjlee)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e979f308
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e979f308
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e979f308

Branch: refs/heads/YARN-2928
Commit: e979f30831fc87f4c535df045527b466fe584ace
Parents: 89f8fd2
Author: Sangjin Lee <sjlee@apache.org>
Authored: Wed Aug 19 17:46:03 2015 -0700
Committer: Sangjin Lee <sjlee@apache.org>
Committed: Tue Aug 25 10:52:45 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   2 +
 .../storage/TestHBaseTimelineStorage.java       | 770 +++++++++++++++++++
 .../storage/TestHBaseTimelineWriterImpl.java    | 770 -------------------
 3 files changed, 772 insertions(+), 770 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e979f308/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index cd7f849..0041f7f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -139,6 +139,8 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3984. Adjusted the event column key schema and avoided missing empty
     event. (Vrushali C via zjshen)
 
+    YARN-4064. build is broken at TestHBaseTimelineWriterImpl.java (sjlee)
+
 Trunk - Unreleased
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e979f308/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
new file mode 100644
index 0000000..2875e01
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -0,0 +1,770 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Various tests to test writing entities to HBase and reading them back from
+ * it.
+ *
+ * It uses a single HBase mini-cluster for all tests which is a little more
+ * realistic, and helps test correctness in the presence of other data.
+ *
+ * Each test uses a different cluster name to be able to handle its own data
+ * even if other records exist in the table. Use a different cluster name if
+ * you add a new test.
+ */
+public class TestHBaseTimelineStorage {
+
+  private static HBaseTestingUtility util;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    util = new HBaseTestingUtility();
+    util.startMiniCluster();
+    createSchema();
+  }
+
+  private static void createSchema() throws IOException {
+    new EntityTable()
+        .createTable(util.getHBaseAdmin(), util.getConfiguration());
+    new AppToFlowTable()
+        .createTable(util.getHBaseAdmin(), util.getConfiguration());
+    new ApplicationTable()
+        .createTable(util.getHBaseAdmin(), util.getConfiguration());
+  }
+
+  @Test
+  public void testWriteApplicationToHBase() throws Exception {
+    TimelineEntities te = new TimelineEntities();
+    ApplicationEntity entity = new ApplicationEntity();
+    String id = "hello";
+    entity.setId(id);
+    long cTime = 1425016501000L;
+    long mTime = 1425026901000L;
+    entity.setCreatedTime(cTime);
+    entity.setModifiedTime(mTime);
+
+    // add the info map in Timeline Entity
+    Map<String, Object> infoMap = new HashMap<String, Object>();
+    infoMap.put("infoMapKey1", "infoMapValue1");
+    infoMap.put("infoMapKey2", 10);
+    entity.addInfo(infoMap);
+
+    // add the isRelatedToEntity info
+    String key = "task";
+    String value = "is_related_to_entity_id_here";
+    Set<String> isRelatedToSet = new HashSet<String>();
+    isRelatedToSet.add(value);
+    Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
+    isRelatedTo.put(key, isRelatedToSet);
+    entity.setIsRelatedToEntities(isRelatedTo);
+
+    // add the relatesTo info
+    key = "container";
+    value = "relates_to_entity_id_here";
+    Set<String> relatesToSet = new HashSet<String>();
+    relatesToSet.add(value);
+    value = "relates_to_entity_id_here_Second";
+    relatesToSet.add(value);
+    Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
+    relatesTo.put(key, relatesToSet);
+    entity.setRelatesToEntities(relatesTo);
+
+    // add some config entries
+    Map<String, String> conf = new HashMap<String, String>();
+    conf.put("config_param1", "value1");
+    conf.put("config_param2", "value2");
+    entity.addConfigs(conf);
+
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId("MAP_SLOT_MILLIS");
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = System.currentTimeMillis();
+    metricValues.put(ts - 120000, 100000000);
+    metricValues.put(ts - 100000, 200000000);
+    metricValues.put(ts - 80000, 300000000);
+    metricValues.put(ts - 60000, 400000000);
+    metricValues.put(ts - 40000, 50000000000L);
+    metricValues.put(ts - 20000, 60000000000L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+    entity.addMetrics(metrics);
+
+    te.addEntity(entity);
+
+    HBaseTimelineWriterImpl hbi = null;
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      Configuration c1 = util.getConfiguration();
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      hbi.start();
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      String cluster = "cluster_test_write_app";
+      String user = "user1";
+      String flow = "some_flow_name";
+      String flowVersion = "AB7822C10F1111";
+      long runid = 1002345678919L;
+      hbi.write(cluster, user, flow, flowVersion, runid, id, te);
+      hbi.stop();
+
+      // retrieve the row
+      byte[] rowKey =
+          ApplicationRowKey.getRowKey(cluster, user, flow, runid, id);
+      Get get = new Get(rowKey);
+      get.setMaxVersions(Integer.MAX_VALUE);
+      Connection conn = ConnectionFactory.createConnection(c1);
+      Result result = new ApplicationTable().getResult(c1, conn, get);
+
+      assertTrue(result != null);
+      assertEquals(16, result.size());
+
+      // check the row key
+      byte[] row1 = result.getRow();
+      assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
+          id));
+
+      // check info column family
+      String id1 = ApplicationColumn.ID.readResult(result).toString();
+      assertEquals(id, id1);
+
+      Number val =
+          (Number) ApplicationColumn.CREATED_TIME.readResult(result);
+      long cTime1 = val.longValue();
+      assertEquals(cTime1, cTime);
+
+      val = (Number) ApplicationColumn.MODIFIED_TIME.readResult(result);
+      long mTime1 = val.longValue();
+      assertEquals(mTime1, mTime);
+
+      Map<String, Object> infoColumns =
+          ApplicationColumnPrefix.INFO.readResults(result);
+      assertEquals(infoMap, infoColumns);
+
+      // Remember isRelatedTo is of type Map<String, Set<String>>
+      for (String isRelatedToKey : isRelatedTo.keySet()) {
+        Object isRelatedToValue =
+            ApplicationColumnPrefix.IS_RELATED_TO.readResult(result,
+                isRelatedToKey);
+        String compoundValue = isRelatedToValue.toString();
+        // id7?id9?id6
+        Set<String> isRelatedToValues =
+            new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue));
+        assertEquals(isRelatedTo.get(isRelatedToKey).size(),
+            isRelatedToValues.size());
+        for (String v : isRelatedTo.get(isRelatedToKey)) {
+          assertTrue(isRelatedToValues.contains(v));
+        }
+      }
+
+      // RelatesTo
+      for (String relatesToKey : relatesTo.keySet()) {
+        String compoundValue =
+            ApplicationColumnPrefix.RELATES_TO.readResult(result,
+                relatesToKey).toString();
+        // id3?id4?id5
+        Set<String> relatesToValues =
+            new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue));
+        assertEquals(relatesTo.get(relatesToKey).size(),
+            relatesToValues.size());
+        for (String v : relatesTo.get(relatesToKey)) {
+          assertTrue(relatesToValues.contains(v));
+        }
+      }
+
+      // Configuration
+      Map<String, Object> configColumns =
+          ApplicationColumnPrefix.CONFIG.readResults(result);
+      assertEquals(conf, configColumns);
+
+      NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+          ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
+
+      NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
+      assertEquals(metricValues, metricMap);
+
+      // read the timeline entity using the reader this time
+      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, id,
+          entity.getType(), entity.getId(),
+          EnumSet.of(TimelineReader.Field.ALL));
+      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
+          id, entity.getType(), null, null, null, null, null, null, null,
+          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+      assertNotNull(e1);
+      assertEquals(1, es1.size());
+
+      // verify attributes
+      assertEquals(id, e1.getId());
+      assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
+          e1.getType());
+      assertEquals(cTime, e1.getCreatedTime());
+      assertEquals(mTime, e1.getModifiedTime());
+      Map<String, Object> infoMap2 = e1.getInfo();
+      assertEquals(infoMap, infoMap2);
+
+      Map<String, Set<String>> isRelatedTo2 = e1.getIsRelatedToEntities();
+      assertEquals(isRelatedTo, isRelatedTo2);
+
+      Map<String, Set<String>> relatesTo2 = e1.getRelatesToEntities();
+      assertEquals(relatesTo, relatesTo2);
+
+      Map<String, String> conf2 = e1.getConfigs();
+      assertEquals(conf, conf2);
+
+      Set<TimelineMetric> metrics2 = e1.getMetrics();
+      assertEquals(metrics, metrics2);
+      for (TimelineMetric metric2 : metrics2) {
+        Map<Long, Number> metricValues2 = metric2.getValues();
+        assertEquals(metricValues, metricValues2);
+      }
+    } finally {
+      if (hbi != null) {
+        hbi.stop();
+        hbi.close();
+      }
+      if (hbr != null) {
+        hbr.stop();
+        hbr.close();
+      }
+    }
+  }
+
+  @Test
+  public void testWriteEntityToHBase() throws Exception {
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entity = new TimelineEntity();
+    String id = "hello";
+    String type = "world";
+    entity.setId(id);
+    entity.setType(type);
+    long cTime = 1425016501000L;
+    long mTime = 1425026901000L;
+    entity.setCreatedTime(cTime);
+    entity.setModifiedTime(mTime);
+
+    // add the info map in Timeline Entity
+    Map<String, Object> infoMap = new HashMap<String, Object>();
+    infoMap.put("infoMapKey1", "infoMapValue1");
+    infoMap.put("infoMapKey2", 10);
+    entity.addInfo(infoMap);
+
+    // add the isRelatedToEntity info
+    String key = "task";
+    String value = "is_related_to_entity_id_here";
+    Set<String> isRelatedToSet = new HashSet<String>();
+    isRelatedToSet.add(value);
+    Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
+    isRelatedTo.put(key, isRelatedToSet);
+    entity.setIsRelatedToEntities(isRelatedTo);
+
+    // add the relatesTo info
+    key = "container";
+    value = "relates_to_entity_id_here";
+    Set<String> relatesToSet = new HashSet<String>();
+    relatesToSet.add(value);
+    value = "relates_to_entity_id_here_Second";
+    relatesToSet.add(value);
+    Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
+    relatesTo.put(key, relatesToSet);
+    entity.setRelatesToEntities(relatesTo);
+
+    // add some config entries
+    Map<String, String> conf = new HashMap<String, String>();
+    conf.put("config_param1", "value1");
+    conf.put("config_param2", "value2");
+    entity.addConfigs(conf);
+
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId("MAP_SLOT_MILLIS");
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = System.currentTimeMillis();
+    metricValues.put(ts - 120000, 100000000);
+    metricValues.put(ts - 100000, 200000000);
+    metricValues.put(ts - 80000, 300000000);
+    metricValues.put(ts - 60000, 400000000);
+    metricValues.put(ts - 40000, 50000000000L);
+    metricValues.put(ts - 20000, 60000000000L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+    entity.addMetrics(metrics);
+
+    te.addEntity(entity);
+
+    HBaseTimelineWriterImpl hbi = null;
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      Configuration c1 = util.getConfiguration();
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      hbi.start();
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      String cluster = "cluster_test_write_entity";
+      String user = "user1";
+      String flow = "some_flow_name";
+      String flowVersion = "AB7822C10F1111";
+      long runid = 1002345678919L;
+      String appName = "some app name";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.stop();
+
+      // scan the table and see that entity exists
+      Scan s = new Scan();
+      byte[] startRow =
+          EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
+      s.setStartRow(startRow);
+      s.setMaxVersions(Integer.MAX_VALUE);
+      Connection conn = ConnectionFactory.createConnection(c1);
+      ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
+
+      int rowCount = 0;
+      int colCount = 0;
+      for (Result result : scanner) {
+        if (result != null && !result.isEmpty()) {
+          rowCount++;
+          colCount += result.size();
+          byte[] row1 = result.getRow();
+          assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
+              entity));
+
+          // check info column family
+          String id1 = EntityColumn.ID.readResult(result).toString();
+          assertEquals(id, id1);
+
+          String type1 = EntityColumn.TYPE.readResult(result).toString();
+          assertEquals(type, type1);
+
+          Number val = (Number) EntityColumn.CREATED_TIME.readResult(result);
+          long cTime1 = val.longValue();
+          assertEquals(cTime1, cTime);
+
+          val = (Number) EntityColumn.MODIFIED_TIME.readResult(result);
+          long mTime1 = val.longValue();
+          assertEquals(mTime1, mTime);
+
+          Map<String, Object> infoColumns =
+              EntityColumnPrefix.INFO.readResults(result);
+          assertEquals(infoMap, infoColumns);
+
+          // Remember isRelatedTo is of type Map<String, Set<String>>
+          for (String isRelatedToKey : isRelatedTo.keySet()) {
+            Object isRelatedToValue =
+                EntityColumnPrefix.IS_RELATED_TO.readResult(result,
+                    isRelatedToKey);
+            String compoundValue = isRelatedToValue.toString();
+            // id7?id9?id6
+            Set<String> isRelatedToValues =
+                new HashSet<String>(
+                    Separator.VALUES.splitEncoded(compoundValue));
+            assertEquals(isRelatedTo.get(isRelatedToKey).size(),
+                isRelatedToValues.size());
+            for (String v : isRelatedTo.get(isRelatedToKey)) {
+              assertTrue(isRelatedToValues.contains(v));
+            }
+          }
+
+          // RelatesTo
+          for (String relatesToKey : relatesTo.keySet()) {
+            String compoundValue =
+                EntityColumnPrefix.RELATES_TO.readResult(result, relatesToKey)
+                    .toString();
+            // id3?id4?id5
+            Set<String> relatesToValues =
+                new HashSet<String>(
+                    Separator.VALUES.splitEncoded(compoundValue));
+            assertEquals(relatesTo.get(relatesToKey).size(),
+                relatesToValues.size());
+            for (String v : relatesTo.get(relatesToKey)) {
+              assertTrue(relatesToValues.contains(v));
+            }
+          }
+
+          // Configuration
+          Map<String, Object> configColumns =
+              EntityColumnPrefix.CONFIG.readResults(result);
+          assertEquals(conf, configColumns);
+
+          NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+              EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
+
+          NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
+          assertEquals(metricValues, metricMap);
+        }
+      }
+      assertEquals(1, rowCount);
+      assertEquals(17, colCount);
+
+      // read the timeline entity using the reader this time
+      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
+          entity.getType(), entity.getId(),
+          EnumSet.of(TimelineReader.Field.ALL));
+      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
+          appName, entity.getType(), null, null, null, null, null, null, null,
+          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+      assertNotNull(e1);
+      assertEquals(1, es1.size());
+
+      // verify attributes
+      assertEquals(id, e1.getId());
+      assertEquals(type, e1.getType());
+      assertEquals(cTime, e1.getCreatedTime());
+      assertEquals(mTime, e1.getModifiedTime());
+      Map<String, Object> infoMap2 = e1.getInfo();
+      assertEquals(infoMap, infoMap2);
+
+      Map<String, Set<String>> isRelatedTo2 = e1.getIsRelatedToEntities();
+      assertEquals(isRelatedTo, isRelatedTo2);
+
+      Map<String, Set<String>> relatesTo2 = e1.getRelatesToEntities();
+      assertEquals(relatesTo, relatesTo2);
+
+      Map<String, String> conf2 = e1.getConfigs();
+      assertEquals(conf, conf2);
+
+      Set<TimelineMetric> metrics2 = e1.getMetrics();
+      assertEquals(metrics, metrics2);
+      for (TimelineMetric metric2 : metrics2) {
+        Map<Long, Number> metricValues2 = metric2.getValues();
+        assertEquals(metricValues, metricValues2);
+      }
+    } finally {
+      if (hbi != null) {
+        hbi.stop();
+        hbi.close();
+      }
+      if (hbr != null) {
+        hbr.stop();
+        hbr.close();
+      }
+    }
+  }
+
+  private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
+      String flow, long runid, String appName, TimelineEntity te) {
+
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+
+    assertTrue(rowKeyComponents.length == 7);
+    assertEquals(user, Bytes.toString(rowKeyComponents[0]));
+    assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
+    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
+    assertEquals(TimelineWriterUtils.invert(runid),
+        Bytes.toLong(rowKeyComponents[3]));
+    assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
+    assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
+    assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
+    return true;
+  }
+
+  private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster,
+      String user, String flow, long runid, String appName) {
+
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+
+    assertTrue(rowKeyComponents.length == 5);
+    assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
+    assertEquals(user, Bytes.toString(rowKeyComponents[1]));
+    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
+    assertEquals(TimelineWriterUtils.invert(runid),
+        Bytes.toLong(rowKeyComponents[3]));
+    assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
+    return true;
+  }
+
+  @Test
+  public void testEvents() throws IOException {
+    TimelineEvent event = new TimelineEvent();
+    String eventId = ApplicationMetricsConstants.CREATED_EVENT_TYPE;
+    event.setId(eventId);
+    long expTs = 1436512802000L;
+    event.setTimestamp(expTs);
+    String expKey = "foo_event";
+    Object expVal = "test";
+    event.addInfo(expKey, expVal);
+
+    final TimelineEntity entity = new ApplicationEntity();
+    entity.setId(ApplicationId.newInstance(0, 1).toString());
+    entity.addEvent(event);
+
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntity(entity);
+
+    HBaseTimelineWriterImpl hbi = null;
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      Configuration c1 = util.getConfiguration();
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      hbi.start();
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      String cluster = "cluster_test_events";
+      String user = "user2";
+      String flow = "other_flow_name";
+      String flowVersion = "1111F01C2287BA";
+      long runid = 1009876543218L;
+      String appName = "some app name";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
+      hbi.stop();
+
+      // retrieve the row
+      byte[] rowKey =
+          ApplicationRowKey.getRowKey(cluster, user, flow, runid, appName);
+      Get get = new Get(rowKey);
+      get.setMaxVersions(Integer.MAX_VALUE);
+      Connection conn = ConnectionFactory.createConnection(c1);
+      Result result = new ApplicationTable().getResult(c1, conn, get);
+
+      assertTrue(result != null);
+
+      // check the row key
+      byte[] row1 = result.getRow();
+      assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
+          appName));
+
+      Map<?, Object> eventsResult =
+          ApplicationColumnPrefix.EVENT.
+              readResultsHavingCompoundColumnQualifiers(result);
+      // there should be only one event
+      assertEquals(1, eventsResult.size());
+      for (Map.Entry<?, Object> e : eventsResult.entrySet()) {
+        // the qualifier is a compound key
+        // hence match individual values
+        byte[][] karr = (byte[][])e.getKey();
+        assertEquals(3, karr.length);
+        assertEquals(eventId, Bytes.toString(karr[0]));
+        assertEquals(TimelineWriterUtils.invert(expTs), Bytes.toLong(karr[1]));
+        assertEquals(expKey, Bytes.toString(karr[2]));
+        Object value = e.getValue();
+        // there should be only one timestamp and value
+        assertEquals(expVal, value.toString());
+      }
+
+      // read the timeline entity using the reader this time
+      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
+          entity.getType(), entity.getId(),
+          EnumSet.of(TimelineReader.Field.ALL));
+      TimelineEntity e2 = hbr.getEntity(user, cluster, null, null, appName,
+          entity.getType(), entity.getId(),
+          EnumSet.of(TimelineReader.Field.ALL));
+      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
+          appName, entity.getType(), null, null, null, null, null, null, null,
+          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+      Set<TimelineEntity> es2 = hbr.getEntities(user, cluster, null, null,
+          appName, entity.getType(), null, null, null, null, null, null, null,
+          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+      assertNotNull(e1);
+      assertNotNull(e2);
+      assertEquals(e1, e2);
+      assertEquals(1, es1.size());
+      assertEquals(1, es2.size());
+      assertEquals(es1, es2);
+
+      // check the events
+      NavigableSet<TimelineEvent> events = e1.getEvents();
+      // there should be only one event
+      assertEquals(1, events.size());
+      for (TimelineEvent e : events) {
+        assertEquals(eventId, e.getId());
+        assertEquals(expTs, e.getTimestamp());
+        Map<String,Object> info = e.getInfo();
+        assertEquals(1, info.size());
+        for (Map.Entry<String, Object> infoEntry : info.entrySet()) {
+          assertEquals(expKey, infoEntry.getKey());
+          assertEquals(expVal, infoEntry.getValue());
+        }
+      }
+    } finally {
+      if (hbi != null) {
+        hbi.stop();
+        hbi.close();
+      }
+      if (hbr != null) {
+        hbr.stop();
+        hbr.close();
+      }
+    }
+  }
+
+  @Test
+  public void testEventsWithEmptyInfo() throws IOException {
+    TimelineEvent event = new TimelineEvent();
+    String eventId = "foo_event_id";
+    event.setId(eventId);
+    long expTs = 1436512802000L;
+    event.setTimestamp(expTs);
+
+    final TimelineEntity entity = new TimelineEntity();
+    entity.setId("attempt_1329348432655_0001_m_000008_18");
+    entity.setType("FOO_ATTEMPT");
+    entity.addEvent(event);
+
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntity(entity);
+
+    HBaseTimelineWriterImpl hbi = null;
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      Configuration c1 = util.getConfiguration();
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      hbi.start();
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      String cluster = "cluster_test_empty_eventkey";
+      String user = "user_emptyeventkey";
+      String flow = "other_flow_name";
+      String flowVersion = "1111F01C2287BA";
+      long runid = 1009876543218L;
+      String appName = "some app name";
+      byte[] startRow =
+          EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
+      hbi.stop();
+      // scan the table and see that entity exists
+      Scan s = new Scan();
+      s.setStartRow(startRow);
+      s.addFamily(EntityColumnFamily.INFO.getBytes());
+      Connection conn = ConnectionFactory.createConnection(c1);
+      ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
+
+      int rowCount = 0;
+      for (Result result : scanner) {
+        if (result != null && !result.isEmpty()) {
+          rowCount++;
+
+          // check the row key
+          byte[] row1 = result.getRow();
+          assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
+              entity));
+
+          Map<?, Object> eventsResult =
+              EntityColumnPrefix.EVENT.
+                  readResultsHavingCompoundColumnQualifiers(result);
+          // there should be only one event
+          assertEquals(1, eventsResult.size());
+          for (Map.Entry<?, Object> e : eventsResult.entrySet()) {
+            // the qualifier is a compound key
+            // hence match individual values
+            byte[][] karr = (byte[][])e.getKey();
+            assertEquals(3, karr.length);
+            assertEquals(eventId, Bytes.toString(karr[0]));
+            assertEquals(TimelineWriterUtils.invert(expTs),
+                Bytes.toLong(karr[1]));
+            // key must be empty
+            assertEquals(0, karr[2].length);
+            Object value = e.getValue();
+            // value should be empty
+            assertEquals("", value.toString());
+          }
+        }
+      }
+      assertEquals(1, rowCount);
+
+      // read the timeline entity using the reader this time
+      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
+          entity.getType(), entity.getId(),
+          EnumSet.of(TimelineReader.Field.ALL));
+      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
+          appName, entity.getType(), null, null, null, null, null, null, null,
+          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+      assertNotNull(e1);
+      assertEquals(1, es1.size());
+
+      // check the events
+      NavigableSet<TimelineEvent> events = e1.getEvents();
+      // there should be only one event
+      assertEquals(1, events.size());
+      for (TimelineEvent e : events) {
+        assertEquals(eventId, e.getId());
+        assertEquals(expTs, e.getTimestamp());
+        Map<String,Object> info = e.getInfo();
+        assertTrue(info == null || info.isEmpty());
+      }
+    } finally {
+      hbi.stop();
+      hbi.close();
+      hbr.stop();;
+      hbr.close();
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e979f308/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
deleted file mode 100644
index 2875e01..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
+++ /dev/null
@@ -1,770 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * Various tests to test writing entities to HBase and reading them back from
- * it.
- *
- * It uses a single HBase mini-cluster for all tests which is a little more
- * realistic, and helps test correctness in the presence of other data.
- *
- * Each test uses a different cluster name to be able to handle its own data
- * even if other records exist in the table. Use a different cluster name if
- * you add a new test.
- */
-public class TestHBaseTimelineStorage {
-
-  private static HBaseTestingUtility util;
-
-  @BeforeClass
-  public static void setupBeforeClass() throws Exception {
-    util = new HBaseTestingUtility();
-    util.startMiniCluster();
-    createSchema();
-  }
-
-  private static void createSchema() throws IOException {
-    new EntityTable()
-        .createTable(util.getHBaseAdmin(), util.getConfiguration());
-    new AppToFlowTable()
-        .createTable(util.getHBaseAdmin(), util.getConfiguration());
-    new ApplicationTable()
-        .createTable(util.getHBaseAdmin(), util.getConfiguration());
-  }
-
-  @Test
-  public void testWriteApplicationToHBase() throws Exception {
-    TimelineEntities te = new TimelineEntities();
-    ApplicationEntity entity = new ApplicationEntity();
-    String id = "hello";
-    entity.setId(id);
-    long cTime = 1425016501000L;
-    long mTime = 1425026901000L;
-    entity.setCreatedTime(cTime);
-    entity.setModifiedTime(mTime);
-
-    // add the info map in Timeline Entity
-    Map<String, Object> infoMap = new HashMap<String, Object>();
-    infoMap.put("infoMapKey1", "infoMapValue1");
-    infoMap.put("infoMapKey2", 10);
-    entity.addInfo(infoMap);
-
-    // add the isRelatedToEntity info
-    String key = "task";
-    String value = "is_related_to_entity_id_here";
-    Set<String> isRelatedToSet = new HashSet<String>();
-    isRelatedToSet.add(value);
-    Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
-    isRelatedTo.put(key, isRelatedToSet);
-    entity.setIsRelatedToEntities(isRelatedTo);
-
-    // add the relatesTo info
-    key = "container";
-    value = "relates_to_entity_id_here";
-    Set<String> relatesToSet = new HashSet<String>();
-    relatesToSet.add(value);
-    value = "relates_to_entity_id_here_Second";
-    relatesToSet.add(value);
-    Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
-    relatesTo.put(key, relatesToSet);
-    entity.setRelatesToEntities(relatesTo);
-
-    // add some config entries
-    Map<String, String> conf = new HashMap<String, String>();
-    conf.put("config_param1", "value1");
-    conf.put("config_param2", "value2");
-    entity.addConfigs(conf);
-
-    // add metrics
-    Set<TimelineMetric> metrics = new HashSet<>();
-    TimelineMetric m1 = new TimelineMetric();
-    m1.setId("MAP_SLOT_MILLIS");
-    Map<Long, Number> metricValues = new HashMap<Long, Number>();
-    long ts = System.currentTimeMillis();
-    metricValues.put(ts - 120000, 100000000);
-    metricValues.put(ts - 100000, 200000000);
-    metricValues.put(ts - 80000, 300000000);
-    metricValues.put(ts - 60000, 400000000);
-    metricValues.put(ts - 40000, 50000000000L);
-    metricValues.put(ts - 20000, 60000000000L);
-    m1.setType(Type.TIME_SERIES);
-    m1.setValues(metricValues);
-    metrics.add(m1);
-    entity.addMetrics(metrics);
-
-    te.addEntity(entity);
-
-    HBaseTimelineWriterImpl hbi = null;
-    HBaseTimelineReaderImpl hbr = null;
-    try {
-      Configuration c1 = util.getConfiguration();
-      hbi = new HBaseTimelineWriterImpl(c1);
-      hbi.init(c1);
-      hbi.start();
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
-      String cluster = "cluster_test_write_app";
-      String user = "user1";
-      String flow = "some_flow_name";
-      String flowVersion = "AB7822C10F1111";
-      long runid = 1002345678919L;
-      hbi.write(cluster, user, flow, flowVersion, runid, id, te);
-      hbi.stop();
-
-      // retrieve the row
-      byte[] rowKey =
-          ApplicationRowKey.getRowKey(cluster, user, flow, runid, id);
-      Get get = new Get(rowKey);
-      get.setMaxVersions(Integer.MAX_VALUE);
-      Connection conn = ConnectionFactory.createConnection(c1);
-      Result result = new ApplicationTable().getResult(c1, conn, get);
-
-      assertTrue(result != null);
-      assertEquals(16, result.size());
-
-      // check the row key
-      byte[] row1 = result.getRow();
-      assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
-          id));
-
-      // check info column family
-      String id1 = ApplicationColumn.ID.readResult(result).toString();
-      assertEquals(id, id1);
-
-      Number val =
-          (Number) ApplicationColumn.CREATED_TIME.readResult(result);
-      long cTime1 = val.longValue();
-      assertEquals(cTime1, cTime);
-
-      val = (Number) ApplicationColumn.MODIFIED_TIME.readResult(result);
-      long mTime1 = val.longValue();
-      assertEquals(mTime1, mTime);
-
-      Map<String, Object> infoColumns =
-          ApplicationColumnPrefix.INFO.readResults(result);
-      assertEquals(infoMap, infoColumns);
-
-      // Remember isRelatedTo is of type Map<String, Set<String>>
-      for (String isRelatedToKey : isRelatedTo.keySet()) {
-        Object isRelatedToValue =
-            ApplicationColumnPrefix.IS_RELATED_TO.readResult(result,
-                isRelatedToKey);
-        String compoundValue = isRelatedToValue.toString();
-        // id7?id9?id6
-        Set<String> isRelatedToValues =
-            new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue));
-        assertEquals(isRelatedTo.get(isRelatedToKey).size(),
-            isRelatedToValues.size());
-        for (String v : isRelatedTo.get(isRelatedToKey)) {
-          assertTrue(isRelatedToValues.contains(v));
-        }
-      }
-
-      // RelatesTo
-      for (String relatesToKey : relatesTo.keySet()) {
-        String compoundValue =
-            ApplicationColumnPrefix.RELATES_TO.readResult(result,
-                relatesToKey).toString();
-        // id3?id4?id5
-        Set<String> relatesToValues =
-            new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue));
-        assertEquals(relatesTo.get(relatesToKey).size(),
-            relatesToValues.size());
-        for (String v : relatesTo.get(relatesToKey)) {
-          assertTrue(relatesToValues.contains(v));
-        }
-      }
-
-      // Configuration
-      Map<String, Object> configColumns =
-          ApplicationColumnPrefix.CONFIG.readResults(result);
-      assertEquals(conf, configColumns);
-
-      NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
-          ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
-
-      NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
-      assertEquals(metricValues, metricMap);
-
-      // read the timeline entity using the reader this time
-      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, id,
-          entity.getType(), entity.getId(),
-          EnumSet.of(TimelineReader.Field.ALL));
-      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
-          id, entity.getType(), null, null, null, null, null, null, null,
-          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
-      assertNotNull(e1);
-      assertEquals(1, es1.size());
-
-      // verify attributes
-      assertEquals(id, e1.getId());
-      assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
-          e1.getType());
-      assertEquals(cTime, e1.getCreatedTime());
-      assertEquals(mTime, e1.getModifiedTime());
-      Map<String, Object> infoMap2 = e1.getInfo();
-      assertEquals(infoMap, infoMap2);
-
-      Map<String, Set<String>> isRelatedTo2 = e1.getIsRelatedToEntities();
-      assertEquals(isRelatedTo, isRelatedTo2);
-
-      Map<String, Set<String>> relatesTo2 = e1.getRelatesToEntities();
-      assertEquals(relatesTo, relatesTo2);
-
-      Map<String, String> conf2 = e1.getConfigs();
-      assertEquals(conf, conf2);
-
-      Set<TimelineMetric> metrics2 = e1.getMetrics();
-      assertEquals(metrics, metrics2);
-      for (TimelineMetric metric2 : metrics2) {
-        Map<Long, Number> metricValues2 = metric2.getValues();
-        assertEquals(metricValues, metricValues2);
-      }
-    } finally {
-      if (hbi != null) {
-        hbi.stop();
-        hbi.close();
-      }
-      if (hbr != null) {
-        hbr.stop();
-        hbr.close();
-      }
-    }
-  }
-
-  @Test
-  public void testWriteEntityToHBase() throws Exception {
-    TimelineEntities te = new TimelineEntities();
-    TimelineEntity entity = new TimelineEntity();
-    String id = "hello";
-    String type = "world";
-    entity.setId(id);
-    entity.setType(type);
-    long cTime = 1425016501000L;
-    long mTime = 1425026901000L;
-    entity.setCreatedTime(cTime);
-    entity.setModifiedTime(mTime);
-
-    // add the info map in Timeline Entity
-    Map<String, Object> infoMap = new HashMap<String, Object>();
-    infoMap.put("infoMapKey1", "infoMapValue1");
-    infoMap.put("infoMapKey2", 10);
-    entity.addInfo(infoMap);
-
-    // add the isRelatedToEntity info
-    String key = "task";
-    String value = "is_related_to_entity_id_here";
-    Set<String> isRelatedToSet = new HashSet<String>();
-    isRelatedToSet.add(value);
-    Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
-    isRelatedTo.put(key, isRelatedToSet);
-    entity.setIsRelatedToEntities(isRelatedTo);
-
-    // add the relatesTo info
-    key = "container";
-    value = "relates_to_entity_id_here";
-    Set<String> relatesToSet = new HashSet<String>();
-    relatesToSet.add(value);
-    value = "relates_to_entity_id_here_Second";
-    relatesToSet.add(value);
-    Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
-    relatesTo.put(key, relatesToSet);
-    entity.setRelatesToEntities(relatesTo);
-
-    // add some config entries
-    Map<String, String> conf = new HashMap<String, String>();
-    conf.put("config_param1", "value1");
-    conf.put("config_param2", "value2");
-    entity.addConfigs(conf);
-
-    // add metrics
-    Set<TimelineMetric> metrics = new HashSet<>();
-    TimelineMetric m1 = new TimelineMetric();
-    m1.setId("MAP_SLOT_MILLIS");
-    Map<Long, Number> metricValues = new HashMap<Long, Number>();
-    long ts = System.currentTimeMillis();
-    metricValues.put(ts - 120000, 100000000);
-    metricValues.put(ts - 100000, 200000000);
-    metricValues.put(ts - 80000, 300000000);
-    metricValues.put(ts - 60000, 400000000);
-    metricValues.put(ts - 40000, 50000000000L);
-    metricValues.put(ts - 20000, 60000000000L);
-    m1.setType(Type.TIME_SERIES);
-    m1.setValues(metricValues);
-    metrics.add(m1);
-    entity.addMetrics(metrics);
-
-    te.addEntity(entity);
-
-    HBaseTimelineWriterImpl hbi = null;
-    HBaseTimelineReaderImpl hbr = null;
-    try {
-      Configuration c1 = util.getConfiguration();
-      hbi = new HBaseTimelineWriterImpl(c1);
-      hbi.init(c1);
-      hbi.start();
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
-      String cluster = "cluster_test_write_entity";
-      String user = "user1";
-      String flow = "some_flow_name";
-      String flowVersion = "AB7822C10F1111";
-      long runid = 1002345678919L;
-      String appName = "some app name";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
-      hbi.stop();
-
-      // scan the table and see that entity exists
-      Scan s = new Scan();
-      byte[] startRow =
-          EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
-      s.setStartRow(startRow);
-      s.setMaxVersions(Integer.MAX_VALUE);
-      Connection conn = ConnectionFactory.createConnection(c1);
-      ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
-
-      int rowCount = 0;
-      int colCount = 0;
-      for (Result result : scanner) {
-        if (result != null && !result.isEmpty()) {
-          rowCount++;
-          colCount += result.size();
-          byte[] row1 = result.getRow();
-          assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
-              entity));
-
-          // check info column family
-          String id1 = EntityColumn.ID.readResult(result).toString();
-          assertEquals(id, id1);
-
-          String type1 = EntityColumn.TYPE.readResult(result).toString();
-          assertEquals(type, type1);
-
-          Number val = (Number) EntityColumn.CREATED_TIME.readResult(result);
-          long cTime1 = val.longValue();
-          assertEquals(cTime1, cTime);
-
-          val = (Number) EntityColumn.MODIFIED_TIME.readResult(result);
-          long mTime1 = val.longValue();
-          assertEquals(mTime1, mTime);
-
-          Map<String, Object> infoColumns =
-              EntityColumnPrefix.INFO.readResults(result);
-          assertEquals(infoMap, infoColumns);
-
-          // Remember isRelatedTo is of type Map<String, Set<String>>
-          for (String isRelatedToKey : isRelatedTo.keySet()) {
-            Object isRelatedToValue =
-                EntityColumnPrefix.IS_RELATED_TO.readResult(result,
-                    isRelatedToKey);
-            String compoundValue = isRelatedToValue.toString();
-            // id7?id9?id6
-            Set<String> isRelatedToValues =
-                new HashSet<String>(
-                    Separator.VALUES.splitEncoded(compoundValue));
-            assertEquals(isRelatedTo.get(isRelatedToKey).size(),
-                isRelatedToValues.size());
-            for (String v : isRelatedTo.get(isRelatedToKey)) {
-              assertTrue(isRelatedToValues.contains(v));
-            }
-          }
-
-          // RelatesTo
-          for (String relatesToKey : relatesTo.keySet()) {
-            String compoundValue =
-                EntityColumnPrefix.RELATES_TO.readResult(result, relatesToKey)
-                    .toString();
-            // id3?id4?id5
-            Set<String> relatesToValues =
-                new HashSet<String>(
-                    Separator.VALUES.splitEncoded(compoundValue));
-            assertEquals(relatesTo.get(relatesToKey).size(),
-                relatesToValues.size());
-            for (String v : relatesTo.get(relatesToKey)) {
-              assertTrue(relatesToValues.contains(v));
-            }
-          }
-
-          // Configuration
-          Map<String, Object> configColumns =
-              EntityColumnPrefix.CONFIG.readResults(result);
-          assertEquals(conf, configColumns);
-
-          NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
-              EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
-
-          NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
-          assertEquals(metricValues, metricMap);
-        }
-      }
-      assertEquals(1, rowCount);
-      assertEquals(17, colCount);
-
-      // read the timeline entity using the reader this time
-      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-          entity.getType(), entity.getId(),
-          EnumSet.of(TimelineReader.Field.ALL));
-      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
-          appName, entity.getType(), null, null, null, null, null, null, null,
-          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
-      assertNotNull(e1);
-      assertEquals(1, es1.size());
-
-      // verify attributes
-      assertEquals(id, e1.getId());
-      assertEquals(type, e1.getType());
-      assertEquals(cTime, e1.getCreatedTime());
-      assertEquals(mTime, e1.getModifiedTime());
-      Map<String, Object> infoMap2 = e1.getInfo();
-      assertEquals(infoMap, infoMap2);
-
-      Map<String, Set<String>> isRelatedTo2 = e1.getIsRelatedToEntities();
-      assertEquals(isRelatedTo, isRelatedTo2);
-
-      Map<String, Set<String>> relatesTo2 = e1.getRelatesToEntities();
-      assertEquals(relatesTo, relatesTo2);
-
-      Map<String, String> conf2 = e1.getConfigs();
-      assertEquals(conf, conf2);
-
-      Set<TimelineMetric> metrics2 = e1.getMetrics();
-      assertEquals(metrics, metrics2);
-      for (TimelineMetric metric2 : metrics2) {
-        Map<Long, Number> metricValues2 = metric2.getValues();
-        assertEquals(metricValues, metricValues2);
-      }
-    } finally {
-      if (hbi != null) {
-        hbi.stop();
-        hbi.close();
-      }
-      if (hbr != null) {
-        hbr.stop();
-        hbr.close();
-      }
-    }
-  }
-
-  private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
-      String flow, long runid, String appName, TimelineEntity te) {
-
-    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
-
-    assertTrue(rowKeyComponents.length == 7);
-    assertEquals(user, Bytes.toString(rowKeyComponents[0]));
-    assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
-    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(TimelineWriterUtils.invert(runid),
-        Bytes.toLong(rowKeyComponents[3]));
-    assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
-    assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
-    assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
-    return true;
-  }
-
-  private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster,
-      String user, String flow, long runid, String appName) {
-
-    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
-
-    assertTrue(rowKeyComponents.length == 5);
-    assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
-    assertEquals(user, Bytes.toString(rowKeyComponents[1]));
-    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(TimelineWriterUtils.invert(runid),
-        Bytes.toLong(rowKeyComponents[3]));
-    assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
-    return true;
-  }
-
-  @Test
-  public void testEvents() throws IOException {
-    TimelineEvent event = new TimelineEvent();
-    String eventId = ApplicationMetricsConstants.CREATED_EVENT_TYPE;
-    event.setId(eventId);
-    long expTs = 1436512802000L;
-    event.setTimestamp(expTs);
-    String expKey = "foo_event";
-    Object expVal = "test";
-    event.addInfo(expKey, expVal);
-
-    final TimelineEntity entity = new ApplicationEntity();
-    entity.setId(ApplicationId.newInstance(0, 1).toString());
-    entity.addEvent(event);
-
-    TimelineEntities entities = new TimelineEntities();
-    entities.addEntity(entity);
-
-    HBaseTimelineWriterImpl hbi = null;
-    HBaseTimelineReaderImpl hbr = null;
-    try {
-      Configuration c1 = util.getConfiguration();
-      hbi = new HBaseTimelineWriterImpl(c1);
-      hbi.init(c1);
-      hbi.start();
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
-      String cluster = "cluster_test_events";
-      String user = "user2";
-      String flow = "other_flow_name";
-      String flowVersion = "1111F01C2287BA";
-      long runid = 1009876543218L;
-      String appName = "some app name";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
-      hbi.stop();
-
-      // retrieve the row
-      byte[] rowKey =
-          ApplicationRowKey.getRowKey(cluster, user, flow, runid, appName);
-      Get get = new Get(rowKey);
-      get.setMaxVersions(Integer.MAX_VALUE);
-      Connection conn = ConnectionFactory.createConnection(c1);
-      Result result = new ApplicationTable().getResult(c1, conn, get);
-
-      assertTrue(result != null);
-
-      // check the row key
-      byte[] row1 = result.getRow();
-      assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
-          appName));
-
-      Map<?, Object> eventsResult =
-          ApplicationColumnPrefix.EVENT.
-              readResultsHavingCompoundColumnQualifiers(result);
-      // there should be only one event
-      assertEquals(1, eventsResult.size());
-      for (Map.Entry<?, Object> e : eventsResult.entrySet()) {
-        // the qualifier is a compound key
-        // hence match individual values
-        byte[][] karr = (byte[][])e.getKey();
-        assertEquals(3, karr.length);
-        assertEquals(eventId, Bytes.toString(karr[0]));
-        assertEquals(TimelineWriterUtils.invert(expTs), Bytes.toLong(karr[1]));
-        assertEquals(expKey, Bytes.toString(karr[2]));
-        Object value = e.getValue();
-        // there should be only one timestamp and value
-        assertEquals(expVal, value.toString());
-      }
-
-      // read the timeline entity using the reader this time
-      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-          entity.getType(), entity.getId(),
-          EnumSet.of(TimelineReader.Field.ALL));
-      TimelineEntity e2 = hbr.getEntity(user, cluster, null, null, appName,
-          entity.getType(), entity.getId(),
-          EnumSet.of(TimelineReader.Field.ALL));
-      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
-          appName, entity.getType(), null, null, null, null, null, null, null,
-          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
-      Set<TimelineEntity> es2 = hbr.getEntities(user, cluster, null, null,
-          appName, entity.getType(), null, null, null, null, null, null, null,
-          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
-      assertNotNull(e1);
-      assertNotNull(e2);
-      assertEquals(e1, e2);
-      assertEquals(1, es1.size());
-      assertEquals(1, es2.size());
-      assertEquals(es1, es2);
-
-      // check the events
-      NavigableSet<TimelineEvent> events = e1.getEvents();
-      // there should be only one event
-      assertEquals(1, events.size());
-      for (TimelineEvent e : events) {
-        assertEquals(eventId, e.getId());
-        assertEquals(expTs, e.getTimestamp());
-        Map<String,Object> info = e.getInfo();
-        assertEquals(1, info.size());
-        for (Map.Entry<String, Object> infoEntry : info.entrySet()) {
-          assertEquals(expKey, infoEntry.getKey());
-          assertEquals(expVal, infoEntry.getValue());
-        }
-      }
-    } finally {
-      if (hbi != null) {
-        hbi.stop();
-        hbi.close();
-      }
-      if (hbr != null) {
-        hbr.stop();
-        hbr.close();
-      }
-    }
-  }
-
-  @Test
-  public void testEventsWithEmptyInfo() throws IOException {
-    TimelineEvent event = new TimelineEvent();
-    String eventId = "foo_event_id";
-    event.setId(eventId);
-    long expTs = 1436512802000L;
-    event.setTimestamp(expTs);
-
-    final TimelineEntity entity = new TimelineEntity();
-    entity.setId("attempt_1329348432655_0001_m_000008_18");
-    entity.setType("FOO_ATTEMPT");
-    entity.addEvent(event);
-
-    TimelineEntities entities = new TimelineEntities();
-    entities.addEntity(entity);
-
-    HBaseTimelineWriterImpl hbi = null;
-    HBaseTimelineReaderImpl hbr = null;
-    try {
-      Configuration c1 = util.getConfiguration();
-      hbi = new HBaseTimelineWriterImpl(c1);
-      hbi.init(c1);
-      hbi.start();
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
-      String cluster = "cluster_test_empty_eventkey";
-      String user = "user_emptyeventkey";
-      String flow = "other_flow_name";
-      String flowVersion = "1111F01C2287BA";
-      long runid = 1009876543218L;
-      String appName = "some app name";
-      byte[] startRow =
-          EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
-      hbi.stop();
-      // scan the table and see that entity exists
-      Scan s = new Scan();
-      s.setStartRow(startRow);
-      s.addFamily(EntityColumnFamily.INFO.getBytes());
-      Connection conn = ConnectionFactory.createConnection(c1);
-      ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
-
-      int rowCount = 0;
-      for (Result result : scanner) {
-        if (result != null && !result.isEmpty()) {
-          rowCount++;
-
-          // check the row key
-          byte[] row1 = result.getRow();
-          assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
-              entity));
-
-          Map<?, Object> eventsResult =
-              EntityColumnPrefix.EVENT.
-                  readResultsHavingCompoundColumnQualifiers(result);
-          // there should be only one event
-          assertEquals(1, eventsResult.size());
-          for (Map.Entry<?, Object> e : eventsResult.entrySet()) {
-            // the qualifier is a compound key
-            // hence match individual values
-            byte[][] karr = (byte[][])e.getKey();
-            assertEquals(3, karr.length);
-            assertEquals(eventId, Bytes.toString(karr[0]));
-            assertEquals(TimelineWriterUtils.invert(expTs),
-                Bytes.toLong(karr[1]));
-            // key must be empty
-            assertEquals(0, karr[2].length);
-            Object value = e.getValue();
-            // value should be empty
-            assertEquals("", value.toString());
-          }
-        }
-      }
-      assertEquals(1, rowCount);
-
-      // read the timeline entity using the reader this time
-      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-          entity.getType(), entity.getId(),
-          EnumSet.of(TimelineReader.Field.ALL));
-      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
-          appName, entity.getType(), null, null, null, null, null, null, null,
-          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
-      assertNotNull(e1);
-      assertEquals(1, es1.size());
-
-      // check the events
-      NavigableSet<TimelineEvent> events = e1.getEvents();
-      // there should be only one event
-      assertEquals(1, events.size());
-      for (TimelineEvent e : events) {
-        assertEquals(eventId, e.getId());
-        assertEquals(expTs, e.getTimestamp());
-        Map<String,Object> info = e.getInfo();
-        assertTrue(info == null || info.isEmpty());
-      }
-    } finally {
-      hbi.stop();
-      hbi.close();
-      hbr.stop();;
-      hbr.close();
-    }
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    util.shutdownMiniCluster();
-  }
-}


Mime
View raw message