Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3CCB9200C10 for ; Fri, 20 Jan 2017 06:06:48 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 3B52F160B5D; Fri, 20 Jan 2017 05:06:48 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 16CF4160B54 for ; Fri, 20 Jan 2017 06:06:45 +0100 (CET) Received: (qmail 78261 invoked by uid 500); 20 Jan 2017 05:06:39 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 75288 invoked by uid 99); 20 Jan 2017 05:06:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Jan 2017 05:06:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1AE01F404B; Fri, 20 Jan 2017 05:06:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sjlee@apache.org To: common-commits@hadoop.apache.org Date: Fri, 20 Jan 2017 05:06:58 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [23/28] hadoop git commit: YARN-5928. Move ATSv2 HBase backend code into a new module that is only dependent at runtime by yarn servers. Contributed by Haibo Chen. 
archived-at: Fri, 20 Jan 2017 05:06:48 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java new file mode 100644 index 0000000..0e3c8ee --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java @@ -0,0 +1,728 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Invoked via the coprocessor when a Get or a Scan is issued for flow run + * table. Looks through the list of cells per row, checks their tags and does + * operation on those cells as per the cell tags. 
Transforms reads of the stored + * metrics into calculated sums for each column Also, finds the min and max for + * start and end times in a flow run. + */ +class FlowScanner implements RegionScanner, Closeable { + + private static final Log LOG = LogFactory.getLog(FlowScanner.class); + + /** + * use a special application id to represent the flow id this is needed since + * TimestampGenerator parses the app id to generate a cell timestamp. + */ + private static final String FLOW_APP_ID = "application_00000000000_0000"; + + private final Region region; + private final InternalScanner flowRunScanner; + private final int batchSize; + private final long appFinalValueRetentionThreshold; + private RegionScanner regionScanner; + private boolean hasMore; + private byte[] currentRow; + private List availableCells = new ArrayList<>(); + private int currentIndex; + private FlowScannerOperation action = FlowScannerOperation.READ; + + FlowScanner(RegionCoprocessorEnvironment env, InternalScanner internalScanner, + FlowScannerOperation action) { + this(env, null, internalScanner, action); + } + + FlowScanner(RegionCoprocessorEnvironment env, Scan incomingScan, + InternalScanner internalScanner, FlowScannerOperation action) { + this.batchSize = incomingScan == null ? 
-1 : incomingScan.getBatch(); + // TODO initialize other scan attributes like Scan#maxResultSize + this.flowRunScanner = internalScanner; + if (internalScanner instanceof RegionScanner) { + this.regionScanner = (RegionScanner) internalScanner; + } + this.action = action; + if (env == null) { + this.appFinalValueRetentionThreshold = + YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD; + this.region = null; + } else { + this.region = env.getRegion(); + Configuration hbaseConf = env.getConfiguration(); + this.appFinalValueRetentionThreshold = hbaseConf.getLong( + YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD, + YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD); + } + if (LOG.isDebugEnabled()) { + LOG.debug(" batch size=" + batchSize); + } + } + + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo() + */ + @Override + public HRegionInfo getRegionInfo() { + return region.getRegionInfo(); + } + + @Override + public boolean nextRaw(List cells) throws IOException { + return nextRaw(cells, ScannerContext.newBuilder().build()); + } + + @Override + public boolean nextRaw(List cells, ScannerContext scannerContext) + throws IOException { + return nextInternal(cells, scannerContext); + } + + @Override + public boolean next(List cells) throws IOException { + return next(cells, ScannerContext.newBuilder().build()); + } + + @Override + public boolean next(List cells, ScannerContext scannerContext) + throws IOException { + return nextInternal(cells, scannerContext); + } + + /** + * Get value converter associated with a column or a column prefix. If nothing + * matches, generic converter is returned. + * @param colQualifierBytes + * @return value converter implementation. + */ + private static ValueConverter getValueConverter(byte[] colQualifierBytes) { + // Iterate over all the column prefixes for flow run table and get the + // appropriate converter for the column qualifier passed if prefix matches. 
+ for (FlowRunColumnPrefix colPrefix : FlowRunColumnPrefix.values()) { + byte[] colPrefixBytes = colPrefix.getColumnPrefixBytes(""); + if (Bytes.compareTo(colPrefixBytes, 0, colPrefixBytes.length, + colQualifierBytes, 0, colPrefixBytes.length) == 0) { + return colPrefix.getValueConverter(); + } + } + // Iterate over all the columns for flow run table and get the + // appropriate converter for the column qualifier passed if match occurs. + for (FlowRunColumn column : FlowRunColumn.values()) { + if (Bytes.compareTo( + column.getColumnQualifierBytes(), colQualifierBytes) == 0) { + return column.getValueConverter(); + } + } + // Return generic converter if nothing matches. + return GenericConverter.getInstance(); + } + + /** + * This method loops through the cells in a given row of the + * {@link FlowRunTable}. It looks at the tags of each cell to figure out how + * to process the contents. It then calculates the sum or min or max for each + * column or returns the cell as is. + * + * @param cells + * @param scannerContext + * @return true if next row is available for the scanner, false otherwise + * @throws IOException + */ + private boolean nextInternal(List cells, ScannerContext scannerContext) + throws IOException { + Cell cell = null; + startNext(); + // Loop through all the cells in this row + // For min/max/metrics we do need to scan the entire set of cells to get the + // right one + // But with flush/compaction, the number of cells being scanned will go down + // cells are grouped per column qualifier then sorted by cell timestamp + // (latest to oldest) per column qualifier + // So all cells in one qualifier come one after the other before we see the + // next column qualifier + ByteArrayComparator comp = new ByteArrayComparator(); + byte[] previousColumnQualifier = Separator.EMPTY_BYTES; + AggregationOperation currentAggOp = null; + SortedSet currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR); + Set alreadySeenAggDim = new HashSet<>(); + int addedCnt = 
0; + long currentTimestamp = System.currentTimeMillis(); + ValueConverter converter = null; + int limit = batchSize; + + while (limit <= 0 || addedCnt < limit) { + cell = peekAtNextCell(scannerContext); + if (cell == null) { + break; + } + byte[] currentColumnQualifier = CellUtil.cloneQualifier(cell); + if (previousColumnQualifier == null) { + // first time in loop + previousColumnQualifier = currentColumnQualifier; + } + + converter = getValueConverter(currentColumnQualifier); + if (comp.compare(previousColumnQualifier, currentColumnQualifier) != 0) { + addedCnt += emitCells(cells, currentColumnCells, currentAggOp, + converter, currentTimestamp); + resetState(currentColumnCells, alreadySeenAggDim); + previousColumnQualifier = currentColumnQualifier; + currentAggOp = getCurrentAggOp(cell); + converter = getValueConverter(currentColumnQualifier); + } + collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim, + converter, scannerContext); + nextCell(scannerContext); + } + if ((!currentColumnCells.isEmpty()) && ((limit <= 0 || addedCnt < limit))) { + addedCnt += emitCells(cells, currentColumnCells, currentAggOp, converter, + currentTimestamp); + if (LOG.isDebugEnabled()) { + if (addedCnt > 0) { + LOG.debug("emitted cells. " + addedCnt + " for " + this.action + + " rowKey=" + + FlowRunRowKey.parseRowKey(CellUtil.cloneRow(cells.get(0)))); + } else { + LOG.debug("emitted no cells for " + this.action); + } + } + } + return hasMore(); + } + + private AggregationOperation getCurrentAggOp(Cell cell) { + List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), + cell.getTagsLength()); + // We assume that all the operations for a particular column are the same + return HBaseTimelineStorageUtils.getAggregationOperationFromTagsList(tags); + } + + /** + * resets the parameters to an initialized state for next loop iteration. 
+ * + * @param cell + * @param currentAggOp + * @param currentColumnCells + * @param alreadySeenAggDim + * @param collectedButNotEmitted + */ + private void resetState(SortedSet currentColumnCells, + Set alreadySeenAggDim) { + currentColumnCells.clear(); + alreadySeenAggDim.clear(); + } + + private void collectCells(SortedSet currentColumnCells, + AggregationOperation currentAggOp, Cell cell, + Set alreadySeenAggDim, ValueConverter converter, + ScannerContext scannerContext) throws IOException { + + if (currentAggOp == null) { + // not a min/max/metric cell, so just return it as is + currentColumnCells.add(cell); + return; + } + + switch (currentAggOp) { + case GLOBAL_MIN: + if (currentColumnCells.size() == 0) { + currentColumnCells.add(cell); + } else { + Cell currentMinCell = currentColumnCells.first(); + Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp, + (NumericValueConverter) converter); + if (!currentMinCell.equals(newMinCell)) { + currentColumnCells.remove(currentMinCell); + currentColumnCells.add(newMinCell); + } + } + break; + case GLOBAL_MAX: + if (currentColumnCells.size() == 0) { + currentColumnCells.add(cell); + } else { + Cell currentMaxCell = currentColumnCells.first(); + Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp, + (NumericValueConverter) converter); + if (!currentMaxCell.equals(newMaxCell)) { + currentColumnCells.remove(currentMaxCell); + currentColumnCells.add(newMaxCell); + } + } + break; + case SUM: + case SUM_FINAL: + if (LOG.isTraceEnabled()) { + LOG.trace("In collect cells " + + " FlowSannerOperation=" + + this.action + + " currentAggOp=" + + currentAggOp + + " cell qualifier=" + + Bytes.toString(CellUtil.cloneQualifier(cell)) + + " cell value= " + + converter.decodeValue(CellUtil.cloneValue(cell)) + + " timestamp=" + cell.getTimestamp()); + } + + // only if this app has not been seen yet, add to current column cells + List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), + 
cell.getTagsLength()); + String aggDim = HBaseTimelineStorageUtils + .getAggregationCompactionDimension(tags); + if (!alreadySeenAggDim.contains(aggDim)) { + // if this agg dimension has already been seen, + // since they show up in sorted order + // we drop the rest which are older + // in other words, this cell is older than previously seen cells + // for that agg dim + // but when this agg dim is not seen, + // consider this cell in our working set + currentColumnCells.add(cell); + alreadySeenAggDim.add(aggDim); + } + break; + default: + break; + } // end of switch case + } + + /* + * Processes the cells in input param currentColumnCells and populates + * List cells as the output based on the input AggregationOperation + * parameter. + */ + private int emitCells(List cells, SortedSet currentColumnCells, + AggregationOperation currentAggOp, ValueConverter converter, + long currentTimestamp) throws IOException { + if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) { + return 0; + } + if (currentAggOp == null) { + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + } + if (LOG.isTraceEnabled()) { + LOG.trace("In emitCells " + this.action + " currentColumnCells size= " + + currentColumnCells.size() + " currentAggOp" + currentAggOp); + } + + switch (currentAggOp) { + case GLOBAL_MIN: + case GLOBAL_MAX: + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + case SUM: + case SUM_FINAL: + switch (action) { + case FLUSH: + case MINOR_COMPACTION: + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + case READ: + Cell sumCell = processSummation(currentColumnCells, + (NumericValueConverter) converter); + cells.add(sumCell); + return 1; + case MAJOR_COMPACTION: + List finalCells = processSummationMajorCompaction( + currentColumnCells, (NumericValueConverter) converter, + currentTimestamp); + cells.addAll(finalCells); + return finalCells.size(); + default: + cells.addAll(currentColumnCells); + 
return currentColumnCells.size(); + } + default: + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + } + } + + /* + * Returns a cell whose value is the sum of all cell values in the input set. + * The new cell created has the timestamp of the most recent metric cell. The + * sum of a metric for a flow run is the summation at the point of the last + * metric update in that flow till that time. + */ + private Cell processSummation(SortedSet currentColumnCells, + NumericValueConverter converter) throws IOException { + Number sum = 0; + Number currentValue = 0; + long ts = 0L; + long mostCurrentTimestamp = 0L; + Cell mostRecentCell = null; + for (Cell cell : currentColumnCells) { + currentValue = (Number) converter.decodeValue(CellUtil.cloneValue(cell)); + ts = cell.getTimestamp(); + if (mostCurrentTimestamp < ts) { + mostCurrentTimestamp = ts; + mostRecentCell = cell; + } + sum = converter.add(sum, currentValue); + } + byte[] sumBytes = converter.encodeValue(sum); + Cell sumCell = + HBaseTimelineStorageUtils.createNewCell(mostRecentCell, sumBytes); + return sumCell; + } + + + /** + * Returns a list of cells that contains + * + * A) the latest cells for applications that haven't finished yet + * B) summation + * for the flow, based on applications that have completed and are older than + * a certain time + * + * The new cell created has the timestamp of the most recent metric cell. The + * sum of a metric for a flow run is the summation at the point of the last + * metric update in that flow till that time. 
+ */ + @VisibleForTesting + List processSummationMajorCompaction( + SortedSet currentColumnCells, NumericValueConverter converter, + long currentTimestamp) + throws IOException { + Number sum = 0; + Number currentValue = 0; + long ts = 0L; + boolean summationDone = false; + List finalCells = new ArrayList(); + if (currentColumnCells == null) { + return finalCells; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("In processSummationMajorCompaction," + + " will drop cells older than " + currentTimestamp + + " CurrentColumnCells size=" + currentColumnCells.size()); + } + + for (Cell cell : currentColumnCells) { + AggregationOperation cellAggOp = getCurrentAggOp(cell); + // if this is the existing flow sum cell + List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), + cell.getTagsLength()); + String appId = HBaseTimelineStorageUtils + .getAggregationCompactionDimension(tags); + if (appId == FLOW_APP_ID) { + sum = converter.add(sum, currentValue); + summationDone = true; + if (LOG.isTraceEnabled()) { + LOG.trace("reading flow app id sum=" + sum); + } + } else { + currentValue = (Number) converter.decodeValue(CellUtil + .cloneValue(cell)); + // read the timestamp truncated by the generator + ts = TimestampGenerator.getTruncatedTimestamp(cell.getTimestamp()); + if ((cellAggOp == AggregationOperation.SUM_FINAL) + && ((ts + this.appFinalValueRetentionThreshold) + < currentTimestamp)) { + sum = converter.add(sum, currentValue); + summationDone = true; + if (LOG.isTraceEnabled()) { + LOG.trace("MAJOR COMPACTION loop sum= " + sum + + " discarding now: " + " qualifier=" + + Bytes.toString(CellUtil.cloneQualifier(cell)) + " value=" + + converter.decodeValue(CellUtil.cloneValue(cell)) + + " timestamp=" + cell.getTimestamp() + " " + this.action); + } + } else { + // not a final value but it's the latest cell for this app + // so include this cell in the list of cells to write back + finalCells.add(cell); + } + } + } + if (summationDone) { + Cell anyCell = 
currentColumnCells.first(); + List tags = new ArrayList(); + Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + Bytes.toBytes(FLOW_APP_ID)); + tags.add(t); + t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(), + Bytes.toBytes(FLOW_APP_ID)); + tags.add(t); + byte[] tagByteArray = Tag.fromList(tags); + Cell sumCell = HBaseTimelineStorageUtils.createNewCell( + CellUtil.cloneRow(anyCell), + CellUtil.cloneFamily(anyCell), + CellUtil.cloneQualifier(anyCell), + TimestampGenerator.getSupplementedTimestamp( + System.currentTimeMillis(), FLOW_APP_ID), + converter.encodeValue(sum), tagByteArray); + finalCells.add(sumCell); + if (LOG.isTraceEnabled()) { + LOG.trace("MAJOR COMPACTION final sum= " + sum + " for " + + Bytes.toString(CellUtil.cloneQualifier(sumCell)) + + " " + this.action); + } + LOG.info("After major compaction for qualifier=" + + Bytes.toString(CellUtil.cloneQualifier(sumCell)) + + " with currentColumnCells.size=" + + currentColumnCells.size() + + " returning finalCells.size=" + finalCells.size() + + " with sum=" + sum.longValue() + + " with cell timestamp " + sumCell.getTimestamp()); + } else { + String qualifier = ""; + LOG.info("After major compaction for qualifier=" + qualifier + + " with currentColumnCells.size=" + + currentColumnCells.size() + + " returning finalCells.size=" + finalCells.size() + + " with zero sum=" + + sum.longValue()); + } + return finalCells; + } + + /** + * Determines which cell is to be returned based on the values in each cell + * and the comparison operation MIN or MAX. 
+ * + * @param previouslyChosenCell + * @param currentCell + * @param currentAggOp + * @return the cell which is the min (or max) cell + * @throws IOException + */ + private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell, + AggregationOperation currentAggOp, NumericValueConverter converter) + throws IOException { + if (previouslyChosenCell == null) { + return currentCell; + } + try { + Number previouslyChosenCellValue = (Number)converter.decodeValue( + CellUtil.cloneValue(previouslyChosenCell)); + Number currentCellValue = (Number) converter.decodeValue(CellUtil + .cloneValue(currentCell)); + switch (currentAggOp) { + case GLOBAL_MIN: + if (converter.compare( + currentCellValue, previouslyChosenCellValue) < 0) { + // new value is minimum, hence return this cell + return currentCell; + } else { + // previously chosen value is miniumum, hence return previous min cell + return previouslyChosenCell; + } + case GLOBAL_MAX: + if (converter.compare( + currentCellValue, previouslyChosenCellValue) > 0) { + // new value is max, hence return this cell + return currentCell; + } else { + // previously chosen value is max, hence return previous max cell + return previouslyChosenCell; + } + default: + return currentCell; + } + } catch (IllegalArgumentException iae) { + LOG.error("caught iae during conversion to long ", iae); + return currentCell; + } + } + + @Override + public void close() throws IOException { + if (flowRunScanner != null) { + flowRunScanner.close(); + } else { + LOG.warn("scanner close called but scanner is null"); + } + } + + /** + * Called to signal the start of the next() call by the scanner. + */ + public void startNext() { + currentRow = null; + } + + /** + * Returns whether or not the underlying scanner has more rows. + */ + public boolean hasMore() { + return currentIndex < availableCells.size() ? true : hasMore; + } + + /** + * Returns the next available cell for the current row and advances the + * pointer to the next cell. 
This method can be called multiple times in a row + * to advance through all the available cells. + * + * @param scannerContext + * context information for the batch of cells under consideration + * @return the next available cell or null if no more cells are available for + * the current row + * @throws IOException + */ + public Cell nextCell(ScannerContext scannerContext) throws IOException { + Cell cell = peekAtNextCell(scannerContext); + if (cell != null) { + currentIndex++; + } + return cell; + } + + /** + * Returns the next available cell for the current row, without advancing the + * pointer. Calling this method multiple times in a row will continue to + * return the same cell. + * + * @param scannerContext + * context information for the batch of cells under consideration + * @return the next available cell or null if no more cells are available for + * the current row + * @throws IOException if any problem is encountered while grabbing the next + * cell. + */ + public Cell peekAtNextCell(ScannerContext scannerContext) throws IOException { + if (currentIndex >= availableCells.size()) { + // done with current batch + availableCells.clear(); + currentIndex = 0; + hasMore = flowRunScanner.next(availableCells, scannerContext); + } + Cell cell = null; + if (currentIndex < availableCells.size()) { + cell = availableCells.get(currentIndex); + if (currentRow == null) { + currentRow = CellUtil.cloneRow(cell); + } else if (!CellUtil.matchingRow(cell, currentRow)) { + // moved on to the next row + // don't use the current cell + // also signal no more cells for this row + return null; + } + } + return cell; + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize() + */ + @Override + public long getMaxResultSize() { + if (regionScanner == null) { + throw new IllegalStateException( + "RegionScanner.isFilterDone() called when the flow " + + "scanner's scanner is not a RegionScanner"); + } + return 
regionScanner.getMaxResultSize(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint() + */ + @Override + public long getMvccReadPoint() { + if (regionScanner == null) { + throw new IllegalStateException( + "RegionScanner.isFilterDone() called when the flow " + + "scanner's internal scanner is not a RegionScanner"); + } + return regionScanner.getMvccReadPoint(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone() + */ + @Override + public boolean isFilterDone() throws IOException { + if (regionScanner == null) { + throw new IllegalStateException( + "RegionScanner.isFilterDone() called when the flow " + + "scanner's internal scanner is not a RegionScanner"); + } + return regionScanner.isFilterDone(); + + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[]) + */ + @Override + public boolean reseek(byte[] bytes) throws IOException { + if (regionScanner == null) { + throw new IllegalStateException( + "RegionScanner.reseek() called when the flow " + + "scanner's internal scanner is not a RegionScanner"); + } + return regionScanner.reseek(bytes); + } + + @Override + public int getBatch() { + return batchSize; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java new file mode 100644 index 0000000..73c666f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + + +/** + * Identifies the scanner operation on the {@link FlowRunTable}. + */ +public enum FlowScannerOperation { + + /** + * If the scanner is opened for reading + * during preGet or preScan. + */ + READ, + + /** + * If the scanner is opened during preFlush. + */ + FLUSH, + + /** + * If the scanner is opened during minor Compaction. + */ + MINOR_COMPACTION, + + /** + * If the scanner is opened during major Compaction. 
+ */ + MAJOR_COMPACTION +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java new file mode 100644 index 0000000..04963f3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage.flow + * contains classes related to implementation for flow related tables, viz. 
flow + * run table and flow activity table. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java new file mode 100644 index 0000000..d0bc366 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage contains classes to be used + * across timeline reader and collector. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java new file mode 100644 index 0000000..aa2bfda --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java @@ -0,0 +1,481 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for application entities that are stored in the + * application table. + */ +class ApplicationEntityReader extends GenericEntityReader { + private static final ApplicationTable APPLICATION_TABLE = + new ApplicationTable(); + + public ApplicationEntityReader(TimelineReaderContext ctxt, + TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { + super(ctxt, entityFilters, toRetrieve, true); + } + + public ApplicationEntityReader(TimelineReaderContext ctxt, + TimelineDataToRetrieve toRetrieve) { + super(ctxt, toRetrieve); + } + + /** + * Uses the {@link ApplicationTable}. + */ + protected BaseTable getTable() { + return APPLICATION_TABLE; + } + + /** + * This method is called only for multiple entity reads. + */ + @Override + protected FilterList constructFilterListBasedOnFilters() throws IOException { + // Filters here cannot be null for multiple entity reads as they are set in + // augmentParams if null. 
+ TimelineEntityFilters filters = getFilters(); + FilterList listBasedOnFilters = new FilterList(); + // Create filter list based on created time range and add it to + // listBasedOnFilters. + long createdTimeBegin = filters.getCreatedTimeBegin(); + long createdTimeEnd = filters.getCreatedTimeEnd(); + if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) { + listBasedOnFilters.addFilter( + TimelineFilterUtils.createSingleColValueFiltersByRange( + ApplicationColumn.CREATED_TIME, createdTimeBegin, createdTimeEnd)); + } + // Create filter list based on metric filters and add it to + // listBasedOnFilters. + TimelineFilterList metricFilters = filters.getMetricFilters(); + if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) { + listBasedOnFilters.addFilter( + TimelineFilterUtils.createHBaseFilterList( + ApplicationColumnPrefix.METRIC, metricFilters)); + } + // Create filter list based on config filters and add it to + // listBasedOnFilters. + TimelineFilterList configFilters = filters.getConfigFilters(); + if (configFilters != null && !configFilters.getFilterList().isEmpty()) { + listBasedOnFilters.addFilter( + TimelineFilterUtils.createHBaseFilterList( + ApplicationColumnPrefix.CONFIG, configFilters)); + } + // Create filter list based on info filters and add it to listBasedOnFilters + TimelineFilterList infoFilters = filters.getInfoFilters(); + if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) { + listBasedOnFilters.addFilter( + TimelineFilterUtils.createHBaseFilterList( + ApplicationColumnPrefix.INFO, infoFilters)); + } + return listBasedOnFilters; + } + + /** + * Add {@link QualifierFilter} filters to filter list for each column of + * application table. + * + * @param list filter list to which qualifier filters have to be added. 
+ */ + @Override + protected void updateFixedColumns(FilterList list) { + for (ApplicationColumn column : ApplicationColumn.values()) { + list.addFilter(new QualifierFilter(CompareOp.EQUAL, + new BinaryComparator(column.getColumnQualifierBytes()))); + } + } + + /** + * Creates a filter list which indicates that only some of the column + * qualifiers in the info column family will be returned in result. + * + * @return filter list. + * @throws IOException if any problem occurs while creating filter list. + */ + private FilterList createFilterListForColsOfInfoFamily() + throws IOException { + FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE); + // Add filters for each column in entity table. + updateFixedColumns(infoFamilyColsFilter); + EnumSet fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); + // If INFO field has to be retrieved, add a filter for fetching columns + // with INFO column prefix. + if (hasField(fieldsToRetrieve, Field.INFO)) { + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.EQUAL, ApplicationColumnPrefix.INFO)); + } + TimelineFilterList relatesTo = getFilters().getRelatesTo(); + if (hasField(fieldsToRetrieve, Field.RELATES_TO)) { + // If RELATES_TO field has to be retrieved, add a filter for fetching + // columns with RELATES_TO column prefix. + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.EQUAL, ApplicationColumnPrefix.RELATES_TO)); + } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) { + // Even if fields to retrieve does not contain RELATES_TO, we still + // need to have a filter to fetch some of the column qualifiers if + // relatesTo filters are specified. relatesTo filters will then be + // matched after fetching rows from HBase. 
+ Set relatesToCols = + TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo); + infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( + ApplicationColumnPrefix.RELATES_TO, relatesToCols)); + } + TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo(); + if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { + // If IS_RELATED_TO field has to be retrieved, add a filter for fetching + // columns with IS_RELATED_TO column prefix. + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.EQUAL, ApplicationColumnPrefix.IS_RELATED_TO)); + } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) { + // Even if fields to retrieve does not contain IS_RELATED_TO, we still + // need to have a filter to fetch some of the column qualifiers if + // isRelatedTo filters are specified. isRelatedTo filters will then be + // matched after fetching rows from HBase. + Set isRelatedToCols = + TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo); + infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( + ApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols)); + } + TimelineFilterList eventFilters = getFilters().getEventFilters(); + if (hasField(fieldsToRetrieve, Field.EVENTS)) { + // If EVENTS field has to be retrieved, add a filter for fetching columns + // with EVENT column prefix. + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.EQUAL, ApplicationColumnPrefix.EVENT)); + } else if (eventFilters != null && !eventFilters.getFilterList().isEmpty()){ + // Even if fields to retrieve does not contain EVENTS, we still need to + // have a filter to fetch some of the column qualifiers on the basis of + // event filters specified. Event filters will then be matched after + // fetching rows from HBase. 
+ Set eventCols = + TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters); + infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( + ApplicationColumnPrefix.EVENT, eventCols)); + } + return infoFamilyColsFilter; + } + + /** + * Exclude column prefixes via filters which are not required(based on fields + * to retrieve) from info column family. These filters are added to filter + * list which contains a filter for getting info column family. + * + * @param infoColFamilyList filter list for info column family. + */ + private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) { + EnumSet fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); + // Events not required. + if (!hasField(fieldsToRetrieve, Field.EVENTS)) { + infoColFamilyList.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.NOT_EQUAL, ApplicationColumnPrefix.EVENT)); + } + // info not required. + if (!hasField(fieldsToRetrieve, Field.INFO)) { + infoColFamilyList.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.NOT_EQUAL, ApplicationColumnPrefix.INFO)); + } + // is related to not required. + if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { + infoColFamilyList.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.NOT_EQUAL, ApplicationColumnPrefix.IS_RELATED_TO)); + } + // relates to not required. + if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) { + infoColFamilyList.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter( + CompareOp.NOT_EQUAL, ApplicationColumnPrefix.RELATES_TO)); + } + } + + /** + * Updates filter list based on fields for confs and metrics to retrieve. + * + * @param listBasedOnFields filter list based on fields. + * @throws IOException if any problem occurs while updating filter list. 
+ */ + private void updateFilterForConfsAndMetricsToRetrieve( + FilterList listBasedOnFields) throws IOException { + TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); + // Please note that if confsToRetrieve is specified, we would have added + // CONFS to fields to retrieve in augmentParams() even if not specified. + if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) { + // Create a filter list for configs. + listBasedOnFields.addFilter(TimelineFilterUtils. + createFilterForConfsOrMetricsToRetrieve( + dataToRetrieve.getConfsToRetrieve(), + ApplicationColumnFamily.CONFIGS, ApplicationColumnPrefix.CONFIG)); + } + + // Please note that if metricsToRetrieve is specified, we would have added + // METRICS to fields to retrieve in augmentParams() even if not specified. + if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) { + // Create a filter list for metrics. + listBasedOnFields.addFilter(TimelineFilterUtils. + createFilterForConfsOrMetricsToRetrieve( + dataToRetrieve.getMetricsToRetrieve(), + ApplicationColumnFamily.METRICS, ApplicationColumnPrefix.METRIC)); + } + } + + @Override + protected FilterList constructFilterListBasedOnFields() throws IOException { + if (!needCreateFilterListBasedOnFields()) { + // Fetch all the columns. No need of a filter. + return null; + } + FilterList listBasedOnFields = new FilterList(Operator.MUST_PASS_ONE); + FilterList infoColFamilyList = new FilterList(); + // By default fetch everything in INFO column family. + FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(ApplicationColumnFamily.INFO.getBytes())); + infoColFamilyList.addFilter(infoColumnFamily); + if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) { + // We can fetch only some of the columns from info family. 
+ infoColFamilyList.addFilter(createFilterListForColsOfInfoFamily()); + } else { + // Exclude column prefixes in info column family which are not required + // based on fields to retrieve. + excludeFieldsFromInfoColFamily(infoColFamilyList); + } + listBasedOnFields.addFilter(infoColFamilyList); + + updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields); + return listBasedOnFields; + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { + TimelineReaderContext context = getContext(); + ApplicationRowKey applicationRowKey = + new ApplicationRowKey(context.getClusterId(), context.getUserId(), + context.getFlowName(), context.getFlowRunId(), context.getAppId()); + byte[] rowKey = applicationRowKey.getRowKey(); + Get get = new Get(rowKey); + get.setMaxVersions(getDataToRetrieve().getMetricsLimit()); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } + return getTable().getResult(hbaseConf, conn, get); + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(getContext(), "context shouldn't be null"); + Preconditions.checkNotNull( + getDataToRetrieve(), "data to retrieve shouldn't be null"); + Preconditions.checkNotNull(getContext().getClusterId(), + "clusterId shouldn't be null"); + Preconditions.checkNotNull(getContext().getEntityType(), + "entityType shouldn't be null"); + if (isSingleEntityRead()) { + Preconditions.checkNotNull(getContext().getAppId(), + "appId shouldn't be null"); + } else { + Preconditions.checkNotNull(getContext().getUserId(), + "userId shouldn't be null"); + Preconditions.checkNotNull(getContext().getFlowName(), + "flowName shouldn't be null"); + } + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) + throws IOException { + TimelineReaderContext context = getContext(); + if (isSingleEntityRead()) { + // Get flow context information from AppToFlow table. 
+ if (context.getFlowName() == null || context.getFlowRunId() == null + || context.getUserId() == null) { + AppToFlowRowKey appToFlowRowKey = + new AppToFlowRowKey(context.getClusterId(), context.getAppId()); + FlowContext flowContext = + lookupFlowContext(appToFlowRowKey, + hbaseConf, conn); + context.setFlowName(flowContext.getFlowName()); + context.setFlowRunId(flowContext.getFlowRunId()); + context.setUserId(flowContext.getUserId()); + } + } + // Add configs/metrics to fields to retrieve if confsToRetrieve and/or + // metricsToRetrieve are specified. + getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve(); + if (!isSingleEntityRead()) { + createFiltersIfNull(); + } + } + + @Override + protected ResultScanner getResults(Configuration hbaseConf, + Connection conn, FilterList filterList) throws IOException { + Scan scan = new Scan(); + TimelineReaderContext context = getContext(); + // Whether or not flowRunID is null doesn't matter, the + // ApplicationRowKeyPrefix will do the right thing. 
+ RowKeyPrefix applicationRowKeyPrefix = + new ApplicationRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName(), + context.getFlowRunId()); + scan.setRowPrefixFilter(applicationRowKeyPrefix.getRowKeyPrefix()); + FilterList newList = new FilterList(); + newList.addFilter(new PageFilter(getFilters().getLimit())); + if (filterList != null && !filterList.getFilters().isEmpty()) { + newList.addFilter(filterList); + } + scan.setFilter(newList); + scan.setMaxVersions(getDataToRetrieve().getMetricsLimit()); + return getTable().getResultScanner(hbaseConf, conn, scan); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + if (result == null || result.isEmpty()) { + return null; + } + TimelineEntity entity = new TimelineEntity(); + entity.setType(TimelineEntityType.YARN_APPLICATION.toString()); + String entityId = ApplicationColumn.ID.readResult(result).toString(); + entity.setId(entityId); + + TimelineEntityFilters filters = getFilters(); + // fetch created time + Long createdTime = (Long) ApplicationColumn.CREATED_TIME.readResult(result); + entity.setCreatedTime(createdTime); + + EnumSet fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); + // fetch is related to entities and match isRelatedTo filter. If isRelatedTo + // filters do not match, entity would be dropped. We have to match filters + // locally as relevant HBase filters to filter out rows on the basis of + // isRelatedTo are not set in HBase scan. 
+ boolean checkIsRelatedTo = + !isSingleEntityRead() && filters.getIsRelatedTo() != null && + filters.getIsRelatedTo().getFilterList().size() > 0; + if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) { + readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO, + true); + if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity, + filters.getIsRelatedTo())) { + return null; + } + if (!hasField(fieldsToRetrieve, + Field.IS_RELATED_TO)) { + entity.getIsRelatedToEntities().clear(); + } + } + + // fetch relates to entities and match relatesTo filter. If relatesTo + // filters do not match, entity would be dropped. We have to match filters + // locally as relevant HBase filters to filter out rows on the basis of + // relatesTo are not set in HBase scan. + boolean checkRelatesTo = + !isSingleEntityRead() && filters.getRelatesTo() != null && + filters.getRelatesTo().getFilterList().size() > 0; + if (hasField(fieldsToRetrieve, Field.RELATES_TO) || + checkRelatesTo) { + readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO, + false); + if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity, + filters.getRelatesTo())) { + return null; + } + if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) { + entity.getRelatesToEntities().clear(); + } + } + + // fetch info if fieldsToRetrieve contains INFO or ALL. + if (hasField(fieldsToRetrieve, Field.INFO)) { + readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false); + } + + // fetch configs if fieldsToRetrieve contains CONFIGS or ALL. + if (hasField(fieldsToRetrieve, Field.CONFIGS)) { + readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true); + } + + // fetch events and match event filters if they exist. If event filters do + // not match, entity would be dropped. We have to match filters locally + // as relevant HBase filters to filter out rows on the basis of events + // are not set in HBase scan. 
+ boolean checkEvents = + !isSingleEntityRead() && filters.getEventFilters() != null && + filters.getEventFilters().getFilterList().size() > 0; + if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) { + readEvents(entity, result, ApplicationColumnPrefix.EVENT); + if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity, + filters.getEventFilters())) { + return null; + } + if (!hasField(fieldsToRetrieve, Field.EVENTS)) { + entity.getEvents().clear(); + } + } + + // fetch metrics if fieldsToRetrieve contains METRICS or ALL. + if (hasField(fieldsToRetrieve, Field.METRICS)) { + readMetrics(entity, result, ApplicationColumnPrefix.METRIC); + } + return entity; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java new file mode 100644 index 0000000..9ba5e38 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; +import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for flow activity entities that are stored in the + * flow activity table. + */ +class FlowActivityEntityReader extends TimelineEntityReader { + private static final FlowActivityTable FLOW_ACTIVITY_TABLE = + new FlowActivityTable(); + + /** + * Used to convert Long key components to and from storage format. + */ + private final KeyConverter longKeyConverter = new LongKeyConverter(); + + + public FlowActivityEntityReader(TimelineReaderContext ctxt, + TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { + super(ctxt, entityFilters, toRetrieve, true); + } + + public FlowActivityEntityReader(TimelineReaderContext ctxt, + TimelineDataToRetrieve toRetrieve) { + super(ctxt, toRetrieve); + } + + /** + * Uses the {@link FlowActivityTable}. 
+ */ + @Override + protected BaseTable getTable() { + return FLOW_ACTIVITY_TABLE; + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(getContext().getClusterId(), + "clusterId shouldn't be null"); + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) + throws IOException { + createFiltersIfNull(); + } + + @Override + protected FilterList constructFilterListBasedOnFilters() throws IOException { + return null; + } + + @Override + protected FilterList constructFilterListBasedOnFields() { + return null; + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { + throw new UnsupportedOperationException( + "we don't support a single entity query"); + } + + @Override + protected ResultScanner getResults(Configuration hbaseConf, + Connection conn, FilterList filterList) throws IOException { + Scan scan = new Scan(); + String clusterId = getContext().getClusterId(); + if (getFilters().getCreatedTimeBegin() == 0L && + getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) { + // All records have to be chosen. + scan.setRowPrefixFilter(new FlowActivityRowKeyPrefix(clusterId) + .getRowKeyPrefix()); + } else { + scan.setStartRow(new FlowActivityRowKeyPrefix(clusterId, getFilters() + .getCreatedTimeEnd()).getRowKeyPrefix()); + scan.setStopRow(new FlowActivityRowKeyPrefix(clusterId, (getFilters() + .getCreatedTimeBegin() <= 0 ? 
0 + : (getFilters().getCreatedTimeBegin() - 1))).getRowKeyPrefix()); + } + // use the page filter to limit the result to the page size + // the scanner may still return more than the limit; therefore we need to + // read the right number as we iterate + scan.setFilter(new PageFilter(getFilters().getLimit())); + return getTable().getResultScanner(hbaseConf, conn, scan); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow()); + + Long time = rowKey.getDayTimestamp(); + String user = rowKey.getUserId(); + String flowName = rowKey.getFlowName(); + + FlowActivityEntity flowActivity = new FlowActivityEntity( + getContext().getClusterId(), time, user, flowName); + // set the id + flowActivity.setId(flowActivity.getId()); + // get the list of run ids along with the version that are associated with + // this flow on this day + Map runIdsMap = + FlowActivityColumnPrefix.RUN_ID.readResults(result, longKeyConverter); + for (Map.Entry e : runIdsMap.entrySet()) { + Long runId = e.getKey(); + String version = (String)e.getValue(); + FlowRunEntity flowRun = new FlowRunEntity(); + flowRun.setUser(user); + flowRun.setName(flowName); + flowRun.setRunId(runId); + flowRun.setVersion(version); + // set the id + flowRun.setId(flowRun.getId()); + flowActivity.addFlowRun(flowRun); + } + + return flowActivity; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a925cb8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java new file mode 100644 index 0000000..986a28f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; +import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKeyPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; +import org.apache.hadoop.yarn.webapp.BadRequestException; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for flow run entities that are stored in the flow run + * table. + */ +class FlowRunEntityReader extends TimelineEntityReader { + private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable(); + + public FlowRunEntityReader(TimelineReaderContext ctxt, + TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { + super(ctxt, entityFilters, toRetrieve, true); + } + + public FlowRunEntityReader(TimelineReaderContext ctxt, + TimelineDataToRetrieve toRetrieve) { + super(ctxt, toRetrieve); + } + + /** + * Uses the {@link FlowRunTable}. + */ + @Override + protected BaseTable getTable() { + return FLOW_RUN_TABLE; + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(getContext(), "context shouldn't be null"); + Preconditions.checkNotNull(getDataToRetrieve(), + "data to retrieve shouldn't be null"); + Preconditions.checkNotNull(getContext().getClusterId(), + "clusterId shouldn't be null"); + Preconditions.checkNotNull(getContext().getUserId(), + "userId shouldn't be null"); + Preconditions.checkNotNull(getContext().getFlowName(), + "flowName shouldn't be null"); + if (isSingleEntityRead()) { + Preconditions.checkNotNull(getContext().getFlowRunId(), + "flowRunId shouldn't be null"); + } + EnumSet fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); + if (!isSingleEntityRead() && fieldsToRetrieve != null) { + for (Field field : fieldsToRetrieve) { + if (field != Field.ALL && field != Field.METRICS) { + throw new BadRequestException("Invalid field " + field + + " specified while querying flow runs."); + } + } + } + } + + @Override + 
protected void augmentParams(Configuration hbaseConf, Connection conn) { + // Add metrics to fields to retrieve if metricsToRetrieve is specified. + getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve(); + if (!isSingleEntityRead()) { + createFiltersIfNull(); + } + } + + protected FilterList constructFilterListBasedOnFilters() throws IOException { + FilterList listBasedOnFilters = new FilterList(); + // Filter based on created time range. + Long createdTimeBegin = getFilters().getCreatedTimeBegin(); + Long createdTimeEnd = getFilters().getCreatedTimeEnd(); + if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) { + listBasedOnFilters.addFilter(TimelineFilterUtils + .createSingleColValueFiltersByRange(FlowRunColumn.MIN_START_TIME, + createdTimeBegin, createdTimeEnd)); + } + // Filter based on metric filters. + TimelineFilterList metricFilters = getFilters().getMetricFilters(); + if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) { + listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( + FlowRunColumnPrefix.METRIC, metricFilters)); + } + return listBasedOnFilters; + } + + /** + * Add {@link QualifierFilter} filters to filter list for each column of flow + * run table. + * + * @return filter list to which qualifier filters have been added. + */ + private FilterList updateFixedColumns() { + FilterList columnsList = new FilterList(Operator.MUST_PASS_ONE); + for (FlowRunColumn column : FlowRunColumn.values()) { + columnsList.addFilter(new QualifierFilter(CompareOp.EQUAL, + new BinaryComparator(column.getColumnQualifierBytes()))); + } + return columnsList; + } + + @Override + protected FilterList constructFilterListBasedOnFields() throws IOException { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + // By default fetch everything in INFO column family. 
+ FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, new BinaryComparator( + FlowRunColumnFamily.INFO.getBytes())); + TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); + // If multiple entities have to be retrieved, check if metrics have to be + // retrieved and if not, add a filter so that metrics can be excluded. + // Metrics are always returned if we are reading a single entity. + if (!isSingleEntityRead() + && !hasField(dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) { + FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE); + infoColFamilyList.addFilter(infoColumnFamily); + infoColFamilyList.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator(FlowRunColumnPrefix.METRIC + .getColumnPrefixBytes("")))); + list.addFilter(infoColFamilyList); + } else { + // Check if metricsToRetrieve are specified and if they are, create a + // filter list for info column family by adding flow run tables columns + // and a list for metrics to retrieve. Pls note that fieldsToRetrieve + // will have METRICS added to it if metricsToRetrieve are specified + // (in augmentParams()). 
+ TimelineFilterList metricsToRetrieve = + dataToRetrieve.getMetricsToRetrieve(); + if (metricsToRetrieve != null + && !metricsToRetrieve.getFilterList().isEmpty()) { + FilterList infoColFamilyList = new FilterList(); + infoColFamilyList.addFilter(infoColumnFamily); + FilterList columnsList = updateFixedColumns(); + columnsList.addFilter(TimelineFilterUtils.createHBaseFilterList( + FlowRunColumnPrefix.METRIC, metricsToRetrieve)); + infoColFamilyList.addFilter(columnsList); + list.addFilter(infoColFamilyList); + } + } + return list; + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { + TimelineReaderContext context = getContext(); + FlowRunRowKey flowRunRowKey = + new FlowRunRowKey(context.getClusterId(), context.getUserId(), + context.getFlowName(), context.getFlowRunId()); + byte[] rowKey = flowRunRowKey.getRowKey(); + Get get = new Get(rowKey); + get.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } + return getTable().getResult(hbaseConf, conn, get); + } + + @Override + protected ResultScanner getResults(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { + Scan scan = new Scan(); + TimelineReaderContext context = getContext(); + RowKeyPrefix flowRunRowKeyPrefix = + new FlowRunRowKeyPrefix(context.getClusterId(), context.getUserId(), + context.getFlowName()); + scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix()); + FilterList newList = new FilterList(); + newList.addFilter(new PageFilter(getFilters().getLimit())); + if (filterList != null && !filterList.getFilters().isEmpty()) { + newList.addFilter(filterList); + } + scan.setFilter(newList); + scan.setMaxVersions(Integer.MAX_VALUE); + return getTable().getResultScanner(hbaseConf, conn, scan); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + 
TimelineReaderContext context = getContext(); + FlowRunEntity flowRun = new FlowRunEntity(); + flowRun.setUser(context.getUserId()); + flowRun.setName(context.getFlowName()); + if (isSingleEntityRead()) { + flowRun.setRunId(context.getFlowRunId()); + } else { + FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow()); + flowRun.setRunId(rowKey.getFlowRunId()); + } + + // read the start time + Long startTime = (Long) FlowRunColumn.MIN_START_TIME.readResult(result); + if (startTime != null) { + flowRun.setStartTime(startTime.longValue()); + } + + // read the end time if available + Long endTime = (Long) FlowRunColumn.MAX_END_TIME.readResult(result); + if (endTime != null) { + flowRun.setMaxEndTime(endTime.longValue()); + } + + // read the flow version + String version = (String) FlowRunColumn.FLOW_VERSION.readResult(result); + if (version != null) { + flowRun.setVersion(version); + } + + // read metrics if its a single entity query or if METRICS are part of + // fieldsToRetrieve. + if (isSingleEntityRead() + || hasField(getDataToRetrieve().getFieldsToRetrieve(), Field.METRICS)) { + readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC); + } + + // set the id + flowRun.setId(flowRun.getId()); + return flowRun; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org