Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E9BC3189BB for ; Tue, 10 Nov 2015 00:36:34 +0000 (UTC) Received: (qmail 65045 invoked by uid 500); 10 Nov 2015 00:36:18 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 64544 invoked by uid 500); 10 Nov 2015 00:36:17 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 61809 invoked by uid 99); 10 Nov 2015 00:36:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Nov 2015 00:36:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6468BE0AA0; Tue, 10 Nov 2015 00:36:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sjlee@apache.org To: common-commits@hadoop.apache.org Date: Tue, 10 Nov 2015 00:36:59 -0000 Message-Id: <16ccc72277da42bc909249e82a8b28c4@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [45/50] [abbrv] hadoop git commit: YARN-4178. [storage implementation] app id as string in row keys can cause incorrect ordering (Varun Saxena via sjlee) YARN-4178. [storage implementation] app id as string in row keys can cause incorrect ordering (Varun Saxena via sjlee) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f1d0e934 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f1d0e934 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f1d0e934 Branch: refs/heads/YARN-2928-rebase Commit: f1d0e9344899a957116c434c55e4577ecdb2b7bc Parents: a0340f4 Author: Sangjin Lee Authored: Tue Oct 6 16:06:28 2015 -0700 Committer: Sangjin Lee Committed: Mon Nov 9 16:13:15 2015 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../storage/ApplicationEntityReader.java | 14 +- .../storage/FileSystemTimelineReaderImpl.java | 14 +- .../storage/GenericEntityReader.java | 17 +- .../storage/HBaseTimelineWriterImpl.java | 20 +- .../storage/application/ApplicationRowKey.java | 13 +- .../storage/apptoflow/AppToFlowRowKey.java | 7 +- .../storage/common/Separator.java | 4 +- .../storage/common/TimelineReaderUtils.java | 112 ----- .../storage/common/TimelineStorageUtils.java | 475 +++++++++++++++++++ .../storage/common/TimelineWriterUtils.java | 328 ------------- .../storage/entity/EntityRowKey.java | 32 +- .../storage/flow/FlowActivityColumnPrefix.java | 6 +- .../storage/flow/FlowActivityRowKey.java | 9 +- .../storage/flow/FlowRunColumn.java | 4 +- .../storage/flow/FlowRunColumnPrefix.java | 6 +- .../storage/flow/FlowRunCoprocessor.java | 4 +- .../storage/flow/FlowRunRowKey.java | 6 +- .../storage/flow/FlowScanner.java | 6 +- .../storage/TestHBaseTimelineStorage.java | 13 +- .../common/TestTimelineStorageUtils.java | 56 +++ .../storage/common/TestTimelineWriterUtils.java | 29 -- .../flow/TestHBaseStorageFlowActivity.java | 10 +- 23 files changed, 629 insertions(+), 559 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1d0e934/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index d8052da..d9e0968 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -168,6 +168,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-4058. Miscellaneous issues in NodeManager project (Naganarasimha G R via sjlee) + YARN-4178. [storage implementation] app id as string in row keys can cause + incorrect ordering (Varun Saxena via sjlee) + Trunk - Unreleased INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1d0e934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java index 61954e1..6d1a2ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java @@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import com.google.common.base.Preconditions; @@ -182,7 +182,7 @@ class ApplicationEntityReader extends GenericEntityReader { fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO, true); - if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations( + if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations( entity.getIsRelatedToEntities(), isRelatedTo)) { return null; } @@ -198,7 +198,7 @@ class ApplicationEntityReader extends GenericEntityReader { fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO, false); - if (checkRelatesTo && !TimelineReaderUtils.matchRelations( + if (checkRelatesTo && !TimelineStorageUtils.matchRelations( entity.getRelatesToEntities(), relatesTo)) { return null; } @@ -214,7 +214,7 @@ class ApplicationEntityReader extends GenericEntityReader { fieldsToRetrieve.contains(Field.INFO) || checkInfo) { readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false); if (checkInfo && - !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) { + !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) { return null; } if (!fieldsToRetrieve.contains(Field.ALL) && @@ -228,7 +228,7 @@ class ApplicationEntityReader extends GenericEntityReader { if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true); - if (checkConfigs && !TimelineReaderUtils.matchFilters( + if (checkConfigs && !TimelineStorageUtils.matchFilters( entity.getConfigs(), configFilters)) { return null; } @@ -243,7 +243,7 @@ class ApplicationEntityReader extends GenericEntityReader { if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { readEvents(entity, result, true); - if (checkEvents && !TimelineReaderUtils.matchEventFilters( + if (checkEvents && !TimelineStorageUtils.matchEventFilters( entity.getEvents(), eventFilters)) { return null; } @@ -258,7 +258,7 @@ class ApplicationEntityReader extends GenericEntityReader { if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { readMetrics(entity, result, ApplicationColumnPrefix.METRIC); - if (checkMetrics && !TimelineReaderUtils.matchMetricFilters( + if (checkMetrics && !TimelineStorageUtils.matchMetricFilters( entity.getMetrics(), metricFilters)) { return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1d0e934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java index 626c770..30d1d00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java @@ -44,7 +44,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.map.JsonMappingException; @@ -321,31 +321,31 @@ public class FileSystemTimelineReaderImpl extends AbstractService continue; } if (relatesTo != null && !relatesTo.isEmpty() && - !TimelineReaderUtils + !TimelineStorageUtils .matchRelations(entity.getRelatesToEntities(), relatesTo)) { continue; } if (isRelatedTo != null && !isRelatedTo.isEmpty() && - !TimelineReaderUtils + !TimelineStorageUtils .matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) { continue; } if (infoFilters != null && !infoFilters.isEmpty() && - !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) { + !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) { continue; } if (configFilters != null && !configFilters.isEmpty() && - !TimelineReaderUtils.matchFilters( + !TimelineStorageUtils.matchFilters( entity.getConfigs(), configFilters)) { continue; } if (metricFilters != null && !metricFilters.isEmpty() && - !TimelineReaderUtils.matchMetricFilters( + !TimelineStorageUtils.matchMetricFilters( entity.getMetrics(), metricFilters)) { continue; } if (eventFilters != null && !eventFilters.isEmpty() && - !TimelineReaderUtils.matchEventFilters( + !TimelineStorageUtils.matchEventFilters( entity.getEvents(), eventFilters)) { continue; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1d0e934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java index 42079d7..c18966f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java @@ -44,8 +44,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; @@ -220,7 +219,7 @@ class GenericEntityReader extends TimelineEntityReader { if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true); - if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations( + if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations( entity.getIsRelatedToEntities(), isRelatedTo)) { return null; } @@ -235,7 +234,7 @@ class GenericEntityReader extends TimelineEntityReader { if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false); - if (checkRelatesTo && !TimelineReaderUtils.matchRelations( + if (checkRelatesTo && !TimelineStorageUtils.matchRelations( entity.getRelatesToEntities(), relatesTo)) { return null; } @@ -251,7 +250,7 @@ class GenericEntityReader extends TimelineEntityReader { fieldsToRetrieve.contains(Field.INFO) || checkInfo) { readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false); if (checkInfo && - !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) { + !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) { return null; } if (!fieldsToRetrieve.contains(Field.ALL) && @@ -265,7 +264,7 @@ class GenericEntityReader extends TimelineEntityReader { if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true); - if (checkConfigs && !TimelineReaderUtils.matchFilters( + if (checkConfigs && !TimelineStorageUtils.matchFilters( entity.getConfigs(), configFilters)) { return null; } @@ -280,7 +279,7 @@ class GenericEntityReader extends TimelineEntityReader { if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { readEvents(entity, result, false); - if (checkEvents && !TimelineReaderUtils.matchEventFilters( + if (checkEvents && !TimelineStorageUtils.matchEventFilters( entity.getEvents(), eventFilters)) { return null; } @@ -295,7 +294,7 @@ class GenericEntityReader extends TimelineEntityReader { if (fieldsToRetrieve.contains(Field.ALL) || fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { readMetrics(entity, result, EntityColumnPrefix.METRIC); - if (checkMetrics && !TimelineReaderUtils.matchMetricFilters( + if (checkMetrics && !TimelineStorageUtils.matchMetricFilters( entity.getMetrics(), metricFilters)) { return null; } @@ -365,7 +364,7 @@ class GenericEntityReader extends TimelineEntityReader { // the column name is of the form "eventId=timestamp=infoKey" if (karr.length == 3) { String id = Bytes.toString(karr[0]); - long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1])); + long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1])); String key = Separator.VALUES.joinEncoded(id, Long.toString(ts)); TimelineEvent event = eventsMap.get(key); if (event == null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1d0e934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index 7c4a5da..3649865 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; @@ -125,7 +125,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements // if the entity is the application, the destination is the application // table - boolean isApplication = TimelineWriterUtils.isApplicationEntity(te); + boolean isApplication = TimelineStorageUtils.isApplicationEntity(te); byte[] rowKey = isApplication ? ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId) : @@ -139,7 +139,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements storeRelations(rowKey, te, isApplication); if (isApplication) { - if (TimelineWriterUtils.isApplicationCreated(te)) { + if (TimelineStorageUtils.isApplicationCreated(te)) { onApplicationCreated(clusterId, userId, flowName, flowVersion, flowRunId, appId, te); } @@ -149,7 +149,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements // if application has finished, store it's finish time and write final // values // of all metrics - if (TimelineWriterUtils.isApplicationFinished(te)) { + if (TimelineStorageUtils.isApplicationFinished(te)) { onApplicationFinished(clusterId, userId, flowName, flowVersion, flowRunId, appId, te); } @@ -234,7 +234,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements Attribute attributeAppId = AggregationCompactionDimension.APPLICATION_ID .getAttribute(appId); FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null, - TimelineWriterUtils.getApplicationFinishedTime(te), attributeAppId); + TimelineStorageUtils.getApplicationFinishedTime(te), attributeAppId); // store the final value of metrics since application has finished Set metrics = te.getMetrics(); @@ -406,9 +406,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements } byte[] columnQualifierFirst = Bytes.toBytes(Separator.VALUES.encode(eventId)); - byte[] columnQualifierWithTsBytes = - Separator.VALUES.join(columnQualifierFirst, - Bytes.toBytes(TimelineWriterUtils.invert(eventTimestamp))); + byte[] columnQualifierWithTsBytes = Separator.VALUES. + join(columnQualifierFirst, Bytes.toBytes( + TimelineStorageUtils.invertLong(eventTimestamp))); Map eventInfo = event.getInfo(); if ((eventInfo == null) || (eventInfo.size() == 0)) { // add separator since event key is empty @@ -418,11 +418,11 @@ public class HBaseTimelineWriterImpl extends AbstractService implements if (isApplication) { ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable, compoundColumnQualifierBytes, null, - TimelineWriterUtils.EMPTY_BYTES); + TimelineStorageUtils.EMPTY_BYTES); } else { EntityColumnPrefix.EVENT.store(rowKey, entityTable, compoundColumnQualifierBytes, null, - TimelineWriterUtils.EMPTY_BYTES); + TimelineStorageUtils.EMPTY_BYTES); } } else { for (Map.Entry info : eventInfo.entrySet()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1d0e934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java index 10e3c2e..1cf6145 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java @@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.application; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; /** * Represents a rowkey for the application table. @@ -90,7 +90,7 @@ public class ApplicationRowKey { String flowId, Long flowRunId) { byte[] first = Bytes.toBytes( Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowId)); - byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId)); + byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId)); return Separator.QUALIFIERS.join(first, second, new byte[0]); } @@ -112,8 +112,8 @@ public class ApplicationRowKey { flowId)); // Note that flowRunId is a long, so we can't encode them all at the same // time. - byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId)); - byte[] third = Bytes.toBytes(appId); + byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId)); + byte[] third = TimelineStorageUtils.encodeAppId(appId); return Separator.QUALIFIERS.join(first, second, third); } @@ -135,9 +135,8 @@ public class ApplicationRowKey { String flowId = Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2])); long flowRunId = - TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3])); - String appId = - Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4])); + TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3])); + String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[4]); return new ApplicationRowKey(clusterId, userId, flowId, flowRunId, appId); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1d0e934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java index ca88056..133952e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; /** * Represents a rowkey for the app_flow table. @@ -49,7 +50,9 @@ public class AppToFlowRowKey { * @return byte array with the row key */ public static byte[] getRowKey(String clusterId, String appId) { - return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, appId)); + byte[] first = Bytes.toBytes(clusterId); + byte[] second = TimelineStorageUtils.encodeAppId(appId); + return Separator.QUALIFIERS.join(first, second); } /** @@ -64,7 +67,7 @@ public class AppToFlowRowKey { } String clusterId = Bytes.toString(rowKeyComponents[0]); - String appId = Bytes.toString(rowKeyComponents[1]); + String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[1]); return new AppToFlowRowKey(clusterId, appId); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1d0e934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java index 9f91af8..1e82494 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java @@ -304,7 +304,7 @@ public enum Separator { * @return source split by this separator. */ public byte[][] split(byte[] source, int limit) { - return TimelineWriterUtils.split(source, this.bytes, limit); + return TimelineStorageUtils.split(source, this.bytes, limit); } /** @@ -315,6 +315,6 @@ public enum Separator { * @return source split by this separator. */ public byte[][] split(byte[] source) { - return TimelineWriterUtils.split(source, this.bytes); + return TimelineStorageUtils.split(source, this.bytes); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1d0e934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java deleted file mode 100644 index 91d7ba4..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.common; - -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; - -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - - -public class TimelineReaderUtils { - /** - * - * @param entityRelations the relations of an entity - * @param relationFilters the relations for filtering - * @return a boolean flag to indicate if both match - */ - public static boolean matchRelations( - Map> entityRelations, - Map> relationFilters) { - for (Map.Entry> relation : relationFilters.entrySet()) { - Set ids = entityRelations.get(relation.getKey()); - if (ids == null) { - return false; - } - for (String id : relation.getValue()) { - if (!ids.contains(id)) { - return false; - } - } - } - return true; - } - - /** - * - * @param map the map of key/value pairs in an entity - * @param filters the map of key/value pairs for filtering - * @return a boolean flag to indicate if both match - */ - public static boolean matchFilters(Map map, - Map filters) { - for (Map.Entry filter : filters.entrySet()) { - Object value = map.get(filter.getKey()); - if (value == null) { - return false; - } - if (!value.equals(filter.getValue())) { - return false; - } - } - return true; - } - - /** - * - * @param entityEvents the set of event objects in an entity - * @param eventFilters the set of event Ids for filtering - * @return a boolean flag to indicate if both match - */ - public static boolean matchEventFilters(Set entityEvents, - Set eventFilters) { - Set eventIds = new HashSet(); - for (TimelineEvent event : entityEvents) { - eventIds.add(event.getId()); - } - for (String eventFilter : eventFilters) { - if (!eventIds.contains(eventFilter)) { - return false; - } - } - return true; - } - - /** - * - * @param metrics the set of metric objects in an entity - * @param metricFilters the set of metric Ids for filtering - * @return a boolean flag to indicate if both match - */ - public static boolean matchMetricFilters(Set metrics, - Set metricFilters) { - Set metricIds = new HashSet(); - for (TimelineMetric metric : metrics) { - metricIds.add(metric.getId()); - } - - for (String metricFilter : metricFilters) { - if (!metricIds.contains(metricFilter)) { - return false; - } - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1d0e934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java new file mode 100644 index 0000000..c1aaf19 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java @@ -0,0 +1,475 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.Map.Entry; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; +import org.apache.hadoop.yarn.util.ConverterUtils; + +/** + * A bunch of utility functions used across TimelineReader and TimelineWriter. + */ +@Public +@Unstable +public class TimelineStorageUtils { + + /** empty bytes */ + public static final byte[] EMPTY_BYTES = new byte[0]; + + /** indicator for no limits for splitting */ + public static final int NO_LIMIT_SPLIT = -1; + + /** milliseconds in one day */ + public static final long MILLIS_ONE_DAY = 86400000L; + + /** + * Splits the source array into multiple array segments using the given + * separator, up to a maximum of count items. This will naturally produce + * copied byte arrays for each of the split segments. To identify the split + * ranges without the array copies, see {@link #splitRanges(byte[], byte[])}. + * + * @param source + * @param separator + * @return byte[] array after splitting the source + */ + public static byte[][] split(byte[] source, byte[] separator) { + return split(source, separator, NO_LIMIT_SPLIT); + } + + /** + * Splits the source array into multiple array segments using the given + * separator, up to a maximum of count items. This will naturally produce + * copied byte arrays for each of the split segments. To identify the split + * ranges without the array copies, see {@link #splitRanges(byte[], byte[])}. + * + * @param source + * @param separator + * @param limit a non-positive value indicates no limit on number of segments. + * @return byte[][] after splitting the input source + */ + public static byte[][] split(byte[] source, byte[] separator, int limit) { + List segments = splitRanges(source, separator, limit); + + byte[][] splits = new byte[segments.size()][]; + for (int i = 0; i < segments.size(); i++) { + Range r = segments.get(i); + byte[] tmp = new byte[r.length()]; + if (tmp.length > 0) { + System.arraycopy(source, r.start(), tmp, 0, r.length()); + } + splits[i] = tmp; + } + return splits; + } + + /** + * Returns a list of ranges identifying [start, end) -- closed, open -- + * positions within the source byte array that would be split using the + * separator byte array. + */ + public static List splitRanges(byte[] source, byte[] separator) { + return splitRanges(source, separator, NO_LIMIT_SPLIT); + } + + /** + * Returns a list of ranges identifying [start, end) -- closed, open -- + * positions within the source byte array that would be split using the + * separator byte array. + * + * @param source the source data + * @param separator the separator pattern to look for + * @param limit the maximum number of splits to identify in the source + */ + public static List splitRanges(byte[] source, byte[] separator, + int limit) { + List segments = new ArrayList(); + if ((source == null) || (separator == null)) { + return segments; + } + int start = 0; + itersource: for (int i = 0; i < source.length; i++) { + for (int j = 0; j < separator.length; j++) { + if (source[i + j] != separator[j]) { + continue itersource; + } + } + // all separator elements matched + if (limit > 0 && segments.size() >= (limit - 1)) { + // everything else goes in one final segment + break; + } + segments.add(new Range(start, i)); + start = i + separator.length; + // i will be incremented again in outer for loop + i += separator.length - 1; + } + // add in remaining to a final range + if (start <= source.length) { + segments.add(new Range(start, source.length)); + } + return segments; + } + + /** + * Converts a timestamp into it's inverse timestamp to be used in (row) keys + * where we want to have the most recent timestamp in the top of the table + * (scans start at the most recent timestamp first). + * + * @param key value to be inverted so that the latest version will be first in + * a scan. + * @return inverted long + */ + public static long invertLong(long key) { + return Long.MAX_VALUE - key; + } + + /** + * Converts an int into it's inverse int to be used in (row) keys + * where we want to have the largest int value in the top of the table + * (scans start at the largest int first). + * + * @param key value to be inverted so that the latest version will be first in + * a scan. + * @return inverted int + */ + public static int invertInt(int key) { + return Integer.MAX_VALUE - key; + } + + + /** + * Converts/encodes a string app Id into a byte representation for (row) keys. + * For conversion, we extract cluster timestamp and sequence id from the + * string app id (calls {@link ConverterUtils#toApplicationId(String)} for + * conversion) and then store it in a byte array of length 12 (8 bytes (long) + * for cluster timestamp followed 4 bytes(int) for sequence id). Both cluster + * timestamp and sequence id are inverted so that the most recent cluster + * timestamp and highest sequence id appears first in the table (i.e. + * application id appears in a descending order). + * + * @param appIdStr application id in string format i.e. + * application_{cluster timestamp}_{sequence id with min 4 digits} + * + * @return encoded byte representation of app id. + */ + public static byte[] encodeAppId(String appIdStr) { + ApplicationId appId = ConverterUtils.toApplicationId(appIdStr); + byte[] appIdBytes = new byte[Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT]; + byte[] clusterTs = Bytes.toBytes(invertLong(appId.getClusterTimestamp())); + System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG); + byte[] seqId = Bytes.toBytes(invertInt(appId.getId())); + System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT); + return appIdBytes; + } + + /** + * Converts/decodes a 12 byte representation of app id for (row) keys to an + * app id in string format which can be returned back to client. + * For decoding, 12 bytes are interpreted as 8 bytes of inverted cluster + * timestamp(long) followed by 4 bytes of inverted sequence id(int). Calls + * {@link ApplicationId#toString} to generate string representation of app id. + * + * @param appIdBytes application id in byte representation. + * + * @return decoded app id in string format. + */ + public static String decodeAppId(byte[] appIdBytes) { + if (appIdBytes.length != (Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT)) { + throw new IllegalArgumentException("Invalid app id in byte format"); + } + long clusterTs = invertLong(Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG)); + int seqId = + invertInt(Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT)); + return ApplicationId.newInstance(clusterTs, seqId).toString(); + } + + /** + * returns the timestamp of that day's start (which is midnight 00:00:00 AM) + * for a given input timestamp + * + * @param ts + * @return timestamp of that day's beginning (midnight) + */ + public static long getTopOfTheDayTimestamp(long ts) { + long dayTimestamp = ts - (ts % MILLIS_ONE_DAY); + return dayTimestamp; + } + + /** + * Combines the input array of attributes and the input aggregation operation + * into a new array of attributes. + * + * @param attributes + * @param aggOp + * @return array of combined attributes + */ + public static Attribute[] combineAttributes(Attribute[] attributes, + AggregationOperation aggOp) { + int newLength = getNewLengthCombinedAttributes(attributes, aggOp); + Attribute[] combinedAttributes = new Attribute[newLength]; + + if (attributes != null) { + System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length); + } + + if (aggOp != null) { + Attribute a2 = aggOp.getAttribute(); + combinedAttributes[newLength - 1] = a2; + } + return combinedAttributes; + } + + /** + * Returns a number for the new array size. The new array is the combination + * of input array of attributes and the input aggregation operation. + * + * @param attributes + * @param aggOp + * @return the size for the new array + */ + private static int getNewLengthCombinedAttributes(Attribute[] attributes, + AggregationOperation aggOp) { + int oldLength = getAttributesLength(attributes); + int aggLength = getAppOpLength(aggOp); + return oldLength + aggLength; + } + + private static int getAppOpLength(AggregationOperation aggOp) { + if (aggOp != null) { + return 1; + } + return 0; + } + + private static int getAttributesLength(Attribute[] attributes) { + if (attributes != null) { + return attributes.length; + } + return 0; + } + + /** + * checks if an application has finished + * + * @param te + * @return true if application has finished else false + */ + public static boolean isApplicationFinished(TimelineEntity te) { + SortedSet allEvents = te.getEvents(); + if ((allEvents != null) && (allEvents.size() > 0)) { + TimelineEvent event = allEvents.last(); + if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) { + return true; + } + } + return false; + } + + /** + * get the time at which an app finished + * + * @param te + * @return true if application has finished else false + */ + public static long getApplicationFinishedTime(TimelineEntity te) { + SortedSet allEvents = te.getEvents(); + if ((allEvents != null) && (allEvents.size() > 0)) { + TimelineEvent event = allEvents.last(); + if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) { + return event.getTimestamp(); + } + } + return 0l; + } + + /** + * Checks if the input TimelineEntity object is an ApplicationEntity. + * + * @param te + * @return true if input is an ApplicationEntity, false otherwise + */ + public static boolean isApplicationEntity(TimelineEntity te) { + return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString()); + } + + /** + * Checks for the APPLICATION_CREATED event. + * + * @param te + * @return true is application event exists, false otherwise + */ + public static boolean isApplicationCreated(TimelineEntity te) { + if (isApplicationEntity(te)) { + for (TimelineEvent event : te.getEvents()) { + if (event.getId() + .equals(ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { + return true; + } + } + } + return false; + } + + /** + * Returns the first seen aggregation operation as seen in the list of input + * tags or null otherwise + * + * @param tags + * @return AggregationOperation + */ + public static AggregationOperation getAggregationOperationFromTagsList( + List tags) { + for (AggregationOperation aggOp : AggregationOperation.values()) { + for (Tag tag : tags) { + if (tag.getType() == aggOp.getTagType()) { + return aggOp; + } + } + } + return null; + } + + /** + * Creates a {@link Tag} from the input attribute. + * + * @param attribute + * @return Tag + */ + public static Tag getTagFromAttribute(Entry attribute) { + // attribute could be either an Aggregation Operation or + // an Aggregation Dimension + // Get the Tag type from either + AggregationOperation aggOp = AggregationOperation + .getAggregationOperation(attribute.getKey()); + if (aggOp != null) { + Tag t = new Tag(aggOp.getTagType(), attribute.getValue()); + return t; + } + + AggregationCompactionDimension aggCompactDim = AggregationCompactionDimension + .getAggregationCompactionDimension(attribute.getKey()); + if (aggCompactDim != null) { + Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue()); + return t; + } + return null; + } + + /** + * + * @param entityRelations the relations of an entity + * @param relationFilters the relations for filtering + * @return a boolean flag to indicate if both match + */ + public static boolean matchRelations( + Map> entityRelations, + Map> relationFilters) { + for (Map.Entry> relation : relationFilters.entrySet()) { + Set ids = entityRelations.get(relation.getKey()); + if (ids == null) { + return false; + } + for (String id : relation.getValue()) { + if (!ids.contains(id)) { + return false; + } + } + } + return true; + } + + /** + * + * @param map the map of key/value pairs in an entity + * @param filters the map of key/value pairs for filtering + * @return a boolean flag to indicate if both match + */ + public static boolean matchFilters(Map map, + Map filters) { + for (Map.Entry filter : filters.entrySet()) { + Object value = map.get(filter.getKey()); + if (value == null) { + return false; + } + if (!value.equals(filter.getValue())) { + return false; + } + } + return true; + } + + /** + * + * @param entityEvents the set of event objects in an entity + * @param eventFilters the set of event Ids for filtering + * @return a boolean flag to indicate if both match + */ + public static boolean matchEventFilters(Set entityEvents, + Set eventFilters) { + Set eventIds = new HashSet(); + for (TimelineEvent event : entityEvents) { + eventIds.add(event.getId()); + } + for (String eventFilter : eventFilters) { + if (!eventIds.contains(eventFilter)) { + return false; + } + } + return true; + } + + /** + * + * @param metrics the set of metric objects in an entity + * @param metricFilters the set of metric Ids for filtering + * @return a boolean flag to indicate if both match + */ + public static boolean matchMetricFilters(Set metrics, + Set metricFilters) { + Set metricIds = new HashSet(); + for (TimelineMetric metric : metrics) { + metricIds.add(metric.getId()); + } + + for (String metricFilter : metricFilters) { + if (!metricIds.contains(metricFilter)) { + return false; + } + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1d0e934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java deleted file mode 100644 index 371371a..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java +++ /dev/null @@ -1,328 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.common; - -import java.util.ArrayList; -import java.util.List; -import java.util.SortedSet; -import java.util.Map.Entry; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; -import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; - -/** - * bunch of utility functions used across TimelineWriter classes - */ -@InterfaceAudience.Public -@InterfaceStability.Unstable -public class TimelineWriterUtils { - - /** empty bytes */ - public static final byte[] EMPTY_BYTES = new byte[0]; - - /** indicator for no limits for splitting */ - public static final int NO_LIMIT_SPLIT = -1; - - /** milliseconds in one day */ - public static final long MILLIS_ONE_DAY = 86400000L; - - /** - * Splits the source array into multiple array segments using the given - * separator, up to a maximum of count items. This will naturally produce - * copied byte arrays for each of the split segments. To identify the split - * ranges without the array copies, see - * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}. - * - * @param source - * @param separator - * @return byte[] array after splitting the source - */ - public static byte[][] split(byte[] source, byte[] separator) { - return split(source, separator, NO_LIMIT_SPLIT); - } - - /** - * Splits the source array into multiple array segments using the given - * separator, up to a maximum of count items. This will naturally produce - * copied byte arrays for each of the split segments. To identify the split - * ranges without the array copies, see - * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}. - * - * @param source - * @param separator - * @param limit a non-positive value indicates no limit on number of segments. - * @return byte[][] after splitting the input source - */ - public static byte[][] split(byte[] source, byte[] separator, int limit) { - List segments = splitRanges(source, separator, limit); - - byte[][] splits = new byte[segments.size()][]; - for (int i = 0; i < segments.size(); i++) { - Range r = segments.get(i); - byte[] tmp = new byte[r.length()]; - if (tmp.length > 0) { - System.arraycopy(source, r.start(), tmp, 0, r.length()); - } - splits[i] = tmp; - } - return splits; - } - - /** - * Returns a list of ranges identifying [start, end) -- closed, open -- - * positions within the source byte array that would be split using the - * separator byte array. - */ - public static List splitRanges(byte[] source, byte[] separator) { - return splitRanges(source, separator, NO_LIMIT_SPLIT); - } - - /** - * Returns a list of ranges identifying [start, end) -- closed, open -- - * positions within the source byte array that would be split using the - * separator byte array. - * - * @param source the source data - * @param separator the separator pattern to look for - * @param limit the maximum number of splits to identify in the source - */ - public static List splitRanges(byte[] source, byte[] separator, - int limit) { - List segments = new ArrayList(); - if ((source == null) || (separator == null)) { - return segments; - } - int start = 0; - itersource: for (int i = 0; i < source.length; i++) { - for (int j = 0; j < separator.length; j++) { - if (source[i + j] != separator[j]) { - continue itersource; - } - } - // all separator elements matched - if (limit > 0 && segments.size() >= (limit - 1)) { - // everything else goes in one final segment - break; - } - - segments.add(new Range(start, i)); - start = i + separator.length; - // i will be incremented again in outer for loop - i += separator.length - 1; - } - // add in remaining to a final range - if (start <= source.length) { - segments.add(new Range(start, source.length)); - } - return segments; - } - - /** - * Converts a timestamp into it's inverse timestamp to be used in (row) keys - * where we want to have the most recent timestamp in the top of the table - * (scans start at the most recent timestamp first). - * - * @param key value to be inverted so that the latest version will be first in - * a scan. - * @return inverted long - */ - public static long invert(Long key) { - return Long.MAX_VALUE - key; - } - - /** - * returns the timestamp of that day's start (which is midnight 00:00:00 AM) - * for a given input timestamp - * - * @param ts - * @return timestamp of that day's beginning (midnight) - */ - public static long getTopOfTheDayTimestamp(long ts) { - long dayTimestamp = ts - (ts % MILLIS_ONE_DAY); - return dayTimestamp; - } - - /** - * Combines the input array of attributes and the input aggregation operation - * into a new array of attributes. - * - * @param attributes - * @param aggOp - * @return array of combined attributes - */ - public static Attribute[] combineAttributes(Attribute[] attributes, - AggregationOperation aggOp) { - int newLength = getNewLengthCombinedAttributes(attributes, aggOp); - Attribute[] combinedAttributes = new Attribute[newLength]; - - if (attributes != null) { - System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length); - } - - if (aggOp != null) { - Attribute a2 = aggOp.getAttribute(); - combinedAttributes[newLength - 1] = a2; - } - return combinedAttributes; - } - - /** - * Returns a number for the new array size. The new array is the combination - * of input array of attributes and the input aggregation operation. - * - * @param attributes - * @param aggOp - * @return the size for the new array - */ - private static int getNewLengthCombinedAttributes(Attribute[] attributes, - AggregationOperation aggOp) { - int oldLength = getAttributesLength(attributes); - int aggLength = getAppOpLength(aggOp); - return oldLength + aggLength; - } - - private static int getAppOpLength(AggregationOperation aggOp) { - if (aggOp != null) { - return 1; - } - return 0; - } - - private static int getAttributesLength(Attribute[] attributes) { - if (attributes != null) { - return attributes.length; - } - return 0; - } - - /** - * checks if an application has finished - * - * @param te - * @return true if application has finished else false - */ - public static boolean isApplicationFinished(TimelineEntity te) { - SortedSet allEvents = te.getEvents(); - if ((allEvents != null) && (allEvents.size() > 0)) { - TimelineEvent event = allEvents.last(); - if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) { - return true; - } - } - return false; - } - - /** - * get the time at which an app finished - * - * @param te - * @return true if application has finished else false - */ - public static long getApplicationFinishedTime(TimelineEntity te) { - SortedSet allEvents = te.getEvents(); - if ((allEvents != null) && (allEvents.size() > 0)) { - TimelineEvent event = allEvents.last(); - if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) { - return event.getTimestamp(); - } - } - return 0l; - } - - /** - * Checks if the input TimelineEntity object is an ApplicationEntity. - * - * @param te - * @return true if input is an ApplicationEntity, false otherwise - */ - public static boolean isApplicationEntity(TimelineEntity te) { - return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString()); - } - - /** - * Checks for the APPLICATION_CREATED event. - * - * @param te - * @return true is application event exists, false otherwise - */ - public static boolean isApplicationCreated(TimelineEntity te) { - if (isApplicationEntity(te)) { - for (TimelineEvent event : te.getEvents()) { - if (event.getId() - .equals(ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { - return true; - } - } - } - return false; - } - - /** - * Returns the first seen aggregation operation as seen in the list of input - * tags or null otherwise - * - * @param tags - * @return AggregationOperation - */ - public static AggregationOperation getAggregationOperationFromTagsList( - List tags) { - for (AggregationOperation aggOp : AggregationOperation.values()) { - for (Tag tag : tags) { - if (tag.getType() == aggOp.getTagType()) { - return aggOp; - } - } - } - return null; - } - - /** - * Creates a {@link Tag} from the input attribute. - * - * @param attribute - * @return Tag - */ - public static Tag getTagFromAttribute(Entry attribute) { - // attribute could be either an Aggregation Operation or - // an Aggregation Dimension - // Get the Tag type from either - AggregationOperation aggOp = AggregationOperation - .getAggregationOperation(attribute.getKey()); - if (aggOp != null) { - Tag t = new Tag(aggOp.getTagType(), attribute.getValue()); - return t; - } - - AggregationCompactionDimension aggCompactDim = AggregationCompactionDimension - .getAggregationCompactionDimension(attribute.getKey()); - if (aggCompactDim != null) { - Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue()); - return t; - } - return null; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1d0e934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java index 6a534ed73..e0413c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java @@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.entity; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; /** * Represents a rowkey for the entity table. @@ -90,9 +90,9 @@ public class EntityRowKey { flowId)); // Note that flowRunId is a long, so we can't encode them all at the same // time. - byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId)); - byte[] third = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId)); - return Separator.QUALIFIERS.join(first, second, third); + byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId)); + byte[] third = TimelineStorageUtils.encodeAppId(appId); + return Separator.QUALIFIERS.join(first, second, third, new byte[0]); } /** @@ -114,10 +114,11 @@ public class EntityRowKey { flowId)); // Note that flowRunId is a long, so we can't encode them all at the same // time. - byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId)); - byte[] third = - Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType, "")); - return Separator.QUALIFIERS.join(first, second, third); + byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId)); + byte[] third = TimelineStorageUtils.encodeAppId(appId); + byte[] fourth = + Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(entityType, "")); + return Separator.QUALIFIERS.join(first, second, third, fourth); } /** @@ -141,11 +142,11 @@ public class EntityRowKey { flowId)); // Note that flowRunId is a long, so we can't encode them all at the same // time. - byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId)); - byte[] third = - Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType, - entityId)); - return Separator.QUALIFIERS.join(first, second, third); + byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId)); + byte[] third = TimelineStorageUtils.encodeAppId(appId); + byte[] fourth = + Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(entityType, entityId)); + return Separator.QUALIFIERS.join(first, second, third, fourth); } /** @@ -166,9 +167,8 @@ public class EntityRowKey { String flowId = Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2])); long flowRunId = - TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3])); - String appId = - Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4])); + TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3])); + String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[4]); String entityType = Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[5])); String entityId = http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1d0e934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java index b899e5c..38c0f3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java @@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; /** @@ -114,7 +114,7 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix byte[] columnQualifier = ColumnHelper.getColumnQualifier( this.columnPrefixBytes, qualifier); - Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes( + Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, combinedAttributes); @@ -235,7 +235,7 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix byte[] columnQualifier = ColumnHelper.getColumnQualifier( this.columnPrefixBytes, qualifier); - Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes( + Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, null, inputValue, combinedAttributes); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1d0e934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java index 18ca599..f7841e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java @@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; /** * Represents a rowkey for the flow activity table. @@ -71,7 +71,7 @@ public class FlowActivityRowKey { */ public static byte[] getRowKey(String clusterId, String userId, String flowId) { - long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System + long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System .currentTimeMillis()); return getRowKey(clusterId, dayTs, userId, flowId); } @@ -90,7 +90,7 @@ public class FlowActivityRowKey { String flowId) { return Separator.QUALIFIERS.join( Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)), - Bytes.toBytes(TimelineWriterUtils.invert(dayTs)), + Bytes.toBytes(TimelineStorageUtils.invertLong(dayTs)), Bytes.toBytes(Separator.QUALIFIERS.encode(userId)), Bytes.toBytes(Separator.QUALIFIERS.encode(flowId))); } @@ -108,7 +108,8 @@ public class FlowActivityRowKey { String clusterId = Separator.QUALIFIERS.decode(Bytes .toString(rowKeyComponents[0])); - long dayTs = TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[1])); + long dayTs = + TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[1])); String userId = Separator.QUALIFIERS.decode(Bytes .toString(rowKeyComponents[2])); String flowId = Separator.QUALIFIERS.decode(Bytes http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1d0e934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java index ad30add..5079cc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java @@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; /** @@ -97,7 +97,7 @@ public enum FlowRunColumn implements Column { TypedBufferedMutator tableMutator, Long timestamp, Object inputValue, Attribute... attributes) throws IOException { - Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes( + Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( attributes, aggOp); column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, inputValue, combinedAttributes); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1d0e934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java index d55f510..b090bba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java @@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; /** @@ -112,7 +112,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix { byte[] columnQualifier = ColumnHelper.getColumnQualifier( this.columnPrefixBytes, qualifier); - Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes( + Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, combinedAttributes); @@ -140,7 +140,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix { byte[] columnQualifier = ColumnHelper.getColumnQualifier( this.columnPrefixBytes, qualifier); - Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes( + Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, combinedAttributes); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1d0e934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java index f743e5e..1984157 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java @@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; public class FlowRunCoprocessor extends BaseRegionObserver { @@ -89,7 +89,7 @@ public class FlowRunCoprocessor extends BaseRegionObserver { List tags = new ArrayList<>(); if ((attributes != null) && (attributes.size() > 0)) { for (Map.Entry attribute : attributes.entrySet()) { - Tag t = TimelineWriterUtils.getTagFromAttribute(attribute); + Tag t = TimelineStorageUtils.getTagFromAttribute(attribute); tags.add(t); } byte[] tagByteArray = Tag.fromList(tags); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1d0e934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java index 880d481..7ed3651 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java @@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; /** * Represents a rowkey for the flow run table. @@ -70,7 +70,7 @@ public class FlowRunRowKey { userId, flowId)); // Note that flowRunId is a long, so we can't encode them all at the same // time. - byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId)); + byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId)); return Separator.QUALIFIERS.join(first, second); } @@ -92,7 +92,7 @@ public class FlowRunRowKey { String flowId = Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2])); long flowRunId = - TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3])); + TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3])); return new FlowRunRowKey(clusterId, userId, flowId, flowRunId); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1d0e934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java index 651bb3a..a537891 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java @@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; /** * Invoked via the coprocessor when a Get or a Scan is issued for flow run @@ -136,7 +136,7 @@ class FlowScanner implements RegionScanner, Closeable { // So all cells in one qualifier come one after the other before we see the // next column qualifier ByteArrayComparator comp = new ByteArrayComparator(); - byte[] currentColumnQualifier = TimelineWriterUtils.EMPTY_BYTES; + byte[] currentColumnQualifier = TimelineStorageUtils.EMPTY_BYTES; AggregationOperation currentAggOp = null; SortedSet currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR); Set alreadySeenAggDim = new HashSet<>(); @@ -163,7 +163,7 @@ class FlowScanner implements RegionScanner, Closeable { List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); // We assume that all the operations for a particular column are the same - return TimelineWriterUtils.getAggregationOperationFromTagsList(tags); + return TimelineStorageUtils.getAggregationOperationFromTagsList(tags); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1d0e934/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java index 3b0921b..701615e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java @@ -54,7 +54,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; @@ -366,7 +366,8 @@ public class TestHBaseTimelineStorage { String flow = "some_flow_name"; String flowVersion = "AB7822C10F1111"; long runid = 1002345678919L; - String appName = "some app name"; + String appName = + ApplicationId.newInstance(System.currentTimeMillis(), 1).toString(); hbi.write(cluster, user, flow, flowVersion, runid, appName, te); hbi.stop(); @@ -592,7 +593,8 @@ public class TestHBaseTimelineStorage { byte[][] karr = (byte[][])e.getKey(); assertEquals(3, karr.length); assertEquals(eventId, Bytes.toString(karr[0])); - assertEquals(TimelineWriterUtils.invert(expTs), Bytes.toLong(karr[1])); + assertEquals( + TimelineStorageUtils.invertLong(expTs), Bytes.toLong(karr[1])); assertEquals(expKey, Bytes.toString(karr[2])); Object value = e.getValue(); // there should be only one timestamp and value @@ -667,7 +669,8 @@ public class TestHBaseTimelineStorage { String flow = "other_flow_name"; String flowVersion = "1111F01C2287BA"; long runid = 1009876543218L; - String appName = "some app name"; + String appName = + ApplicationId.newInstance(System.currentTimeMillis(), 1).toString(); byte[] startRow = EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName); hbi.write(cluster, user, flow, flowVersion, runid, appName, entities); @@ -700,7 +703,7 @@ public class TestHBaseTimelineStorage { byte[][] karr = (byte[][])e.getKey(); assertEquals(3, karr.length); assertEquals(eventId, Bytes.toString(karr[0])); - assertEquals(TimelineWriterUtils.invert(expTs), + assertEquals(TimelineStorageUtils.invertLong(expTs), Bytes.toLong(karr[1])); // key must be empty assertEquals(0, karr[2].length);