From: junping_du@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Date: Tue, 11 Aug 2015 23:59:51 -0000
Subject: [1/2] hadoop git commit: YARN-3906. Split the application table from the entity table. Contributed by Sangjin Lee.

Repository: hadoop
Updated Branches:
  refs/heads/YARN-2928 07433c2ad -> bcd755eba

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcd755eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
index ab02779..95f88d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -47,6 +48,10 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
@@ -60,7 +65,15 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**
- * @throws Exception
+ * Various tests to test writing entities to HBase and reading them back from
+ * it.
+ *
+ * It uses a single HBase mini-cluster for all tests which is a little more
+ * realistic, and helps test correctness in the presence of other data.
+ *
+ * Each test uses a different cluster name to be able to handle its own data
+ * even if other records exist in the table. Use a different cluster name if
+ * you add a new test.
  */
 public class TestHBaseTimelineWriterImpl {
@@ -78,6 +91,199 @@ public class TestHBaseTimelineWriterImpl {
         .createTable(util.getHBaseAdmin(), util.getConfiguration());
     new AppToFlowTable()
         .createTable(util.getHBaseAdmin(), util.getConfiguration());
+    new ApplicationTable()
+        .createTable(util.getHBaseAdmin(), util.getConfiguration());
+  }
+
+  @Test
+  public void testWriteApplicationToHBase() throws Exception {
+    TimelineEntities te = new TimelineEntities();
+    ApplicationEntity entity = new ApplicationEntity();
+    String id = "hello";
+    entity.setId(id);
+    Long cTime = 1425016501000L;
+    Long mTime = 1425026901000L;
+    entity.setCreatedTime(cTime);
+    entity.setModifiedTime(mTime);
+
+    // add the info map in Timeline Entity
+    Map<String, Object> infoMap = new HashMap<String, Object>();
+    infoMap.put("infoMapKey1", "infoMapValue1");
+    infoMap.put("infoMapKey2", 10);
+    entity.addInfo(infoMap);
+
+    // add the isRelatedToEntity info
+    String key = "task";
+    String value = "is_related_to_entity_id_here";
+    Set<String> isRelatedToSet = new HashSet<String>();
+    isRelatedToSet.add(value);
+    Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
+    isRelatedTo.put(key, isRelatedToSet);
+    entity.setIsRelatedToEntities(isRelatedTo);
+
+    // add the relatesTo info
+    key = "container";
+    value = "relates_to_entity_id_here";
+    Set<String> relatesToSet = new HashSet<String>();
+    relatesToSet.add(value);
+    value = "relates_to_entity_id_here_Second";
+    relatesToSet.add(value);
+    Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
+    relatesTo.put(key, relatesToSet);
+    entity.setRelatesToEntities(relatesTo);
+
+    // add some config entries
+    Map<String, String> conf = new HashMap<String, String>();
+    conf.put("config_param1", "value1");
+    conf.put("config_param2", "value2");
+    entity.addConfigs(conf);
+
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId("MAP_SLOT_MILLIS");
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = System.currentTimeMillis();
+    metricValues.put(ts - 120000, 100000000);
+    metricValues.put(ts - 100000, 200000000);
+    metricValues.put(ts - 80000, 300000000);
+    metricValues.put(ts - 60000, 400000000);
+    metricValues.put(ts - 40000, 50000000000L);
+    metricValues.put(ts - 20000, 60000000000L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+    entity.addMetrics(metrics);
+
+    te.addEntity(entity);
+
+    HBaseTimelineWriterImpl hbi = null;
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      Configuration c1 = util.getConfiguration();
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      hbi.start();
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      String cluster = "cluster_test_write_app";
+      String user = "user1";
+      String flow = "some_flow_name";
+      String flowVersion = "AB7822C10F1111";
+      long runid = 1002345678919L;
+      hbi.write(cluster, user, flow, flowVersion, runid, id, te);
+      hbi.stop();
+
+      // retrieve the row
+      byte[] rowKey =
+          ApplicationRowKey.getRowKey(cluster, user, flow, runid, id);
+      Get get = new Get(rowKey);
+      get.setMaxVersions(Integer.MAX_VALUE);
+      Connection conn = ConnectionFactory.createConnection(c1);
+      Result result = new ApplicationTable().getResult(c1, conn, get);
+
+      assertTrue(result != null);
+      assertEquals(16, result.size());
+
+      // check the row key
+      byte[] row1 = result.getRow();
+      assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
+          id));
+
+      // check info column family
+      String id1 = ApplicationColumn.ID.readResult(result).toString();
+      assertEquals(id, id1);
+
+      Number val =
+          (Number) ApplicationColumn.CREATED_TIME.readResult(result);
+      Long cTime1 = val.longValue();
+      assertEquals(cTime1, cTime);
+
+      val = (Number) ApplicationColumn.MODIFIED_TIME.readResult(result);
+      Long mTime1 = val.longValue();
+      assertEquals(mTime1, mTime);
+
+      Map<String, Object> infoColumns =
+          ApplicationColumnPrefix.INFO.readResults(result);
+      assertEquals(infoMap.size(), infoColumns.size());
+      for (String infoItem : infoMap.keySet()) {
+        assertEquals(infoMap.get(infoItem), infoColumns.get(infoItem));
+      }
+
+      // Remember isRelatedTo is of type Map<String, Set<String>>
+      for (String isRelatedToKey : isRelatedTo.keySet()) {
+        Object isRelatedToValue =
+            ApplicationColumnPrefix.IS_RELATED_TO.readResult(result,
+                isRelatedToKey);
+        String compoundValue = isRelatedToValue.toString();
+        // id7?id9?id6
+        Set<String> isRelatedToValues =
+            new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue));
+        assertEquals(isRelatedTo.get(isRelatedToKey).size(),
+            isRelatedToValues.size());
+        for (String v : isRelatedTo.get(isRelatedToKey)) {
+          assertTrue(isRelatedToValues.contains(v));
+        }
+      }
+
+      // RelatesTo
+      for (String relatesToKey : relatesTo.keySet()) {
+        String compoundValue =
+            ApplicationColumnPrefix.RELATES_TO.readResult(result,
+                relatesToKey).toString();
+        // id3?id4?id5
+        Set<String> relatesToValues =
+            new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue));
+        assertEquals(relatesTo.get(relatesToKey).size(),
+            relatesToValues.size());
+        for (String v : relatesTo.get(relatesToKey)) {
+          assertTrue(relatesToValues.contains(v));
+        }
+      }
+
+      // Configuration
+      Map<String, Object> configColumns =
+          ApplicationColumnPrefix.CONFIG.readResults(result);
+      assertEquals(conf.size(), configColumns.size());
+      for (String configItem : conf.keySet()) {
+        assertEquals(conf.get(configItem), configColumns.get(configItem));
+      }
+
+      NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+          ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
+
+      NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
+      // We got metrics back
+      assertNotNull(metricMap);
+      // Same number of metrics as we wrote
+      assertEquals(metricValues.entrySet().size(), metricMap.entrySet().size());
+
+      // Iterate over original metrics and confirm that they are present
+      // here.
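
The new testWriteApplicationToHBase() above exercises the writer's full
service lifecycle: construct with a Configuration, init(), start(), write(),
then stop()/close() in a finally block. A minimal caller outside JUnit would
follow the same shape. The sketch below takes only the
HBaseTimelineWriterImpl(Configuration) constructor and the
write(cluster, user, flow, flowVersion, runid, appId, entities) signature
from this patch; the configuration source and all values are illustrative.

    // Minimal sketch of the writer lifecycle exercised by the test above.
    // Uses the test file's imports plus org.apache.hadoop.hbase.HBaseConfiguration;
    // every literal value here is illustrative, not taken from the patch.
    Configuration conf = HBaseConfiguration.create();
    HBaseTimelineWriterImpl writer = new HBaseTimelineWriterImpl(conf);
    writer.init(conf);
    writer.start();
    try {
      TimelineEntities entities = new TimelineEntities();
      ApplicationEntity app = new ApplicationEntity();
      app.setId("application_1439337591000_0001"); // illustrative id
      app.setCreatedTime(System.currentTimeMillis());
      entities.addEntity(app);
      writer.write("some_cluster", "some_user", "some_flow", "v1",
          1002345678919L, app.getId(), entities);
    } finally {
      writer.stop();
      writer.close();
    }
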
+      for (Entry<Long, Number> metricEntry : metricValues.entrySet()) {
+        assertEquals(metricEntry.getValue(),
+            metricMap.get(metricEntry.getKey()));
+      }
+
+      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, id,
+          entity.getType(), entity.getId(),
+          EnumSet.of(TimelineReader.Field.ALL));
+      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
+          id, entity.getType(), null, null, null, null, null, null, null,
+          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+      assertNotNull(e1);
+      assertEquals(1, es1.size());
+    } finally {
+      if (hbi != null) {
+        hbi.stop();
+        hbi.close();
+      }
+      if (hbr != null) {
+        hbr.stop();
+        hbr.close();
+      }
+    }
   }
 
   @Test
@@ -154,7 +360,7 @@ public class TestHBaseTimelineWriterImpl {
       hbr = new HBaseTimelineReaderImpl();
       hbr.init(c1);
       hbr.start();
-      String cluster = "cluster1";
+      String cluster = "cluster_test_write_entity";
       String user = "user1";
       String flow = "some_flow_name";
       String flowVersion = "AB7822C10F1111";
@@ -268,7 +474,8 @@ public class TestHBaseTimelineWriterImpl {
       assertEquals(17, colCount);
 
       TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-          entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+          entity.getType(), entity.getId(),
+          EnumSet.of(TimelineReader.Field.ALL));
       Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
           appName, entity.getType(), null, null, null, null, null, null, null,
           null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
@@ -284,10 +491,6 @@ public class TestHBaseTimelineWriterImpl {
         hbr.close();
       }
     }
-
-    // Somewhat of a hack, not a separate test in order not to have to deal with
-    // test case order exectution.
-    testAdditionalEntity();
   }
 
   private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
@@ -299,14 +502,31 @@ public class TestHBaseTimelineWriterImpl {
     assertEquals(user, Bytes.toString(rowKeyComponents[0]));
     assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
     assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(TimelineWriterUtils.invert(runid), Bytes.toLong(rowKeyComponents[3]));
+    assertEquals(TimelineWriterUtils.invert(runid),
+        Bytes.toLong(rowKeyComponents[3]));
     assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
     assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
     assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
     return true;
   }
 
-  private void testAdditionalEntity() throws IOException {
+  private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster,
+      String user, String flow, Long runid, String appName) {
+
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
+
+    assertTrue(rowKeyComponents.length == 5);
+    assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
+    assertEquals(user, Bytes.toString(rowKeyComponents[1]));
+    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
+    assertEquals(TimelineWriterUtils.invert(runid),
+        Bytes.toLong(rowKeyComponents[3]));
+    assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
+    return true;
+  }
+
+  @Test
+  public void testEvents() throws IOException {
     TimelineEvent event = new TimelineEvent();
     String eventId = ApplicationMetricsConstants.CREATED_EVENT_TYPE;
     event.setId(eventId);
@@ -333,7 +553,7 @@ public class TestHBaseTimelineWriterImpl {
       hbr = new HBaseTimelineReaderImpl();
       hbr.init(c1);
      hbr.start();
-      String cluster = "cluster2";
+      String cluster = "cluster_test_events";
       String user = "user2";
       String flow = "other_flow_name";
       String flowVersion = "1111F01C2287BA";
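
Two details behind the row-key helpers above are worth spelling out. First,
the new application row key has five components (cluster, user, flow,
inverted run id, app id), while the entity row key checked by
isRowKeyCorrect() has seven (the same five plus entity type and id): the
application table keys rows by application alone, which is the point of this
split. Second, the run id is stored inverted so that, under HBase's
lexicographic byte ordering, the most recent run sorts first. The patch only
calls TimelineWriterUtils.invert(); the sketch below is the conventional
implementation of such a helper, stated here as an assumption rather than
code from the patch.

    // Presumed shape of TimelineWriterUtils.invert() (assumption, not taken
    // from this patch): subtracting from Long.MAX_VALUE makes larger (newer)
    // values sort first under HBase's byte-wise row-key ordering, and
    // applying the function twice returns the original value.
    public static long invert(long key) {
      return Long.MAX_VALUE - key;
    }
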
@@ -341,50 +561,46 @@ public class TestHBaseTimelineWriterImpl {
       String appName = "some app name";
       hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
       hbi.stop();
-      // scan the table and see that entity exists
-      Scan s = new Scan();
-      byte[] startRow =
-          EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
-      s.setStartRow(startRow);
-      Connection conn = ConnectionFactory.createConnection(c1);
-      ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
-
-      int rowCount = 0;
-      for (Result result : scanner) {
-        if (result != null && !result.isEmpty()) {
-          rowCount++;
-
-          // check the row key
-          byte[] row1 = result.getRow();
-          assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
-              entity));
-          Map<String, Object> eventsResult =
-              EntityColumnPrefix.EVENT.readResults(result);
-          // there should be only one event
-          assertEquals(1, eventsResult.size());
-          // key name for the event
-          byte[] compoundColumnQualifierBytes =
-              Separator.VALUES.join(Bytes.toBytes(eventId),
-                  Bytes.toBytes(TimelineWriterUtils.invert(expTs)),
-                  Bytes.toBytes(expKey));
-          String valueKey = Bytes.toString(compoundColumnQualifierBytes);
-          for (Map.Entry<String, Object> e :
-              eventsResult.entrySet()) {
-            // the value key must match
-            assertEquals(valueKey, e.getKey());
-            Object value = e.getValue();
-            // there should be only one timestamp and value
-            assertEquals(expVal, value.toString());
-          }
-        }
+      // retrieve the row
+      byte[] rowKey =
+          ApplicationRowKey.getRowKey(cluster, user, flow, runid, appName);
+      Get get = new Get(rowKey);
+      get.setMaxVersions(Integer.MAX_VALUE);
+      Connection conn = ConnectionFactory.createConnection(c1);
+      Result result = new ApplicationTable().getResult(c1, conn, get);
+
+      assertTrue(result != null);
+
+      // check the row key
+      byte[] row1 = result.getRow();
+      assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
+          appName));
+
+      Map<String, Object> eventsResult =
+          ApplicationColumnPrefix.EVENT.readResults(result);
+      // there should be only one event
+      assertEquals(1, eventsResult.size());
+      // key name for the event
+      byte[] compoundColumnQualifierBytes =
+          Separator.VALUES.join(Bytes.toBytes(eventId),
+              Bytes.toBytes(TimelineWriterUtils.invert(expTs)),
+              Bytes.toBytes(expKey));
+      String valueKey = Bytes.toString(compoundColumnQualifierBytes);
+      for (Map.Entry<String, Object> e : eventsResult.entrySet()) {
+        // the value key must match
+        assertEquals(valueKey, e.getKey());
+        Object value = e.getValue();
+        // there should be only one timestamp and value
+        assertEquals(expVal, value.toString());
       }
-      assertEquals(1, rowCount);
 
       TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-          entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+          entity.getType(), entity.getId(),
+          EnumSet.of(TimelineReader.Field.ALL));
       TimelineEntity e2 = hbr.getEntity(user, cluster, null, null, appName,
-          entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+          entity.getType(), entity.getId(),
+          EnumSet.of(TimelineReader.Field.ALL));
       Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
           appName, entity.getType(), null, null, null, null, null, null, null,
           null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
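
The event assertions in the hunk above rebuild the compound column qualifier
(event id, inverted event timestamp, and event info key, joined by
Separator.VALUES) and compare it to the qualifier read back from the table.
Decoding goes the other way. The sketch below is hypothetical: it assumes
Separator.VALUES offers the same split(byte[], int) operation the test uses
on Separator.QUALIFIERS, and that invert() is its own inverse as sketched
earlier.

    // Hypothetical decode of the compound event qualifier built in
    // testEvents(): recover (event id, original timestamp, info key).
    byte[][] parts = Separator.VALUES.split(compoundColumnQualifierBytes, 3);
    String decodedEventId = Bytes.toString(parts[0]);
    long eventTs = TimelineWriterUtils.invert(Bytes.toLong(parts[1]));
    String eventInfoKey = Bytes.toString(parts[2]);
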
@@ -410,7 +626,7 @@ public class TestHBaseTimelineWriterImpl {
   }
 
   @Test
-  public void testAdditionalEntityEmptyEventInfo() throws IOException {
+  public void testEventsWithEmptyInfo() throws IOException {
     TimelineEvent event = new TimelineEvent();
     String eventId = "foo_event_id";
     event.setId(eventId);
@@ -435,7 +651,7 @@ public class TestHBaseTimelineWriterImpl {
       hbr = new HBaseTimelineReaderImpl();
      hbr.init(c1);
       hbr.start();
-      String cluster = "cluster_emptyeventkey";
+      String cluster = "cluster_test_empty_eventkey";
       String user = "user_emptyeventkey";
       String flow = "other_flow_name";
       String flowVersion = "1111F01C2287BA";
@@ -487,7 +703,8 @@ public class TestHBaseTimelineWriterImpl {
       assertEquals(1, rowCount);
 
       TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-          entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+          entity.getType(), entity.getId(),
+          EnumSet.of(TimelineReader.Field.ALL));
       Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
           appName, entity.getType(), null, null, null, null, null, null, null,
           null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
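
A closing note on the read path these tests share: they fetch with
get.setMaxVersions(Integer.MAX_VALUE) because metric time series are
persisted as HBase cell versions, one cell version per metric timestamp,
which is what ApplicationColumnPrefix.METRIC.readResultsWithTimestamps()
unpacks into a NavigableMap. Read with the plain HBase client, the same
pattern looks roughly like the sketch below; the table name and the metrics
column family name are assumptions for illustration, not taken from this
patch.

    // Sketch of the versioned-read pattern behind the metric assertions.
    // Imports assumed: org.apache.hadoop.hbase.{TableName, Cell, CellUtil}
    // and org.apache.hadoop.hbase.client.Table. Table and column family
    // names below are illustrative.
    Connection conn = ConnectionFactory.createConnection(conf);
    Table table =
        conn.getTable(TableName.valueOf("timelineservice.application"));
    Get get = new Get(rowKey);
    get.setMaxVersions(Integer.MAX_VALUE); // one cell version per timestamp
    Result result = table.get(get);
    for (Cell cell : result.getColumnCells(
        Bytes.toBytes("m"), Bytes.toBytes("MAP_SLOT_MILLIS"))) {
      long metricTs = cell.getTimestamp();            // metric timestamp
      byte[] metricValue = CellUtil.cloneValue(cell); // reading at that time
    }
    table.close();
    conn.close();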