Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 18298185DC for ; Thu, 13 Aug 2015 21:20:20 +0000 (UTC) Received: (qmail 95512 invoked by uid 500); 13 Aug 2015 21:19:56 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 95270 invoked by uid 500); 13 Aug 2015 21:19:56 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 91696 invoked by uid 99); 13 Aug 2015 21:19:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Aug 2015 21:19:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 28619E6B8A; Thu, 13 Aug 2015 21:19:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vinodkv@apache.org To: common-commits@hadoop.apache.org Date: Thu, 13 Aug 2015 21:20:39 -0000 Message-Id: <250ab70a516842e388af1bc66e262a3b@git.apache.org> In-Reply-To: <5f8c6cadacb24439a194f9c14e3a062a@git.apache.org> References: <5f8c6cadacb24439a194f9c14e3a062a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [48/50] [abbrv] hadoop git commit: YARN-3049. [Storage Implementation] Implement storage reader interface to fetch raw data from HBase backend (Zhijie Shen via sjlee) YARN-3049. 
[Storage Implementation] Implement storage reader interface to fetch raw data from HBase backend (Zhijie Shen via sjlee) (cherry picked from commit 07433c2ad52df9e844dbd90020c277d3df844dcd) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a600a107 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a600a107 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a600a107 Branch: refs/heads/YARN-2928-new Commit: a600a1078792964015e3f5c0dcd89e038241a86f Parents: 63ce4c3 Author: Sangjin Lee Authored: Fri Aug 7 10:00:22 2015 -0700 Committer: Vinod Kumar Vavilapalli Committed: Thu Aug 13 13:53:37 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../dev-support/findbugs-exclude.xml | 6 + .../records/timelineservice/TimelineEntity.java | 9 +- .../storage/FileSystemTimelineReaderImpl.java | 164 +++---- .../storage/HBaseTimelineReaderImpl.java | 424 +++++++++++++++++++ .../storage/HBaseTimelineWriterImpl.java | 43 +- .../storage/TimelineSchemaCreator.java | 12 + .../storage/apptoflow/AppToFlowColumn.java | 126 ++++++ .../apptoflow/AppToFlowColumnFamily.java | 51 +++ .../storage/apptoflow/AppToFlowRowKey.java | 39 ++ .../storage/apptoflow/AppToFlowTable.java | 110 +++++ .../storage/apptoflow/package-info.java | 23 + .../storage/common/BaseTable.java | 16 + .../storage/common/ColumnPrefix.java | 2 +- .../common/TimelineEntitySchemaConstants.java | 68 --- .../common/TimelineHBaseSchemaConstants.java | 68 +++ .../storage/common/TimelineReaderUtils.java | 112 +++++ .../storage/entity/EntityColumn.java | 2 +- .../storage/entity/EntityColumnFamily.java | 2 +- .../storage/entity/EntityColumnPrefix.java | 2 +- .../storage/entity/EntityRowKey.java | 36 +- .../storage/entity/EntityTable.java | 8 +- .../storage/TestHBaseTimelineWriterImpl.java | 82 +++- 23 files changed, 1198 insertions(+), 210 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 6d7d862..2e26ead 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -82,6 +82,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-3949. Ensure timely flush of timeline writes. (Sangjin Lee via junping_du) + YARN-3049. [Storage Implementation] Implement storage reader interface to + fetch raw data from HBase backend (Zhijie Shen via sjlee) + IMPROVEMENTS YARN-3276. Code cleanup for timeline service API records. (Junping Du via http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index d25d1d9..5583cd6 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -506,4 +506,10 @@ + + + + + + http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java index 9ef2d90..0701001 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java @@ -29,7 +29,9 @@ import javax.xml.bind.annotation.XmlRootElement; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.NavigableSet; import java.util.Set; +import java.util.TreeSet; /** * The basic timeline entity data structure for timeline service v2. Timeline @@ -133,7 +135,8 @@ public class TimelineEntity implements Comparable { private HashMap info = new HashMap<>(); private HashMap configs = new HashMap<>(); private Set metrics = new HashSet<>(); - private Set events = new HashSet<>(); + // events should be sorted by timestamp in descending order + private NavigableSet events = new TreeSet<>(); private HashMap> isRelatedToEntities = new HashMap<>(); private HashMap> relatesToEntities = new HashMap<>(); private long createdTime; @@ -334,7 +337,7 @@ public class TimelineEntity implements Comparable { } @XmlElement(name = "events") - public Set getEvents() { + public NavigableSet getEvents() { if (real == null) { return events; } else { @@ -342,7 +345,7 @@ public class TimelineEntity implements Comparable { } } - public void setEvents(Set events) { + public void setEvents(NavigableSet events) { if (real == null) { this.events = events; } else { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java index f9f1d1d..45ddd1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.map.JsonMappingException; @@ -119,59 +120,32 @@ public class FileSystemTimelineReaderImpl extends AbstractService private static void fillFields(TimelineEntity finalEntity, TimelineEntity real, EnumSet fields) { if (fields.contains(Field.ALL)) { - finalEntity.setConfigs(real.getConfigs()); - finalEntity.setMetrics(real.getMetrics()); - finalEntity.setInfo(real.getInfo()); - finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities()); - finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities()); - finalEntity.setEvents(real.getEvents()); - return; + fields = EnumSet.allOf(Field.class); } for (Field field : fields) { switch(field) { - case CONFIGS: - finalEntity.setConfigs(real.getConfigs()); - break; - case METRICS: - finalEntity.setMetrics(real.getMetrics()); - break; - case INFO: - finalEntity.setInfo(real.getInfo()); - break; - case 
IS_RELATED_TO: - finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities()); - break; - case RELATES_TO: - finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities()); - break; - case EVENTS: - finalEntity.setEvents(real.getEvents()); - break; - default: - continue; - } - } - } - - private static boolean matchFilter(Object infoValue, Object filterValue) { - return infoValue.equals(filterValue); - } - - private static boolean matchFilters(Map entityInfo, - Map filters) { - if (entityInfo == null || entityInfo.isEmpty()) { - return false; - } - for (Map.Entry filter : filters.entrySet()) { - Object infoValue = entityInfo.get(filter.getKey()); - if (infoValue == null) { - return false; - } - if (!matchFilter(infoValue, filter.getValue())) { - return false; + case CONFIGS: + finalEntity.setConfigs(real.getConfigs()); + break; + case METRICS: + finalEntity.setMetrics(real.getMetrics()); + break; + case INFO: + finalEntity.setInfo(real.getInfo()); + break; + case IS_RELATED_TO: + finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities()); + break; + case RELATES_TO: + finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities()); + break; + case EVENTS: + finalEntity.setEvents(real.getEvents()); + break; + default: + continue; } } - return true; } private String getFlowRunPath(String userId, String clusterId, String flowId, @@ -186,10 +160,10 @@ public class FileSystemTimelineReaderImpl extends AbstractService String appFlowMappingFile = rootPath + "/" + ENTITIES_DIR + "/" + clusterId + "/" + APP_FLOW_MAPPING_FILE; try (BufferedReader reader = - new BufferedReader(new InputStreamReader( - new FileInputStream( - appFlowMappingFile), Charset.forName("UTF-8"))); - CSVParser parser = new CSVParser(reader, csvFormat)) { + new BufferedReader(new InputStreamReader( + new FileInputStream( + appFlowMappingFile), Charset.forName("UTF-8"))); + CSVParser parser = new CSVParser(reader, csvFormat)) { for (CSVRecord record : parser.getRecords()) { if 
(record.size() < 4) { continue; @@ -207,36 +181,6 @@ public class FileSystemTimelineReaderImpl extends AbstractService throw new IOException("Unable to get flow info"); } - private static boolean matchMetricFilters(Set metrics, - Set metricFilters) { - Set tempMetrics = new HashSet(); - for (TimelineMetric metric : metrics) { - tempMetrics.add(metric.getId()); - } - - for (String metricFilter : metricFilters) { - if (!tempMetrics.contains(metricFilter)) { - return false; - } - } - return true; - } - - private static boolean matchEventFilters(Set entityEvents, - Set eventFilters) { - Set tempEvents = new HashSet(); - for (TimelineEvent event : entityEvents) { - tempEvents.add(event.getId()); - } - - for (String eventFilter : eventFilters) { - if (!tempEvents.contains(eventFilter)) { - return false; - } - } - return true; - } - private static TimelineEntity createEntityToBeReturned(TimelineEntity entity, EnumSet fieldsToRetrieve) { TimelineEntity entityToBeReturned = new TimelineEntity(); @@ -254,23 +198,6 @@ public class FileSystemTimelineReaderImpl extends AbstractService return (time >= timeBegin) && (time <= timeEnd); } - private static boolean matchRelations( - Map> entityRelations, - Map> relations) { - for (Map.Entry> relation : relations.entrySet()) { - Set ids = entityRelations.get(relation.getKey()); - if (ids == null) { - return false; - } - for (String id : relation.getValue()) { - if (!ids.contains(id)) { - return false; - } - } - } - return true; - } - private static void mergeEntities(TimelineEntity entity1, TimelineEntity entity2) { // Ideally created time wont change except in the case of issue from client. @@ -364,22 +291,22 @@ public class FileSystemTimelineReaderImpl extends AbstractService // First sort the selected entities based on created/start time. 
Map> sortedEntities = new TreeMap<>( - new Comparator() { - @Override - public int compare(Long l1, Long l2) { - return l2.compareTo(l1); + new Comparator() { + @Override + public int compare(Long l1, Long l2) { + return l2.compareTo(l1); + } } - } ); for (File entityFile : dir.listFiles()) { if (!entityFile.getName().contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) { continue; } try (BufferedReader reader = - new BufferedReader( - new InputStreamReader( - new FileInputStream( - entityFile), Charset.forName("UTF-8")))) { + new BufferedReader( + new InputStreamReader( + new FileInputStream( + entityFile), Charset.forName("UTF-8")))) { TimelineEntity entity = readEntityFromFile(reader); if (!entity.getType().equals(entityType)) { continue; @@ -393,27 +320,32 @@ public class FileSystemTimelineReaderImpl extends AbstractService continue; } if (relatesTo != null && !relatesTo.isEmpty() && - !matchRelations(entity.getRelatesToEntities(), relatesTo)) { + !TimelineReaderUtils + .matchRelations(entity.getRelatesToEntities(), relatesTo)) { continue; } if (isRelatedTo != null && !isRelatedTo.isEmpty() && - !matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) { + !TimelineReaderUtils + .matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) { continue; } if (infoFilters != null && !infoFilters.isEmpty() && - !matchFilters(entity.getInfo(), infoFilters)) { + !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) { continue; } if (configFilters != null && !configFilters.isEmpty() && - !matchFilters(entity.getConfigs(), configFilters)) { + !TimelineReaderUtils.matchFilters( + entity.getConfigs(), configFilters)) { continue; } if (metricFilters != null && !metricFilters.isEmpty() && - !matchMetricFilters(entity.getMetrics(), metricFilters)) { + !TimelineReaderUtils.matchMetricFilters( + entity.getMetrics(), metricFilters)) { continue; } if (eventFilters != null && !eventFilters.isEmpty() && - !matchEventFilters(entity.getEvents(), eventFilters)) { + 
!TimelineReaderUtils.matchEventFilters( + entity.getEvents(), eventFilters)) { continue; } TimelineEntity entityToBeReturned = @@ -461,8 +393,8 @@ public class FileSystemTimelineReaderImpl extends AbstractService File entityFile = new File(dir, entityId + TIMELINE_SERVICE_STORAGE_EXTENSION); try (BufferedReader reader = - new BufferedReader(new InputStreamReader( - new FileInputStream(entityFile), Charset.forName("UTF-8")))) { + new BufferedReader(new InputStreamReader( + new FileInputStream(entityFile), Charset.forName("UTF-8")))) { TimelineEntity entity = readEntityFromFile(reader); return createEntityToBeReturned(entity, fieldsToRetrieve); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java new file mode 100644 index 0000000..5258b9c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java @@ -0,0 +1,424 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils; +import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeSet; + +public class HBaseTimelineReaderImpl + extends AbstractService implements TimelineReader { + + private static final Log LOG = LogFactory + .getLog(HBaseTimelineReaderImpl.class); + private static final long DEFAULT_BEGIN_TIME = 0L; + private static final long DEFAULT_END_TIME = Long.MAX_VALUE; + + private Configuration hbaseConf = null; + private Connection conn; + private EntityTable entityTable; + private AppToFlowTable appToFlowTable; + + public HBaseTimelineReaderImpl() { + super(HBaseTimelineReaderImpl.class.getName()); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + hbaseConf = HBaseConfiguration.create(conf); + conn = ConnectionFactory.createConnection(hbaseConf); + entityTable = new EntityTable(); + appToFlowTable = new AppToFlowTable(); + } + + @Override + protected void serviceStop() throws Exception { + if (conn != null) { + LOG.info("closing the hbase Connection"); + conn.close(); + } + super.serviceStop(); + } + + @Override + public TimelineEntity getEntity(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + String entityId, EnumSet fieldsToRetrieve) + throws IOException { + 
validateParams(userId, clusterId, appId, entityType, entityId, true); + // In reality both should be null or neither should be null + if (flowId == null || flowRunId == null) { + FlowContext context = lookupFlowContext(clusterId, appId); + flowId = context.flowId; + flowRunId = context.flowRunId; + } + if (fieldsToRetrieve == null) { + fieldsToRetrieve = EnumSet.noneOf(Field.class); + } + + byte[] rowKey = EntityRowKey.getRowKey( + clusterId, userId, flowId, flowRunId, appId, entityType, entityId); + Get get = new Get(rowKey); + get.setMaxVersions(Integer.MAX_VALUE); + return parseEntity( + entityTable.getResult(hbaseConf, conn, get), fieldsToRetrieve, + false, DEFAULT_BEGIN_TIME, DEFAULT_END_TIME, false, DEFAULT_BEGIN_TIME, + DEFAULT_END_TIME, null, null, null, null, null, null); + } + + @Override + public Set getEntities(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map> relatesTo, Map> isRelatedTo, + Map infoFilters, Map configFilters, + Set metricFilters, Set eventFilters, + EnumSet fieldsToRetrieve) throws IOException { + validateParams(userId, clusterId, appId, entityType, null, false); + // In reality both should be null or neither should be null + if (flowId == null || flowRunId == null) { + FlowContext context = lookupFlowContext(clusterId, appId); + flowId = context.flowId; + flowRunId = context.flowRunId; + } + if (limit == null) { + limit = TimelineReader.DEFAULT_LIMIT; + } + if (createdTimeBegin == null) { + createdTimeBegin = DEFAULT_BEGIN_TIME; + } + if (createdTimeEnd == null) { + createdTimeEnd = DEFAULT_END_TIME; + } + if (modifiedTimeBegin == null) { + modifiedTimeBegin = DEFAULT_BEGIN_TIME; + } + if (modifiedTimeEnd == null) { + modifiedTimeEnd = DEFAULT_END_TIME; + } + if (fieldsToRetrieve == null) { + fieldsToRetrieve = EnumSet.noneOf(Field.class); + } + + NavigableSet entities = new 
TreeSet<>(); + // Scan through part of the table to find the entities belong to one app and + // one type + Scan scan = new Scan(); + scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix( + clusterId, userId, flowId, flowRunId, appId, entityType)); + scan.setMaxVersions(Integer.MAX_VALUE); + ResultScanner scanner = entityTable.getResultScanner(hbaseConf, conn, scan); + for (Result result : scanner) { + TimelineEntity entity = parseEntity(result, fieldsToRetrieve, + true, createdTimeBegin, createdTimeEnd, + true, modifiedTimeBegin, modifiedTimeEnd, + isRelatedTo, relatesTo, infoFilters, configFilters, eventFilters, + metricFilters); + if (entity == null) { + continue; + } + if (entities.size() > limit) { + entities.pollLast(); + } + entities.add(entity); + } + return entities; + } + + private FlowContext lookupFlowContext(String clusterId, String appId) + throws IOException { + byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); + Get get = new Get(rowKey); + Result result = appToFlowTable.getResult(hbaseConf, conn, get); + if (result != null && !result.isEmpty()) { + return new FlowContext( + AppToFlowColumn.FLOW_ID.readResult(result).toString(), + ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue()); + } else { + throw new IOException( + "Unable to find the context flow ID and flow run ID for clusterId=" + + clusterId + ", appId=" + appId); + } + } + + private static class FlowContext { + private String flowId; + private Long flowRunId; + public FlowContext(String flowId, Long flowRunId) { + this.flowId = flowId; + this.flowRunId = flowRunId; + } + } + + private static void validateParams(String userId, String clusterId, + String appId, String entityType, String entityId, boolean checkEntityId) { + Preconditions.checkNotNull(userId, "userId shouldn't be null"); + Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); + Preconditions.checkNotNull(appId, "appId shouldn't be null"); + Preconditions.checkNotNull(entityType, 
"entityType shouldn't be null"); + if (checkEntityId) { + Preconditions.checkNotNull(entityId, "entityId shouldn't be null"); + } + } + + private static TimelineEntity parseEntity( + Result result, EnumSet fieldsToRetrieve, + boolean checkCreatedTime, long createdTimeBegin, long createdTimeEnd, + boolean checkModifiedTime, long modifiedTimeBegin, long modifiedTimeEnd, + Map> isRelatedTo, Map> relatesTo, + Map infoFilters, Map configFilters, + Set eventFilters, Set metricFilters) + throws IOException { + if (result == null || result.isEmpty()) { + return null; + } + TimelineEntity entity = new TimelineEntity(); + entity.setType(EntityColumn.TYPE.readResult(result).toString()); + entity.setId(EntityColumn.ID.readResult(result).toString()); + + // fetch created time + entity.setCreatedTime( + ((Number) EntityColumn.CREATED_TIME.readResult(result)).longValue()); + if (checkCreatedTime && (entity.getCreatedTime() < createdTimeBegin || + entity.getCreatedTime() > createdTimeEnd)) { + return null; + } + + // fetch modified time + entity.setCreatedTime( + ((Number) EntityColumn.MODIFIED_TIME.readResult(result)).longValue()); + if (checkModifiedTime && (entity.getModifiedTime() < modifiedTimeBegin || + entity.getModifiedTime() > modifiedTimeEnd)) { + return null; + } + + // fetch is related to entities + boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { + readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO); + if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations( + entity.getIsRelatedToEntities(), isRelatedTo)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) { + entity.getIsRelatedToEntities().clear(); + } + } + + // fetch relates to entities + boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0; + if 
(fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { + readRelationship(entity, result, EntityColumnPrefix.RELATES_TO); + if (checkRelatesTo && !TimelineReaderUtils.matchRelations( + entity.getRelatesToEntities(), relatesTo)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.RELATES_TO)) { + entity.getRelatesToEntities().clear(); + } + } + + // fetch info + boolean checkInfo = infoFilters != null && infoFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.INFO) || checkInfo) { + readKeyValuePairs(entity, result, EntityColumnPrefix.INFO); + if (checkInfo && + !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.INFO)) { + entity.getInfo().clear(); + } + } + + // fetch configs + boolean checkConfigs = configFilters != null && configFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { + readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG); + if (checkConfigs && !TimelineReaderUtils.matchFilters( + entity.getConfigs(), configFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.CONFIGS)) { + entity.getConfigs().clear(); + } + } + + // fetch events + boolean checkEvents = eventFilters != null && eventFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { + readEvents(entity, result); + if (checkEvents && !TimelineReaderUtils.matchEventFilters( + entity.getEvents(), eventFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.EVENTS)) { + entity.getEvents().clear(); + } + } + + // fetch metrics + boolean checkMetrics = metricFilters != null 
&& metricFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { + readMetrics(entity, result); + if (checkMetrics && !TimelineReaderUtils.matchMetricFilters( + entity.getMetrics(), metricFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.METRICS)) { + entity.getMetrics().clear(); + } + } + return entity; + } + + private static void readRelationship( + TimelineEntity entity, Result result, EntityColumnPrefix prefix) + throws IOException { + // isRelatedTo and relatesTo are of type Map> + Map columns = prefix.readResults(result); + for (Map.Entry column : columns.entrySet()) { + for (String id : Separator.VALUES.splitEncoded( + column.getValue().toString())) { + if (prefix.equals(EntityColumnPrefix.IS_RELATED_TO)) { + entity.addIsRelatedToEntity(column.getKey(), id); + } else { + entity.addRelatesToEntity(column.getKey(), id); + } + } + } + } + + private static void readKeyValuePairs( + TimelineEntity entity, Result result, EntityColumnPrefix prefix) + throws IOException { + // info and configuration are of type Map + Map columns = prefix.readResults(result); + if (prefix.equals(EntityColumnPrefix.CONFIG)) { + for (Map.Entry column : columns.entrySet()) { + entity.addConfig(column.getKey(), column.getKey().toString()); + } + } else { + entity.addInfo(columns); + } + } + + private static void readEvents(TimelineEntity entity, Result result) + throws IOException { + Map eventsMap = new HashMap<>(); + Map eventsResult = + EntityColumnPrefix.EVENT.readResults(result); + for (Map.Entry eventResult : eventsResult.entrySet()) { + Collection tokens = + Separator.VALUES.splitEncoded(eventResult.getKey()); + if (tokens.size() != 2 && tokens.size() != 3) { + throw new IOException( + "Invalid event column name: " + eventResult.getKey()); + } + Iterator idItr = tokens.iterator(); + String id = idItr.next(); + String tsStr = idItr.next(); + // 
TODO: timestamp is not correct via ser/des through UTF-8 string + Long ts = + TimelineWriterUtils.invert(Bytes.toLong(tsStr.getBytes( + StandardCharsets.UTF_8))); + String key = Separator.VALUES.joinEncoded(id, ts.toString()); + TimelineEvent event = eventsMap.get(key); + if (event == null) { + event = new TimelineEvent(); + event.setId(id); + event.setTimestamp(ts); + eventsMap.put(key, event); + } + if (tokens.size() == 3) { + String infoKey = idItr.next(); + event.addInfo(infoKey, eventResult.getValue()); + } + } + Set eventsSet = new HashSet<>(eventsMap.values()); + entity.addEvents(eventsSet); + } + + private static void readMetrics(TimelineEntity entity, Result result) + throws IOException { + NavigableMap> metricsResult = + EntityColumnPrefix.METRIC.readResultsWithTimestamps(result); + for (Map.Entry> metricResult: + metricsResult.entrySet()) { + TimelineMetric metric = new TimelineMetric(); + metric.setId(metricResult.getKey()); + // Simply assume that if the value set contains more than 1 elements, the + // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric + metric.setType(metricResult.getValue().size() > 1 ? 
+ TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE); + metric.addValues(metricResult.getValue()); + entity.addMetric(metric); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index 3173e87..5290415 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -33,9 +33,14 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn; +import 
org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; @@ -55,6 +60,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements private Connection conn; private TypedBufferedMutator entityTable; + private TypedBufferedMutator appToFlowTable; private static final Log LOG = LogFactory .getLog(HBaseTimelineWriterImpl.class); @@ -77,6 +83,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements Configuration hbaseConf = HBaseConfiguration.create(conf); conn = ConnectionFactory.createConnection(hbaseConf); entityTable = new EntityTable().getTableMutator(hbaseConf, conn); + appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn); } /** @@ -97,7 +104,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements byte[] rowKey = EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId, - te); + te.getType(), te.getId()); storeInfo(rowKey, te, flowVersion); storeEvents(rowKey, te.getEvents()); @@ -107,11 +114,37 @@ public class HBaseTimelineWriterImpl extends AbstractService implements EntityColumnPrefix.IS_RELATED_TO); storeRelations(rowKey, te.getRelatesToEntities(), EntityColumnPrefix.RELATES_TO); - } + if (isApplicationCreated(te)) { + onApplicationCreated( + clusterId, userId, flowName, flowVersion, flowRunId, appId, te); + } + } return putStatus; } + private static boolean isApplicationCreated(TimelineEntity te) { + if (te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString())) { + // Only an application entity carrying a CREATED event triggers the app_flow mapping. + for (TimelineEvent event : te.getEvents()) { + if (event.getId().equals(
+ ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { + return true; + } + } + } + return false; + } + + private void onApplicationCreated(String clusterId, String userId, + String flowName, String flowVersion, long flowRunId, String appId, + TimelineEntity te) throws IOException { + byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); + AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName); + AppToFlowColumn.FLOW_RUN_ID.store( + rowKey, appToFlowTable, null, flowRunId); + } + /** * Stores the Relations from the {@linkplain TimelineEntity} object */ @@ -245,6 +278,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements public void flush() throws IOException { // flush all buffered mutators entityTable.flush(); + appToFlowTable.flush(); } /** @@ -258,6 +292,11 @@ public class HBaseTimelineWriterImpl extends AbstractService implements // The close API performs flushing and releases any resources held entityTable.close(); } + if (appToFlowTable != null) { + LOG.info("closing app_flow table"); + // The close API performs flushing and releases any resources held + appToFlowTable.close(); + } if (conn != null) { LOG.info("closing the hbase Connection"); conn.close(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java index a5cc2ab..2c3897d 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; /** @@ -70,6 +71,11 @@ public class TimelineSchemaCreator { int metricsTTL = Integer.parseInt(entityTableTTLMetrics); new EntityTable().setMetricsTTL(metricsTTL, hbaseConf); } + // Grab the appToflowTableName argument + String appToflowTableName = commandLine.getOptionValue("a2f"); + if (StringUtils.isNotBlank(appToflowTableName)) { + hbaseConf.set(AppToFlowTable.TABLE_NAME_CONF_NAME, appToflowTableName); + } createAllTables(hbaseConf); } @@ -95,6 +101,11 @@ public class TimelineSchemaCreator { o.setRequired(false); options.addOption(o); + o = new Option("a2f", "appToflowTableName", true, "app to flow table name"); + o.setArgName("appToflowTableName"); + o.setRequired(false); + options.addOption(o); + CommandLineParser parser = new PosixParser(); CommandLine commandLine = null; try { @@ -120,6 +131,7 @@ public class TimelineSchemaCreator { throw new IOException("Cannot create table since admin is null"); } new EntityTable().createTable(admin, hbaseConf); + new AppToFlowTable().createTable(admin, hbaseConf); } finally { if (conn != null) { conn.close();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java new file mode 100644 index 0000000..423037a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow; + + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; + +import java.io.IOException; + +/** + * Identifies fully qualified columns for the {@link AppToFlowTable}. + */ +public enum AppToFlowColumn implements Column { + + /** + * The flow ID + */ + FLOW_ID(AppToFlowColumnFamily.MAPPING, "flow_id"), + + /** + * The flow run ID + */ + FLOW_RUN_ID(AppToFlowColumnFamily.MAPPING, "flow_run_id"); + + private final ColumnHelper column; + private final ColumnFamily columnFamily; + private final String columnQualifier; + private final byte[] columnQualifierBytes; + + AppToFlowColumn(ColumnFamily columnFamily, + String columnQualifier) { + this.columnFamily = columnFamily; + this.columnQualifier = columnQualifier; + // Future-proof by ensuring the right column prefix hygiene.
+ this.columnQualifierBytes = + Bytes.toBytes(Separator.SPACE.encode(columnQualifier)); + this.column = new ColumnHelper(columnFamily); + } + + /** + * @return the column name value + */ + private String getColumnQualifier() { + return columnQualifier; + } + + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, Long timestamp, + Object inputValue) throws IOException { + column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, + inputValue); + } + + public Object readResult(Result result) throws IOException { + return column.readResult(result, columnQualifierBytes); + } + + /** + * Retrieve an {@link AppToFlowColumn} given a name, or null if there is no + * match. The name is compared to each column qualifier with + * {@link String#equals}, so a null name never matches and yields null. + * + * @param columnQualifier Name of the column to retrieve + * @return the corresponding {@link AppToFlowColumn} or null + */ + public static final AppToFlowColumn columnFor(String columnQualifier) { + + // Match column based on value, assume column family matches. + for (AppToFlowColumn ec : AppToFlowColumn.values()) { + // Find a match based only on name. + if (ec.getColumnQualifier().equals(columnQualifier)) { + return ec; + } + } + + // Default to null + return null; + } + + /** + * Retrieve an {@link AppToFlowColumn} given a column family and a name, or + * null if there is no match. A column matches only when its family equals + * {@code columnFamily} and its qualifier equals {@code name}; null arguments + * never match. + * + * @param columnFamily The columnFamily for which to retrieve the column. + * @param name Name of the column to retrieve + * @return the corresponding {@link AppToFlowColumn} or null if no column + * matches both arguments.
+ */ + public static final AppToFlowColumn columnFor( + AppToFlowColumnFamily columnFamily, String name) { + + for (AppToFlowColumn ec : AppToFlowColumn.values()) { + // Find a match based on both column family and name. + if (ec.columnFamily.equals(columnFamily) + && ec.getColumnQualifier().equals(name)) { + return ec; + } + } + + // Default to null + return null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnFamily.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnFamily.java new file mode 100644 index 0000000..e74235f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnFamily.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; + +/** + * Represents the app_flow table column families. + */ +public enum AppToFlowColumnFamily implements ColumnFamily { + /** + * Mapping column family houses the flow_id and flow_run_id columns. + */ + MAPPING("m"); + + /** + * Byte representation of this column family. + */ + private final byte[] bytes; + + /** + * @param value create a column family with this name. Must be lower case and + * without spaces. + */ + AppToFlowColumnFamily(String value) { + // column families should be lower case and not contain any spaces.
+ this.bytes = Bytes.toBytes(Separator.SPACE.encode(value)); + } + + public byte[] getBytes() { + return Bytes.copy(bytes); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java new file mode 100644 index 0000000..ad4fec6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; + +/** + * Represents a rowkey for the app_flow table. + */ +public class AppToFlowRowKey { + /** + * Constructs the complete row key for the app_flow table as follows: + * {@code clusterId!AppId} + * + * @param clusterId cluster identifier; the first component of the key + * @param appId application identifier; the second component of the key + * @return byte array with the row key + */ + public static byte[] getRowKey(String clusterId, String appId) { + return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, appId)); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java new file mode 100644 index 0000000..2467856 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants; + +import java.io.IOException; + +/** + * The app_flow table has a single column family, mapping, which stores + * the appId to flowId and flowRunId mapping information. + * + * Example app_flow table record: + * + * <pre>
+ * |--------------------------------------|
+ * |  Row       | Column Family           |
+ * |  key       | mapping                 |
+ * |--------------------------------------|
+ * | clusterId! | flow_id:                |
+ * | AppId      | foo@daily_hive_report   |
+ * |            |                         |
+ * |            | flow_run_id:            |
+ * |            | 1452828720457           |
+ * |            |                         |
+ * |            |                         |
+ * |            |                         |
+ * |--------------------------------------|
+ * </pre>
+ */ +public class AppToFlowTable extends BaseTable { + /** app_flow prefix */ + private static final String PREFIX = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + "app-flow"; + + /** config param name that specifies the app_flow table name */ + public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name"; + + /** default value for app_flow table name */ + private static final String DEFAULT_TABLE_NAME = "timelineservice.app_flow"; + + private static final Log LOG = LogFactory.getLog(AppToFlowTable.class); + + public AppToFlowTable() { + super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME); + } + + /* + * (non-Javadoc) + * NOTE(review): pre-splits are username-based but row keys begin with clusterId. + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable#createTable + * (org.apache.hadoop.hbase.client.Admin, + * org.apache.hadoop.conf.Configuration) + */ + public void createTable(Admin admin, Configuration hbaseConf) + throws IOException { + + TableName table = getTableName(hbaseConf); + if (admin.tableExists(table)) { + // do not disable / delete existing table + // similar to the approach taken by map-reduce jobs when + // output directory exists + throw new IOException("Table " + table.getNameAsString() + + " already exists."); + } + + HTableDescriptor appToFlowTableDescp = new HTableDescriptor(table); + HColumnDescriptor mappCF = + new HColumnDescriptor(AppToFlowColumnFamily.MAPPING.getBytes()); + mappCF.setBloomFilterType(BloomType.ROWCOL); + appToFlowTableDescp.addFamily(mappCF); + + appToFlowTableDescp + .setRegionSplitPolicyClassName( + "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy"); + appToFlowTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length", + TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH); + admin.createTable(appToFlowTableDescp, + TimelineHBaseSchemaConstants.getUsernameSplits()); + LOG.info("Status of table creation for " + table.getNameAsString() + "=" + + admin.tableExists(table)); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java new file mode 100644 index 0000000..df7ffc1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java index e8d8b5c..abba79a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java @@ -24,6 +24,8 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; @@ -94,6 +96,22 @@ public abstract class BaseTable { } /** + * Fetches a single row from this table and releases the table handle. + * @param hbaseConf used to read settings that override defaults + * @param conn used to create table from + * @param get that specifies
what single row you want to get from this table + * @return result of get operation + * @throws IOException if the table cannot be obtained, read or closed + */ + public Result getResult(Configuration hbaseConf, Connection conn, Get get) + throws IOException { + // Table is Closeable; release it once the single get has completed. + try (Table table = conn.getTable(getTableName(hbaseConf))) { + return table.get(get); + } + } + + /** * Get the table name for this table. * * @param hbaseConf http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java index 671c824..509ff49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java @@ -64,7 +64,7 @@ public interface ColumnPrefix { public Object readResult(Result result, String qualifier) throws IOException; /** - * @param resultfrom which to read columns + * @param result from which to read columns * @return the latest values of columns in the column family with this prefix * (or all of them if the prefix value is null).
* @throws IOException http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java deleted file mode 100644 index 5518a27..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.timelineservice.storage.common; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * contains the constants used in the context of schema accesses for - * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} - * information - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class TimelineEntitySchemaConstants { - - /** - * Used to create a pre-split for tables starting with a username in the - * prefix. TODO: this may have to become a config variable (string with - * separators) so that different installations can presplit based on their own - * commonly occurring names. - */ - private final static byte[][] USERNAME_SPLITS = { Bytes.toBytes("a"), - Bytes.toBytes("ad"), Bytes.toBytes("an"), Bytes.toBytes("b"), - Bytes.toBytes("ca"), Bytes.toBytes("cl"), Bytes.toBytes("d"), - Bytes.toBytes("e"), Bytes.toBytes("f"), Bytes.toBytes("g"), - Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j"), - Bytes.toBytes("k"), Bytes.toBytes("l"), Bytes.toBytes("m"), - Bytes.toBytes("n"), Bytes.toBytes("o"), Bytes.toBytes("q"), - Bytes.toBytes("r"), Bytes.toBytes("s"), Bytes.toBytes("se"), - Bytes.toBytes("t"), Bytes.toBytes("u"), Bytes.toBytes("v"), - Bytes.toBytes("w"), Bytes.toBytes("x"), Bytes.toBytes("y"), - Bytes.toBytes("z") }; - - /** - * The length at which keys auto-split - */ - public static final String USERNAME_SPLIT_KEY_PREFIX_LENGTH = "4"; - - /** - * @return splits for splits where a user is a prefix. - */ - public final static byte[][] getUsernameSplits() { - byte[][] kloon = USERNAME_SPLITS.clone(); - // Deep copy. 
- for (int row = 0; row < USERNAME_SPLITS.length; row++) { - kloon[row] = Bytes.copy(USERNAME_SPLITS[row]); - } - return kloon; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java new file mode 100644 index 0000000..bbf498a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * contains the constants used in the context of schema accesses for + * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} + * information + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class TimelineHBaseSchemaConstants { + + /** + * Used to create a pre-split for tables starting with a username in the + * prefix. TODO: this may have to become a config variable (string with + * separators) so that different installations can presplit based on their own + * commonly occurring names. + */ + private final static byte[][] USERNAME_SPLITS = { Bytes.toBytes("a"), + Bytes.toBytes("ad"), Bytes.toBytes("an"), Bytes.toBytes("b"), + Bytes.toBytes("ca"), Bytes.toBytes("cl"), Bytes.toBytes("d"), + Bytes.toBytes("e"), Bytes.toBytes("f"), Bytes.toBytes("g"), + Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j"), + Bytes.toBytes("k"), Bytes.toBytes("l"), Bytes.toBytes("m"), + Bytes.toBytes("n"), Bytes.toBytes("o"), Bytes.toBytes("q"), + Bytes.toBytes("r"), Bytes.toBytes("s"), Bytes.toBytes("se"), + Bytes.toBytes("t"), Bytes.toBytes("u"), Bytes.toBytes("v"), + Bytes.toBytes("w"), Bytes.toBytes("x"), Bytes.toBytes("y"), + Bytes.toBytes("z") }; + + /** + * The length at which keys auto-split + */ + public static final String USERNAME_SPLIT_KEY_PREFIX_LENGTH = "4"; + + /** + * @return splits for splits where a user is a prefix. + */ + public final static byte[][] getUsernameSplits() { + byte[][] kloon = USERNAME_SPLITS.clone(); + // Deep copy. 
+ for (int row = 0; row < USERNAME_SPLITS.length; row++) { + kloon[row] = Bytes.copy(USERNAME_SPLITS[row]); + } + return kloon; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java new file mode 100644 index 0000000..91d7ba4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + + +public class TimelineReaderUtils { + /** + * + * @param entityRelations the relations of an entity + * @param relationFilters the relations for filtering + * @return a boolean flag to indicate if both match + */ + public static boolean matchRelations( + Map> entityRelations, + Map> relationFilters) { + for (Map.Entry> relation : relationFilters.entrySet()) { + Set ids = entityRelations.get(relation.getKey()); + if (ids == null) { + return false; + } + for (String id : relation.getValue()) { + if (!ids.contains(id)) { + return false; + } + } + } + return true; + } + + /** + * + * @param map the map of key/value pairs in an entity + * @param filters the map of key/value pairs for filtering + * @return a boolean flag to indicate if both match + */ + public static boolean matchFilters(Map map, + Map filters) { + for (Map.Entry filter : filters.entrySet()) { + Object value = map.get(filter.getKey()); + if (value == null) { + return false; + } + if (!value.equals(filter.getValue())) { + return false; + } + } + return true; + } + + /** + * + * @param entityEvents the set of event objects in an entity + * @param eventFilters the set of event Ids for filtering + * @return a boolean flag to indicate if both match + */ + public static boolean matchEventFilters(Set entityEvents, + Set eventFilters) { + Set eventIds = new HashSet(); + for (TimelineEvent event : entityEvents) { + eventIds.add(event.getId()); + } + for (String eventFilter : eventFilters) { + if (!eventIds.contains(eventFilter)) { + return false; + } + } + return true; + } + + /** + * + * @param metrics the set of metric objects in an entity + * @param metricFilters the set of metric Ids for filtering + * @return 
a boolean flag to indicate if both match + */ + public static boolean matchMetricFilters(Set metrics, + Set metricFilters) { + Set metricIds = new HashSet(); + for (TimelineMetric metric : metrics) { + metricIds.add(metric.getId()); + } + + for (String metricFilter : metricFilters) { + if (!metricIds.contains(metricFilter)) { + return false; + } + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java index 90da966..26e7748 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java @@ -62,7 +62,7 @@ public enum EntityColumn implements Column { private final String columnQualifier; private final byte[] columnQualifierBytes; - private EntityColumn(ColumnFamily columnFamily, + EntityColumn(ColumnFamily columnFamily, String columnQualifier) { this.columnFamily = columnFamily; this.columnQualifier = columnQualifier; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java index 8a95d12..7c63727 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java @@ -53,7 +53,7 @@ public enum EntityColumnFamily implements ColumnFamily { * @param value create a column family with this name. Must be lower case and * without spaces. */ - private EntityColumnFamily(String value) { + EntityColumnFamily(String value) { // column families should be lower case and not contain any spaces. 
this.bytes = Bytes.toBytes(Separator.SPACE.encode(value)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java index 8b7bc3e..58272ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java @@ -80,7 +80,7 @@ public enum EntityColumnPrefix implements ColumnPrefix { * @param columnFamily that this column is stored in. * @param columnPrefix for this column. 
*/ - private EntityColumnPrefix(ColumnFamily columnFamily, + EntityColumnPrefix(ColumnFamily columnFamily, String columnPrefix) { column = new ColumnHelper(columnFamily); this.columnFamily = columnFamily; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java index 3e17ad0..9a72be0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java @@ -55,17 +55,45 @@ public class EntityRowKey { /** * Constructs a row key prefix for the entity table as follows: - * {@code userName!clusterId!flowId!flowRunId!AppId} + * {@code userName!clusterId!flowId!flowRunId!AppId!entityType!} * * @param clusterId * @param userId * @param flowId * @param flowRunId * @param appId + * @param entityType * @return byte array with the row key prefix */ + public static byte[] getRowKeyPrefix(String clusterId, String userId, + String flowId, Long flowRunId, String appId, String entityType) { + byte[] first = + Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId, + flowId)); + // Note that flowRunId is a long, so we can't encode them all at the same + // 
time. + byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId)); + byte[] third = + Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType, "")); + return Separator.QUALIFIERS.join(first, second, third); + } + + /** + * Constructs a row key for the entity table as follows: + * {@code userName!clusterId!flowId!flowRunId!AppId!entityType!entityId} + * + * @param clusterId + * @param userId + * @param flowId + * @param flowRunId + * @param appId + * @param entityType + * @param entityId + * @return byte array with the row key + */ public static byte[] getRowKey(String clusterId, String userId, - String flowId, Long flowRunId, String appId, TimelineEntity te) { + String flowId, Long flowRunId, String appId, String entityType, + String entityId) { byte[] first = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId, flowId)); @@ -73,8 +101,8 @@ public class EntityRowKey { // time. byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId)); byte[] third = - Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, te.getType(), - te.getId())); + Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType, + entityId)); return Separator.QUALIFIERS.join(first, second, third); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a600a107/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java index 2ae7d39..f657a14 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEntitySchemaConstants; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants; /** * The entity table as column families info, config and metrics. Info stores @@ -75,7 +75,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEnti public class EntityTable extends BaseTable { /** entity prefix */ private static final String PREFIX = - YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".entity"; + YarnConfiguration.TIMELINE_SERVICE_PREFIX + "entity"; /** config param name that specifies the entity table name */ public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name"; @@ -146,9 +146,9 @@ public class EntityTable extends BaseTable { entityTableDescp .setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy"); entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length", - TimelineEntitySchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH); + TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH); admin.createTable(entityTableDescp, - TimelineEntitySchemaConstants.getUsernameSplits()); + TimelineHBaseSchemaConstants.getUsernameSplits()); LOG.info("Status of table creation for " + table.getNameAsString() + "=" + admin.tableExists(table)); }