Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1EB70200B42 for ; Sun, 10 Jul 2016 17:50:58 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1D720160A81; Sun, 10 Jul 2016 15:50:58 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C1D94160A7C for ; Sun, 10 Jul 2016 17:50:56 +0200 (CEST) Received: (qmail 39231 invoked by uid 500); 10 Jul 2016 15:50:53 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 38018 invoked by uid 99); 10 Jul 2016 15:50:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 10 Jul 2016 15:50:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 88D83ED496; Sun, 10 Jul 2016 15:50:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sjlee@apache.org To: common-commits@hadoop.apache.org Date: Sun, 10 Jul 2016 15:51:00 -0000 Message-Id: <20994911844f4ef68fec31612cead7c3@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [09/50] [abbrv] hadoop git commit: YARN-5045. hbase unit tests fail due to dependency issues. (Sangjin Lee via varunsaxena) archived-at: Sun, 10 Jul 2016 15:50:58 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc6f978c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java deleted file mode 100644 index e7e7ba4..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java +++ /dev/null @@ -1,671 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage.flow; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertNotEquals; - -import java.io.IOException; -import java.util.Map; -import java.util.List; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.ArrayList; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; - -/** - * Tests the FlowRun and FlowActivity Tables - */ -public class TestHBaseStorageFlowRunCompaction { - - private static HBaseTestingUtility util; - - private final String metric1 = "MAP_SLOT_MILLIS"; - private final String metric2 = "HDFS_BYTES_READ"; - - private final byte[] aRowKey = Bytes.toBytes("a"); - private final byte[] aFamily = Bytes.toBytes("family"); - private final byte[] aQualifier = Bytes.toBytes("qualifier"); - - @BeforeClass - public static void setupBeforeClass() throws Exception { - util = new HBaseTestingUtility(); - Configuration conf = util.getConfiguration(); - conf.setInt("hfile.format.version", 3); - util.startMiniCluster(); - createSchema(); - } - - private static void createSchema() throws IOException { - TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); - } - - /** writes non numeric data into flow run table - * reads it back - * - * @throws Exception - */ - @Test - public void testWriteNonNumericData() throws Exception { - String rowKey = "nonNumericRowKey"; - String column = "nonNumericColumnName"; - String value = "nonNumericValue"; - byte[] rowKeyBytes = Bytes.toBytes(rowKey); - byte[] columnNameBytes = Bytes.toBytes(column); - byte[] valueBytes = Bytes.toBytes(value); - Put p = new Put(rowKeyBytes); - p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes, - valueBytes); - Configuration hbaseConf = util.getConfiguration(); - TableName table = TableName.valueOf(hbaseConf.get( - FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); - Connection conn = null; - conn = ConnectionFactory.createConnection(hbaseConf); - Table flowRunTable = conn.getTable(table); - flowRunTable.put(p); - - Get g = new Get(rowKeyBytes); - Result r = flowRunTable.get(g); - assertNotNull(r); - assertTrue(r.size() >= 1); - Cell actualValue = r.getColumnLatestCell( - FlowRunColumnFamily.INFO.getBytes(), columnNameBytes); - assertNotNull(CellUtil.cloneValue(actualValue)); - assertEquals(Bytes.toString(CellUtil.cloneValue(actualValue)), value); - } - - @Test - public void testWriteFlowRunCompaction() throws Exception { - String cluster = "kompaction_cluster1"; - String user = "kompaction_FlowRun__user1"; - String flow = "kompaction_flowRun_flow_name"; - String flowVersion = "AF1021C19F1351"; - long runid = 1449526652000L; - - int start = 10; - int count = 2000; - int appIdSuffix = 1; - HBaseTimelineWriterImpl hbi = null; - long insertTs = System.currentTimeMillis() - count; - Configuration c1 = util.getConfiguration(); - TimelineEntities te1 = null; - TimelineEntity entityApp1 = null; - try { - hbi = new HBaseTimelineWriterImpl(c1); - hbi.init(c1); - // now insert count * ( 100 + 100) metrics - // each call to getEntityMetricsApp1 brings back 100 values - // of metric1 and 100 of metric2 - for (int i = start; i < start + count; i++) { - String appName = "application_10240000000000_" + appIdSuffix; - insertTs++; - te1 = new TimelineEntities(); - entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(insertTs, c1); - te1.addEntity(entityApp1); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); - - appName = "application_2048000000000_7" + appIdSuffix; - insertTs++; - te1 = new TimelineEntities(); - entityApp1 = TestFlowDataGenerator.getEntityMetricsApp2(insertTs); - te1.addEntity(entityApp1); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); - } - } finally { - String appName = "application_10240000000000_" + appIdSuffix; - te1 = new TimelineEntities(); - entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1Complete( - insertTs + 1, c1); - te1.addEntity(entityApp1); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); - hbi.flush(); - hbi.close(); - } - - // check in flow run table - HRegionServer server = util.getRSForFirstRegionInTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); - List regions = server.getOnlineRegions(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); - assertTrue("Didn't find any regions for primary table!", regions.size() > 0); - // flush and compact all the regions of the primary table - for (HRegion region : regions) { - region.flushcache(); - region.compactStores(true); - } - - // check flow run for one flow many apps - checkFlowRunTable(cluster, user, flow, runid, c1, 4); - } - - - private void checkFlowRunTable(String cluster, String user, String flow, - long runid, Configuration c1, int valueCount) throws IOException { - Scan s = new Scan(); - s.addFamily(FlowRunColumnFamily.INFO.getBytes()); - byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); - s.setStartRow(startRow); - String clusterStop = cluster + "1"; - byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid); - s.setStopRow(stopRow); - Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn.getTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); - ResultScanner scanner = table1.getScanner(s); - - int rowCount = 0; - for (Result result : scanner) { - assertNotNull(result); - assertTrue(!result.isEmpty()); - Map values = result.getFamilyMap(FlowRunColumnFamily.INFO - .getBytes()); - assertEquals(valueCount, values.size()); - - rowCount++; - // check metric1 - byte[] q = ColumnHelper.getColumnQualifier( - FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1); - assertTrue(values.containsKey(q)); - assertEquals(141, Bytes.toLong(values.get(q))); - - // check metric2 - q = ColumnHelper.getColumnQualifier( - FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2); - assertTrue(values.containsKey(q)); - assertEquals(57, Bytes.toLong(values.get(q))); - } - assertEquals(1, rowCount); - } - - - private FlowScanner getFlowScannerForTestingCompaction() { - // create a FlowScanner object with the sole purpose of invoking a process - // summation; - CompactionRequest request = new CompactionRequest(); - request.setIsMajor(true, true); - // okay to pass in nulls for the constructor arguments - // because all we want to do is invoke the process summation - FlowScanner fs = new FlowScanner(null, -1, null, - (request.isMajor() == true ? FlowScannerOperation.MAJOR_COMPACTION - : FlowScannerOperation.MINOR_COMPACTION)); - assertNotNull(fs); - return fs; - } - - @Test - public void checkProcessSummationMoreCellsSumFinal2() - throws IOException { - long cellValue1 = 1236L; - long cellValue2 = 28L; - long cellValue3 = 1236L; - long cellValue4 = 1236L; - FlowScanner fs = getFlowScannerForTestingCompaction(); - - // note down the current timestamp - long currentTimestamp = System.currentTimeMillis(); - long cell1Ts = 1200120L; - long cell2Ts = TimestampGenerator.getSupplementedTimestamp( - System.currentTimeMillis(),"application_123746661110_11202"); - long cell3Ts = 1277719L; - long cell4Ts = currentTimestamp - 10; - - SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR); - - List tags = new ArrayList<>(); - Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), - "application_1234588888_91188"); - tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); - // create a cell with a VERY old timestamp and attribute SUM_FINAL - Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - cell1Ts, Bytes.toBytes(cellValue1), tagByteArray); - currentColumnCells.add(c1); - - tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), - "application_12700000001_29102"); - tags.add(t); - tagByteArray = Tag.fromList(tags); - // create a cell with a recent timestamp and attribute SUM_FINAL - Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - cell2Ts, Bytes.toBytes(cellValue2), tagByteArray); - currentColumnCells.add(c2); - - tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM.getTagType(), - "application_191780000000001_8195"); - tags.add(t); - tagByteArray = Tag.fromList(tags); - // create a cell with a VERY old timestamp but has attribute SUM - Cell c3 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - cell3Ts, Bytes.toBytes(cellValue3), tagByteArray); - currentColumnCells.add(c3); - - tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM.getTagType(), - "application_191780000000001_98104"); - tags.add(t); - tagByteArray = Tag.fromList(tags); - // create a cell with a VERY old timestamp but has attribute SUM - Cell c4 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - cell4Ts, Bytes.toBytes(cellValue4), tagByteArray); - currentColumnCells.add(c4); - - List cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); - assertNotNull(cells); - - // we should be getting back 4 cells - // one is the flow sum cell - // two are the cells with SUM attribute - // one cell with SUM_FINAL - assertEquals(4, cells.size()); - - for (int i = 0; i < cells.size(); i++) { - Cell returnedCell = cells.get(0); - assertNotNull(returnedCell); - - long returnTs = returnedCell.getTimestamp(); - long returnValue = Bytes.toLong(CellUtil - .cloneValue(returnedCell)); - if (returnValue == cellValue2) { - assertTrue(returnTs == cell2Ts); - } else if (returnValue == cellValue3) { - assertTrue(returnTs == cell3Ts); - } else if (returnValue == cellValue4) { - assertTrue(returnTs == cell4Ts); - } else if (returnValue == cellValue1) { - assertTrue(returnTs != cell1Ts); - assertTrue(returnTs > cell1Ts); - assertTrue(returnTs >= currentTimestamp); - } else { - // raise a failure since we expect only these two values back - Assert.fail(); - } - } - } - - // tests with many cells - // of type SUM and SUM_FINAL - // all cells of SUM_FINAL will expire - @Test - public void checkProcessSummationMoreCellsSumFinalMany() throws IOException { - FlowScanner fs = getFlowScannerForTestingCompaction(); - int count = 200000; - - long cellValueFinal = 1000L; - long cellValueNotFinal = 28L; - - // note down the current timestamp - long currentTimestamp = System.currentTimeMillis(); - long cellTsFinalStart = 10001120L; - long cellTsFinal = cellTsFinalStart; - long cellTsNotFinalStart = currentTimestamp - 5; - long cellTsNotFinal = cellTsNotFinalStart; - - SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR); - List tags = null; - Tag t = null; - Cell c1 = null; - - // insert SUM_FINAL cells - for (int i = 0; i < count; i++) { - tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), - "application_123450000" + i + "01_19" + i); - tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); - // create a cell with a VERY old timestamp and attribute SUM_FINAL - c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray); - currentColumnCells.add(c1); - cellTsFinal++; - } - - // add SUM cells - for (int i = 0; i < count; i++) { - tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM.getTagType(), - "application_1987650000" + i + "83_911" + i); - tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); - // create a cell with attribute SUM - c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray); - currentColumnCells.add(c1); - cellTsNotFinal++; - } - - List cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); - assertNotNull(cells); - - // we should be getting back count + 1 cells - // one is the flow sum cell - // others are the cells with SUM attribute - assertEquals(count + 1, cells.size()); - - for (int i = 0; i < cells.size(); i++) { - Cell returnedCell = cells.get(0); - assertNotNull(returnedCell); - - long returnTs = returnedCell.getTimestamp(); - long returnValue = Bytes.toLong(CellUtil - .cloneValue(returnedCell)); - if (returnValue == (count * cellValueFinal)) { - assertTrue(returnTs > (cellTsFinalStart + count)); - assertTrue(returnTs >= currentTimestamp); - } else if ((returnValue >= cellValueNotFinal) - && (returnValue <= cellValueNotFinal * count)) { - assertTrue(returnTs >= cellTsNotFinalStart); - assertTrue(returnTs <= cellTsNotFinalStart * count); - } else { - // raise a failure since we expect only these values back - Assert.fail(); - } - } - } - - // tests with many cells - // of type SUM and SUM_FINAL - // NOT cells of SUM_FINAL will expire - @Test - public void checkProcessSummationMoreCellsSumFinalVariedTags() throws IOException { - FlowScanner fs = getFlowScannerForTestingCompaction(); - int countFinal = 20100; - int countNotFinal = 1000; - int countFinalNotExpire = 7009; - - long cellValueFinal = 1000L; - long cellValueNotFinal = 28L; - - // note down the current timestamp - long currentTimestamp = System.currentTimeMillis(); - long cellTsFinalStart = 10001120L; - long cellTsFinal = cellTsFinalStart; - - long cellTsFinalStartNotExpire = TimestampGenerator.getSupplementedTimestamp( - System.currentTimeMillis(), "application_10266666661166_118821"); - long cellTsFinalNotExpire = cellTsFinalStartNotExpire; - - long cellTsNotFinalStart = currentTimestamp - 5; - long cellTsNotFinal = cellTsNotFinalStart; - - SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR); - List tags = null; - Tag t = null; - Cell c1 = null; - - // insert SUM_FINAL cells which will expire - for (int i = 0; i < countFinal; i++) { - tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), - "application_123450000" + i + "01_19" + i); - tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); - // create a cell with a VERY old timestamp and attribute SUM_FINAL - c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray); - currentColumnCells.add(c1); - cellTsFinal++; - } - - // insert SUM_FINAL cells which will NOT expire - for (int i = 0; i < countFinalNotExpire; i++) { - tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), - "application_123450000" + i + "01_19" + i); - tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); - // create a cell with a VERY old timestamp and attribute SUM_FINAL - c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray); - currentColumnCells.add(c1); - cellTsFinalNotExpire++; - } - - // add SUM cells - for (int i = 0; i < countNotFinal; i++) { - tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM.getTagType(), - "application_1987650000" + i + "83_911" + i); - tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); - // create a cell with attribute SUM - c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray); - currentColumnCells.add(c1); - cellTsNotFinal++; - } - - List cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); - assertNotNull(cells); - - // we should be getting back - // countNotFinal + countFinalNotExpire + 1 cells - // one is the flow sum cell - // count = the cells with SUM attribute - // count = the cells with SUM_FINAL attribute but not expired - assertEquals(countFinalNotExpire + countNotFinal + 1, cells.size()); - - for (int i = 0; i < cells.size(); i++) { - Cell returnedCell = cells.get(0); - assertNotNull(returnedCell); - - long returnTs = returnedCell.getTimestamp(); - long returnValue = Bytes.toLong(CellUtil - .cloneValue(returnedCell)); - if (returnValue == (countFinal * cellValueFinal)) { - assertTrue(returnTs > (cellTsFinalStart + countFinal)); - assertTrue(returnTs >= currentTimestamp); - } else if (returnValue == cellValueNotFinal) { - assertTrue(returnTs >= cellTsNotFinalStart); - assertTrue(returnTs <= cellTsNotFinalStart + countNotFinal); - } else if (returnValue == cellValueFinal){ - assertTrue(returnTs >= cellTsFinalStartNotExpire); - assertTrue(returnTs <= cellTsFinalStartNotExpire + countFinalNotExpire); - } else { - // raise a failure since we expect only these values back - Assert.fail(); - } - } - } - - @Test - public void testProcessSummationMoreCellsSumFinal() throws IOException { - FlowScanner fs = getFlowScannerForTestingCompaction(); - // note down the current timestamp - long currentTimestamp = System.currentTimeMillis(); - long cellValue1 = 1236L; - long cellValue2 = 28L; - - List tags = new ArrayList<>(); - Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), - "application_1234588888_999888"); - tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); - SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR); - - // create a cell with a VERY old timestamp and attribute SUM_FINAL - Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - 120L, Bytes.toBytes(cellValue1), tagByteArray); - currentColumnCells.add(c1); - - tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM.getTagType(), - "application_100000000001_119101"); - tags.add(t); - tagByteArray = Tag.fromList(tags); - - // create a cell with a VERY old timestamp but has attribute SUM - Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - 130L, Bytes.toBytes(cellValue2), tagByteArray); - currentColumnCells.add(c2); - List cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); - assertNotNull(cells); - - // we should be getting back two cells - // one is the flow sum cell - // another is the cell with SUM attribute - assertEquals(2, cells.size()); - - Cell returnedCell = cells.get(0); - assertNotNull(returnedCell); - long inputTs1 = c1.getTimestamp(); - long inputTs2 = c2.getTimestamp(); - - long returnTs = returnedCell.getTimestamp(); - long returnValue = Bytes.toLong(CellUtil - .cloneValue(returnedCell)); - // the returned Ts will be far greater than input ts as well as the noted - // current timestamp - if (returnValue == cellValue2) { - assertTrue(returnTs == inputTs2); - } else if (returnValue == cellValue1) { - assertTrue(returnTs >= currentTimestamp); - assertTrue(returnTs != inputTs1); - } else { - // raise a failure since we expect only these two values back - Assert.fail(); - } - } - - @Test - public void testProcessSummationOneCellSumFinal() throws IOException { - FlowScanner fs = getFlowScannerForTestingCompaction(); - - // note down the current timestamp - long currentTimestamp = System.currentTimeMillis(); - List tags = new ArrayList<>(); - Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), - "application_123458888888_999888"); - tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); - SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR); - - // create a cell with a VERY old timestamp - Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - 120L, Bytes.toBytes(1110L), tagByteArray); - currentColumnCells.add(c1); - - List cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); - assertNotNull(cells); - // we should not get the same cell back - // but we get back the flow cell - assertEquals(1, cells.size()); - - Cell returnedCell = cells.get(0); - // it's NOT the same cell - assertNotEquals(c1, returnedCell); - long inputTs = c1.getTimestamp(); - long returnTs = returnedCell.getTimestamp(); - // the returned Ts will be far greater than input ts as well as the noted - // current timestamp - assertTrue(returnTs > inputTs); - assertTrue(returnTs >= currentTimestamp); - } - - @Test - public void testProcessSummationOneCell() throws IOException { - FlowScanner fs = getFlowScannerForTestingCompaction(); - - // note down the current timestamp - long currentTimestamp = System.currentTimeMillis(); - - // try for 1 cell with tag SUM - List tags = new ArrayList<>(); - Tag t = new Tag(AggregationOperation.SUM.getTagType(), - "application_123458888888_999888"); - tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); - - SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR); - - Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, - currentTimestamp, Bytes.toBytes(1110L), tagByteArray); - currentColumnCells.add(c1); - List cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); - assertNotNull(cells); - // we expect the same cell back - assertEquals(1, cells.size()); - Cell c2 = cells.get(0); - assertEquals(c1, c2); - assertEquals(currentTimestamp, c2.getTimestamp()); - } - - @Test - public void testProcessSummationEmpty() throws IOException { - FlowScanner fs = getFlowScannerForTestingCompaction(); - long currentTimestamp = System.currentTimeMillis(); - - SortedSet currentColumnCells = null; - List cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); - assertNotNull(cells); - assertEquals(0, cells.size()); - - currentColumnCells = new TreeSet(KeyValue.COMPARATOR); - cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); - assertNotNull(cells); - assertEquals(0, cells.size()); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - util.shutdownMiniCluster(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc6f978c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml index 6a77c0c..6e1f461 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml @@ -44,5 +44,6 @@ hadoop-yarn-server-applicationhistoryservice hadoop-yarn-server-timeline-pluginstorage hadoop-yarn-server-timelineservice + hadoop-yarn-server-timelineservice-hbase-tests --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org