Return-Path: X-Original-To: apmail-kylin-commits-archive@minotaur.apache.org Delivered-To: apmail-kylin-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4A47718E66 for ; Thu, 23 Jul 2015 23:20:35 +0000 (UTC) Received: (qmail 26072 invoked by uid 500); 23 Jul 2015 23:20:28 -0000 Delivered-To: apmail-kylin-commits-archive@kylin.apache.org Received: (qmail 26010 invoked by uid 500); 23 Jul 2015 23:20:28 -0000 Mailing-List: contact commits-help@kylin.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kylin.incubator.apache.org Delivered-To: mailing list commits@kylin.incubator.apache.org Received: (qmail 25990 invoked by uid 99); 23 Jul 2015 23:20:28 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Jul 2015 23:20:28 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 33E9AC0708 for ; Thu, 23 Jul 2015 23:20:28 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.79 X-Spam-Level: * X-Spam-Status: No, score=1.79 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, T_RP_MATCHES_RCVD=-0.01] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id ya-1m5gZ5ObK for ; Thu, 23 Jul 2015 23:20:10 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id 758D926426 for ; Thu, 23 Jul 2015 23:20:10 +0000 (UTC) Received: (qmail 25745 invoked by uid 99); 23 Jul 2015 23:20:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) 
(140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Jul 2015 23:20:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2B1C3E6842; Thu, 23 Jul 2015 23:20:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: liyang@apache.org To: commits@kylin.incubator.apache.org Date: Thu, 23 Jul 2015 23:20:31 -0000 Message-Id: <141938c1a19844bebc0c9ec81b43e535@git.apache.org> In-Reply-To: <9271c088b1be42f994d443b5b1ff7d4c@git.apache.org> References: <9271c088b1be42f994d443b5b1ff7d4c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [23/28] incubator-kylin git commit: KYLIN-875 Split job module into 'core-job', 'engine-mr', 'source-hive', 'storage-hbase'. The old job remains as an assembly project. http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java new file mode 100644 index 0000000..0b812a6 --- /dev/null +++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.cube.inmemcubing; + +import static org.junit.Assert.*; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder; +import org.apache.kylin.cube.inmemcubing.ICuboidWriter; +import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder; +import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.metadata.model.TblColRef; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public class DoggedCubeBuilderTest extends LocalFileMetadataTestCase { + + @SuppressWarnings("unused") + private static final Logger logger = LoggerFactory.getLogger(DoggedCubeBuilderTest.class); + + private static final int INPUT_ROWS = 10000; + private static final int SPLIT_ROWS = 5000; + private static final int THREADS = 4; + + private static CubeInstance cube; + private static String flatTable; + private 
static Map> dictionaryMap; + + @BeforeClass + public static void before() throws IOException { + staticCreateTestMetadata(); + + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + CubeManager cubeManager = CubeManager.getInstance(kylinConfig); + + cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty"); + flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv"; + dictionaryMap = InMemCubeBuilderTest.getDictionaryMap(cube, flatTable); + } + + @AfterClass + public static void after() throws Exception { + staticCleanupTestMetadata(); + } + + @Test + public void test() throws Exception { + + ArrayBlockingQueue> queue = new ArrayBlockingQueue>(1000); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + long randSeed = System.currentTimeMillis(); + + DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap); + doggedBuilder.setConcurrentThreads(THREADS); + doggedBuilder.setSplitRowThreshold(SPLIT_ROWS); + FileRecordWriter doggedResult = new FileRecordWriter(); + + { + Future future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult)); + InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed); + future.get(); + doggedResult.close(); + } + + InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap); + inmemBuilder.setConcurrentThreads(THREADS); + FileRecordWriter inmemResult = new FileRecordWriter(); + + { + Future future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult)); + InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed); + future.get(); + inmemResult.close(); + } + + fileCompare(doggedResult.file, inmemResult.file); + doggedResult.file.delete(); + inmemResult.file.delete(); + } + + private void fileCompare(File file, File file2) throws IOException { + BufferedReader r1 = new BufferedReader(new InputStreamReader(new 
FileInputStream(file), "UTF-8")); + BufferedReader r2 = new BufferedReader(new InputStreamReader(new FileInputStream(file2), "UTF-8")); + + String line1, line2; + do { + line1 = r1.readLine(); + line2 = r2.readLine(); + + assertEquals(line1, line2); + + } while (line1 != null || line2 != null); + + r1.close(); + r2.close(); + } + + class FileRecordWriter implements ICuboidWriter { + + File file; + PrintWriter writer; + + FileRecordWriter() throws IOException { + file = File.createTempFile("DoggedCubeBuilderTest_", ".data"); + writer = new PrintWriter(file, "UTF-8"); + } + + @Override + public void write(long cuboidId, GTRecord record) throws IOException { + writer.print(cuboidId); + writer.print(", "); + writer.print(record.toString()); + writer.println(); + } + + public void close() { + writer.close(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java new file mode 100644 index 0000000..1487dff --- /dev/null +++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.cube.inmemcubing; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.commons.io.FileUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; +import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.dict.DictionaryGenerator; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.metadata.model.TblColRef; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + */ +public class InMemCubeBuilderTest extends LocalFileMetadataTestCase { + + private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderTest.class); + + private static final int INPUT_ROWS = 70000; + private static final int THREADS = 4; + + private static CubeInstance 
cube; + private static String flatTable; + private static Map> dictionaryMap; + + @BeforeClass + public static void before() throws IOException { + staticCreateTestMetadata(); + + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + CubeManager cubeManager = CubeManager.getInstance(kylinConfig); + + cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty"); + flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv"; + dictionaryMap = getDictionaryMap(cube, flatTable); + } + + @AfterClass + public static void after() throws Exception { + staticCleanupTestMetadata(); + } + + @Test + public void test() throws Exception { + + InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap); + //DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap); + cubeBuilder.setConcurrentThreads(THREADS); + + ArrayBlockingQueue> queue = new ArrayBlockingQueue>(1000); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + + try { + // round 1 + { + Future future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter())); + feedData(cube, flatTable, queue, INPUT_ROWS); + future.get(); + } + + // round 2, zero input + { + Future future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter())); + feedData(cube, flatTable, queue, 0); + future.get(); + } + + // round 3 + { + Future future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter())); + feedData(cube, flatTable, queue, INPUT_ROWS); + future.get(); + } + + } catch (Exception e) { + logger.error("stream build failed", e); + throw new IOException("Failed to build cube ", e); + } + } + + static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue> queue, int count) throws IOException, InterruptedException { + feedData(cube, flatTable, queue, count, 0); + } + + 
static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue> queue, int count, long randSeed) throws IOException, InterruptedException { + CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), null); + int nColumns = flatTableDesc.getColumnList().size(); + + @SuppressWarnings("unchecked") + Set[] distinctSets = new Set[nColumns]; + for (int i = 0; i < nColumns; i++) + distinctSets[i] = new TreeSet(); + + // get distinct values on each column + List lines = FileUtils.readLines(new File(flatTable), "UTF-8"); + for (String line : lines) { + String[] row = line.trim().split(","); + assert row.length == nColumns; + for (int i = 0; i < nColumns; i++) + distinctSets[i].add(row[i]); + } + + List distincts = new ArrayList(); + for (int i = 0; i < nColumns; i++) { + distincts.add((String[]) distinctSets[i].toArray(new String[distinctSets[i].size()])); + } + + Random rand = new Random(); + if (randSeed != 0) + rand.setSeed(randSeed); + + // output with random data + for (; count > 0; count--) { + ArrayList row = new ArrayList(nColumns); + for (int i = 0; i < nColumns; i++) { + String[] candidates = distincts.get(i); + row.add(candidates[rand.nextInt(candidates.length)]); + } + queue.put(row); + } + queue.put(new ArrayList(0)); + } + + static Map> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException { + Map> result = Maps.newHashMap(); + CubeDesc desc = cube.getDescriptor(); + CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null); + int nColumns = flatTableDesc.getColumnList().size(); + + List columns = Cuboid.getBaseCuboid(desc).getColumns(); + for (int c = 0; c < columns.size(); c++) { + TblColRef col = columns.get(c); + if (desc.getRowkey().isUseDictionary(col)) { + logger.info("Building dictionary for " + col); + List valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]); + Dictionary dict = 
DictionaryGenerator.buildDictionaryFromValueList(col.getType(), valueList); + result.put(col, dict); + } + } + return result; + } + + private static List readValueList(String flatTable, int nColumns, int c) throws IOException { + List result = Lists.newArrayList(); + List lines = FileUtils.readLines(new File(flatTable), "UTF-8"); + for (String line : lines) { + String[] row = line.trim().split(","); + assert row.length == nColumns; + if (row[c] != null) { + result.add(Bytes.toBytes(row[c])); + } + } + return result; + } + + class ConsoleGTRecordWriter implements ICuboidWriter { + + boolean verbose = false; + + @Override + public void write(long cuboidId, GTRecord record) throws IOException { + if (verbose) + System.out.println(record.toString()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java new file mode 100644 index 0000000..645908d --- /dev/null +++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.cube.inmemcubing; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.List; + +import org.apache.kylin.common.util.MemoryBudgetController; +import org.apache.kylin.cube.inmemcubing.MemDiskStore; +import org.apache.kylin.gridtable.GTBuilder; +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GTScanRequest; +import org.apache.kylin.gridtable.GridTable; +import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.gridtable.UnitTestSupport; +import org.junit.Test; + +public class MemDiskStoreTest { + + final MemoryBudgetController budgetCtrl = new MemoryBudgetController(20); + final GTInfo info = UnitTestSupport.advancedInfo(); + final List data = UnitTestSupport.mockupData(info, 1000000); // converts to about 34 MB data + + @Test + public void testSingleThreadWriteRead() throws IOException { + long start = System.currentTimeMillis(); + verifyOneTableWriteAndRead(); + long end = System.currentTimeMillis(); + System.out.println("Cost " + (end - start) + " millis"); + } + + @Test + public void testMultiThreadWriteRead() throws IOException, InterruptedException { + long start = System.currentTimeMillis(); + + int nThreads = 5; + Thread[] t = new Thread[nThreads]; + for (int i = 0; i < nThreads; i++) { + t[i] = new Thread() { + public void run() { + try { + verifyOneTableWriteAndRead(); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + }; + t[i].start(); + } + for (int i = 0; i < nThreads; i++) { + 
t[i].join(); + } + + long end = System.currentTimeMillis(); + System.out.println("Cost " + (end - start) + " millis"); + } + + private void verifyOneTableWriteAndRead() throws IOException { + MemDiskStore store = new MemDiskStore(info, budgetCtrl); + GridTable table = new GridTable(info, store); + verifyWriteAndRead(table); + } + + private void verifyWriteAndRead(GridTable table) throws IOException { + GTInfo info = table.getInfo(); + + GTBuilder builder = table.rebuild(); + for (GTRecord r : data) { + builder.write(r); + } + builder.close(); + + IGTScanner scanner = table.scan(new GTScanRequest(info)); + int i = 0; + for (GTRecord r : scanner) { + assertEquals(data.get(i++), r); + } + scanner.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemoryBudgetControllerTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemoryBudgetControllerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemoryBudgetControllerTest.java new file mode 100644 index 0000000..f7bf432 --- /dev/null +++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemoryBudgetControllerTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.cube.inmemcubing; + +import org.apache.kylin.common.util.MemoryBudgetController; +import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException; +import org.junit.Test; + +import java.util.ArrayList; + +import static org.junit.Assert.*; + +public class MemoryBudgetControllerTest { + + @Test + public void test() { + final int n = MemoryBudgetController.getSystemAvailMB() / 2; + final MemoryBudgetController mbc = new MemoryBudgetController(n); + + ArrayList mbList = new ArrayList(); + for (int i = 0; i < n; i++) { + mbList.add(new Consumer(mbc)); + assertEquals(mbList.size(), mbc.getTotalReservedMB()); + } + + // a's reservation will free up all the previous + final Consumer a = new Consumer(); + mbc.reserve(a, n); + for (int i = 0; i < n; i++) { + assertEquals(null, mbList.get(i).data); + } + + // cancel a in 2 seconds + new Thread() { + @Override + public void run() { + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + mbc.reserve(a, 0); + } + }.start(); + + // b will success after some wait + long bWaitStart = System.currentTimeMillis(); + final Consumer b = new Consumer(); + mbc.reserveInsist(b, n); + assertTrue(System.currentTimeMillis() - bWaitStart > 1000); + + try { + mbc.reserve(a, 1); + fail(); + } catch (NotEnoughBudgetException ex) { + // expected + } + } + + class Consumer implements MemoryBudgetController.MemoryConsumer { + + byte[] data; + + Consumer() { + } + + Consumer(MemoryBudgetController mbc) { + mbc.reserve(this, 1); + data = new 
byte[MemoryBudgetController.ONE_MB - 24]; // 24 is object shell of this + object shell of data + reference of data + } + + @Override + public int freeUp(int mb) { + if (data != null) { + data = null; + return 1; + } else { + return 0; + } + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java new file mode 100644 index 0000000..f53f11f --- /dev/null +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.gridtable; + +import java.math.BigDecimal; +import java.util.Comparator; +import java.util.Random; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.gridtable.GTAggregateScanner; +import org.apache.kylin.metadata.measure.BigDecimalSumAggregator; +import org.apache.kylin.metadata.measure.DoubleSumAggregator; +import org.apache.kylin.metadata.measure.DoubleMutable; +import org.apache.kylin.metadata.measure.HLLCAggregator; +import org.apache.kylin.metadata.measure.LongSumAggregator; +import org.apache.kylin.metadata.measure.LongMutable; +import org.apache.kylin.metadata.measure.MeasureAggregator; +import org.junit.Test; + +public class AggregationCacheMemSizeTest { + + public static final int NUM_OF_OBJS = 1000000 / 2; + + interface CreateAnObject { + Object create(); + } + + @Test + public void testHLLCAggregatorSize() throws InterruptedException { + int est = estimateObjectSize(new CreateAnObject() { + @Override + public Object create() { + HLLCAggregator aggr = new HLLCAggregator(10); + aggr.aggregate(new HyperLogLogPlusCounter(10)); + return aggr; + } + }); + System.out.println("HLLC: " + est); + } + + @Test + public void testBigDecimalAggregatorSize() throws InterruptedException { + int est = estimateObjectSize(new CreateAnObject() { + @Override + public Object create() { + return newBigDecimalAggr(); + } + + }); + System.out.println("BigDecimal: " + est); + } + + private BigDecimalSumAggregator newBigDecimalAggr() { + BigDecimalSumAggregator aggr = new BigDecimalSumAggregator(); + aggr.aggregate(new BigDecimal("12345678901234567890.123456789")); + return aggr; + } + + @Test + public void testLongAggregatorSize() throws InterruptedException { + int est = estimateObjectSize(new CreateAnObject() { + @Override + public Object create() { + return newLongAggr(); + } + }); + System.out.println("Long: " 
+ est); + } + + private LongSumAggregator newLongAggr() { + LongSumAggregator aggr = new LongSumAggregator(); + aggr.aggregate(new LongMutable(10)); + return aggr; + } + + @Test + public void testDoubleAggregatorSize() throws InterruptedException { + int est = estimateObjectSize(new CreateAnObject() { + @Override + public Object create() { + return newDoubleAggr(); + } + }); + System.out.println("Double: " + est); + } + + private DoubleSumAggregator newDoubleAggr() { + DoubleSumAggregator aggr = new DoubleSumAggregator(); + aggr.aggregate(new DoubleMutable(10)); + return aggr; + } + + @Test + public void testByteArraySize() throws InterruptedException { + int est = estimateObjectSize(new CreateAnObject() { + @Override + public Object create() { + return new byte[10]; + } + }); + System.out.println("byte[10]: " + est); + } + + @Test + public void testAggregatorArraySize() throws InterruptedException { + int est = estimateObjectSize(new CreateAnObject() { + @Override + public Object create() { + return new MeasureAggregator[7]; + } + }); + System.out.println("MeasureAggregator[7]: " + est); + } + + @Test + public void testTreeMapSize() throws InterruptedException { + final SortedMap map = new TreeMap(new Comparator() { + @Override + public int compare(byte[] o1, byte[] o2) { + return Bytes.compareTo(o1, o2); + } + }); + final Random rand = new Random(); + int est = estimateObjectSize(new CreateAnObject() { + @Override + public Object create() { + byte[] key = new byte[10]; + rand.nextBytes(key); + map.put(key, null); + return null; + } + }); + System.out.println("TreeMap entry: " + (est - 20)); // -20 is to exclude byte[10] + } + + @Test + public void testAggregationCacheSize() throws InterruptedException { + final SortedMap map = new TreeMap(new Comparator() { + @Override + public int compare(byte[] o1, byte[] o2) { + return Bytes.compareTo(o1, o2); + } + }); + final Random rand = new Random(); + + long bytesBefore = memLeft(); + byte[] key = null; + 
MeasureAggregator[] aggrs = null; + for (int i = 0; i < NUM_OF_OBJS; i++) { + key = new byte[10]; + rand.nextBytes(key); + aggrs = new MeasureAggregator[4]; + aggrs[0] = newBigDecimalAggr(); + aggrs[1] = newLongAggr(); + aggrs[2] = newDoubleAggr(); + aggrs[3] = newDoubleAggr(); + map.put(key, aggrs); + } + + long bytesAfter = memLeft(); + + long mapActualSize = bytesBefore - bytesAfter; + long mapExpectSize = GTAggregateScanner.estimateSizeOfAggrCache(key, aggrs, map.size()); + System.out.println("Actual cache size: " + mapActualSize); + System.out.println("Expect cache size: " + mapExpectSize); + } + + private int estimateObjectSize(CreateAnObject factory) throws InterruptedException { + Object[] hold = new Object[NUM_OF_OBJS]; + long bytesBefore = memLeft(); + + for (int i = 0; i < hold.length; i++) { + hold[i] = factory.create(); + } + + long bytesAfter = memLeft(); + return (int) ((bytesBefore - bytesAfter) / hold.length); + } + + private long memLeft() throws InterruptedException { + Runtime.getRuntime().gc(); + Thread.sleep(500); + return getSystemAvailBytes(); + } + + private long getSystemAvailBytes() { + Runtime runtime = Runtime.getRuntime(); + long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process + long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free + long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting + long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using + long availableMemory = maxMemory - usedMemory; // available memory i.e. 
Maximum heap size minus the current amount used + return availableMemory; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java new file mode 100644 index 0000000..c90c5d3 --- /dev/null +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.gridtable; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.BitSet; +import java.util.List; +import java.util.Map; + +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.cube.gridtable.CubeCodeSystem; +import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.dict.NumberDictionaryBuilder; +import org.apache.kylin.dict.StringBytesConverter; +import org.apache.kylin.dict.TrieDictionaryBuilder; +import org.apache.kylin.gridtable.GTInfo.Builder; +import org.apache.kylin.gridtable.memstore.GTSimpleMemStore; +import org.apache.kylin.metadata.filter.ColumnTupleFilter; +import org.apache.kylin.metadata.filter.CompareTupleFilter; +import org.apache.kylin.metadata.filter.ConstantTupleFilter; +import org.apache.kylin.metadata.filter.ExtractTupleFilter; +import org.apache.kylin.metadata.filter.LogicalTupleFilter; +import org.apache.kylin.metadata.filter.TupleFilter; +import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; +import org.apache.kylin.metadata.measure.LongMutable; +import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.DataType; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TblColRef; +import org.junit.Test; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public class DictGridTableTest { + + @Test + public void test() throws IOException { + GridTable table = newTestTable(); + verifyScanRangePlanner(table); + verifyFirstRow(table); + verifyScanWithUnevaluatableFilter(table); + verifyScanWithEvaluatableFilter(table); + verifyConvertFilterConstants1(table); + verifyConvertFilterConstants2(table); + verifyConvertFilterConstants3(table); + verifyConvertFilterConstants4(table); + } + + private void 
verifyScanRangePlanner(GridTable table) { + GTInfo info = table.getInfo(); + GTScanRangePlanner planner = new GTScanRangePlanner(info); + + CompareTupleFilter timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14")); + CompareTupleFilter timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13")); + CompareTupleFilter timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15")); + CompareTupleFilter timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15")); + CompareTupleFilter ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10")); + CompareTupleFilter ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20")); + CompareTupleFilter ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30")); + CompareTupleFilter ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30")); + + // flatten or-and & hbase fuzzy value + { + LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2)); + List r = planner.planScanRanges(filter); + assertEquals(1, r.size()); + assertEquals("[1421193600000, 10]-[null, null]", r.get(0).toString()); + assertEquals("[[10], [20]]", r.get(0).hbaseFuzzyKeys.toString()); + } + + // pre-evaluate ever false + { + LogicalTupleFilter filter = and(timeComp1, timeComp2); + List r = planner.planScanRanges(filter); + assertEquals(0, r.size()); + } + + // pre-evaluate ever true + { + LogicalTupleFilter filter = or(timeComp1, ageComp4); + List r = planner.planScanRanges(filter); + assertEquals("[[null, null]-[null, null]]", r.toString()); + } + + // merge overlap range + { + LogicalTupleFilter filter = or(timeComp1, timeComp3); + List r = planner.planScanRanges(filter); + assertEquals("[[null, null]-[null, null]]", r.toString()); + } + + // merge too many ranges + { + LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2), 
and(timeComp4, ageComp3)); + List r = planner.planScanRanges(filter); + assertEquals(3, r.size()); + assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString()); + assertEquals("[1421280000000, 20]-[1421280000000, 20]", r.get(1).toString()); + assertEquals("[1421280000000, 30]-[1421280000000, 30]", r.get(2).toString()); + List r2 = planner.planScanRanges(filter, 2); + assertEquals("[[1421280000000, 10]-[1421280000000, 30]]", r2.toString()); + } + } + + private void verifyFirstRow(GridTable table) throws IOException { + doScanAndVerify(table, new GTScanRequest(table.getInfo()), "[1421193600000, 30, Yang, 10, 10.5]"); + } + + private void verifyScanWithUnevaluatableFilter(GridTable table) throws IOException { + GTInfo info = table.getInfo(); + + CompareTupleFilter fComp = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14")); + ExtractTupleFilter fUnevaluatable = unevaluatable(info.colRef(1)); + LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1))); + LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable); + + GTScanRequest req = new GTScanRequest(info, null, setOf(0), setOf(3), new String[] { "sum" }, filter); + + // note the unEvaluatable column 1 in filter is added to group by + assertEquals("GTScanRequest [range=[null, null]-[null, null], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString()); + + doScanAndVerify(table, req, "[1421280000000, 20, null, 20, null]"); + } + + private void verifyScanWithEvaluatableFilter(GridTable table) throws IOException { + GTInfo info = table.getInfo(); + + CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14")); + CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10")); + LogicalTupleFilter filter = and(fComp1, 
fComp2); + + GTScanRequest req = new GTScanRequest(info, null, setOf(0), setOf(3), new String[] { "sum" }, filter); + + // note the evaluatable column 1 in filter is added to returned columns but not in group by + assertEquals("GTScanRequest [range=[null, null]-[null, null], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString()); + + doScanAndVerify(table, req, "[1421280000000, 20, null, 30, null]", "[1421366400000, 20, null, 40, null]"); + } + + private void verifyConvertFilterConstants1(GridTable table) { + GTInfo info = table.getInfo(); + + TableDesc extTable = TableDesc.mockup("ext"); + TblColRef extColA = new TblColRef(ColumnDesc.mockup(extTable, 1, "A", "timestamp")); + TblColRef extColB = new TblColRef(ColumnDesc.mockup(extTable, 2, "B", "integer")); + + CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14"); + CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.EQ, "10"); + LogicalTupleFilter filter = and(fComp1, fComp2); + + List colMapping = Lists.newArrayList(); + colMapping.add(extColA); + colMapping.add(extColB); + + TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null); + assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]", newFilter.toString()); + } + + private void verifyConvertFilterConstants2(GridTable table) { + GTInfo info = table.getInfo(); + + TableDesc extTable = TableDesc.mockup("ext"); + TblColRef extColA = new TblColRef(ColumnDesc.mockup(extTable, 1, "A", "timestamp")); + TblColRef extColB = new TblColRef(ColumnDesc.mockup(extTable, 2, "B", "integer")); + + CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14"); + CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.LT, "9"); + LogicalTupleFilter 
filter = and(fComp1, fComp2); + + List colMapping = Lists.newArrayList(); + colMapping.add(extColA); + colMapping.add(extColB); + + // $1<"9" round up to $1<"10" + TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null); + assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]", newFilter.toString()); + } + + private void verifyConvertFilterConstants3(GridTable table) { + GTInfo info = table.getInfo(); + + TableDesc extTable = TableDesc.mockup("ext"); + TblColRef extColA = new TblColRef(ColumnDesc.mockup(extTable, 1, "A", "timestamp")); + TblColRef extColB = new TblColRef(ColumnDesc.mockup(extTable, 2, "B", "integer")); + + CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14"); + CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.LTE, "9"); + LogicalTupleFilter filter = and(fComp1, fComp2); + + List colMapping = Lists.newArrayList(); + colMapping.add(extColA); + colMapping.add(extColB); + + // $1<="9" round down to FALSE + TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null); + assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], []]", newFilter.toString()); + } + + private void verifyConvertFilterConstants4(GridTable table) { + GTInfo info = table.getInfo(); + + TableDesc extTable = TableDesc.mockup("ext"); + TblColRef extColA = new TblColRef(ColumnDesc.mockup(extTable, 1, "A", "timestamp")); + TblColRef extColB = new TblColRef(ColumnDesc.mockup(extTable, 2, "B", "integer")); + + CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14"); + CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.IN, "9", "10", "15"); + LogicalTupleFilter filter = and(fComp1, fComp2); + + List colMapping = Lists.newArrayList(); + colMapping.add(extColA); + colMapping.add(extColB); + + // $1 in ("9", "10", "15") has only "10" left + 
TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null); + assertEquals("AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 IN [\\x00]]", newFilter.toString()); + } + + private void doScanAndVerify(GridTable table, GTScanRequest req, String... verifyRows) throws IOException { + System.out.println(req); + IGTScanner scanner = table.scan(req); + int i = 0; + for (GTRecord r : scanner) { + System.out.println(r); + if (verifyRows != null && i < verifyRows.length) { + assertEquals(verifyRows[i], r.toString()); + } + i++; + } + scanner.close(); + } + + private Object enc(GTInfo info, int col, String value) { + ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength()); + info.codeSystem.encodeColumnValue(col, value, buf); + return ByteArray.copyOf(buf.array(), buf.arrayOffset(), buf.position()); + } + + private ExtractTupleFilter unevaluatable(TblColRef col) { + ExtractTupleFilter r = new ExtractTupleFilter(FilterOperatorEnum.EXTRACT); + r.addChild(new ColumnTupleFilter(col)); + return r; + } + + private CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object... value) { + CompareTupleFilter result = new CompareTupleFilter(op); + result.addChild(new ColumnTupleFilter(col)); + result.addChild(new ConstantTupleFilter(Arrays.asList(value))); + return result; + } + + private LogicalTupleFilter and(TupleFilter... children) { + return logic(FilterOperatorEnum.AND, children); + } + + private LogicalTupleFilter or(TupleFilter... children) { + return logic(FilterOperatorEnum.OR, children); + } + + private LogicalTupleFilter not(TupleFilter child) { + return logic(FilterOperatorEnum.NOT, child); + } + + private LogicalTupleFilter logic(FilterOperatorEnum op, TupleFilter... 
children) { + LogicalTupleFilter result = new LogicalTupleFilter(op); + for (TupleFilter c : children) { + result.addChild(c); + } + return result; + } + + static GridTable newTestTable() throws IOException { + GTInfo info = newInfo(); + GTSimpleMemStore store = new GTSimpleMemStore(info); + GridTable table = new GridTable(info, store); + + GTRecord r = new GTRecord(table.getInfo()); + GTBuilder builder = table.rebuild(); + + builder.write(r.setValues("2015-01-14", "30", "Yang", new LongMutable(10), new BigDecimal("10.5"))); + builder.write(r.setValues("2015-01-14", "30", "Luke", new LongMutable(10), new BigDecimal("10.5"))); + builder.write(r.setValues("2015-01-15", "20", "Dong", new LongMutable(10), new BigDecimal("10.5"))); + builder.write(r.setValues("2015-01-15", "20", "Jason", new LongMutable(10), new BigDecimal("10.5"))); + builder.write(r.setValues("2015-01-15", "30", "Xu", new LongMutable(10), new BigDecimal("10.5"))); + builder.write(r.setValues("2015-01-16", "20", "Mahone", new LongMutable(10), new BigDecimal("10.5"))); + builder.write(r.setValues("2015-01-16", "20", "Qianhao", new LongMutable(10), new BigDecimal("10.5"))); + builder.write(r.setValues("2015-01-16", "30", "George", new LongMutable(10), new BigDecimal("10.5"))); + builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new LongMutable(10), new BigDecimal("10.5"))); + builder.write(r.setValues("2015-01-17", "10", "Kejia", new LongMutable(10), new BigDecimal("10.5"))); + builder.close(); + + return table; + } + + static GTInfo newInfo() { + Builder builder = GTInfo.builder(); + builder.setCodeSystem(newDictCodeSystem()); + builder.setColumns( // + DataType.getInstance("timestamp"), // + DataType.getInstance("integer"), // + DataType.getInstance("varchar(10)"), // + DataType.getInstance("bigint"), // + DataType.getInstance("decimal") // + ); + builder.setPrimaryKey(setOf(0, 1)); + builder.setColumnPreferIndex(setOf(0)); + builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0, 1), 
setOf(2), setOf(3, 4) }); + builder.enableRowBlock(4); + GTInfo info = builder.build(); + return info; + } + + @SuppressWarnings("rawtypes") + private static CubeCodeSystem newDictCodeSystem() { + Map dictionaryMap = Maps.newHashMap(); + dictionaryMap.put(1, newDictionaryOfInteger()); + dictionaryMap.put(2, newDictionaryOfString()); + return new CubeCodeSystem(dictionaryMap); + } + + @SuppressWarnings("rawtypes") + private static Dictionary newDictionaryOfString() { + TrieDictionaryBuilder builder = new TrieDictionaryBuilder<>(new StringBytesConverter()); + builder.addValue("Dong"); + builder.addValue("George"); + builder.addValue("Jason"); + builder.addValue("Kejia"); + builder.addValue("Luke"); + builder.addValue("Mahone"); + builder.addValue("Qianhao"); + builder.addValue("Shaofeng"); + builder.addValue("Xu"); + builder.addValue("Yang"); + return builder.build(0); + } + + @SuppressWarnings("rawtypes") + private static Dictionary newDictionaryOfInteger() { + NumberDictionaryBuilder builder = new NumberDictionaryBuilder<>(new StringBytesConverter()); + builder.addValue("10"); + builder.addValue("20"); + builder.addValue("30"); + builder.addValue("40"); + builder.addValue("50"); + builder.addValue("60"); + builder.addValue("70"); + builder.addValue("80"); + builder.addValue("90"); + builder.addValue("100"); + return builder.build(0); + } + + private static ImmutableBitSet setOf(int... 
values) { + BitSet set = new BitSet(); + for (int i : values) + set.set(i); + return new ImmutableBitSet(set); + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java new file mode 100644 index 0000000..c87e970 --- /dev/null +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.gridtable; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.math.BigDecimal; +import java.util.BitSet; +import java.util.List; + +import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.gridtable.GTBuilder; +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GTScanRequest; +import org.apache.kylin.gridtable.GridTable; +import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.gridtable.UnitTestSupport; +import org.apache.kylin.gridtable.memstore.GTSimpleMemStore; +import org.apache.kylin.metadata.measure.LongMutable; +import org.junit.Test; + +public class SimpleGridTableTest { + + @Test + public void testBasics() throws IOException { + GTInfo info = UnitTestSupport.basicInfo(); + GTSimpleMemStore store = new GTSimpleMemStore(info); + GridTable table = new GridTable(info, store); + + GTBuilder builder = rebuild(table); + IGTScanner scanner = scan(table); + assertEquals(builder.getWrittenRowBlockCount(), scanner.getScannedRowBlockCount()); + assertEquals(builder.getWrittenRowCount(), scanner.getScannedRowCount()); + } + + @Test + public void testAdvanced() throws IOException { + GTInfo info = UnitTestSupport.advancedInfo(); + GTSimpleMemStore store = new GTSimpleMemStore(info); + GridTable table = new GridTable(info, store); + + GTBuilder builder = rebuild(table); + IGTScanner scanner = scan(table); + assertEquals(builder.getWrittenRowBlockCount(), scanner.getScannedRowBlockCount()); + assertEquals(builder.getWrittenRowCount(), scanner.getScannedRowCount()); + } + + @Test + public void testAggregate() throws IOException { + GTInfo info = UnitTestSupport.advancedInfo(); + GTSimpleMemStore store = new GTSimpleMemStore(info); + GridTable table = new GridTable(info, store); + + GTBuilder builder = rebuild(table); + IGTScanner scanner = scanAndAggregate(table); + 
assertEquals(builder.getWrittenRowBlockCount(), scanner.getScannedRowBlockCount()); + assertEquals(builder.getWrittenRowCount(), scanner.getScannedRowCount()); + } + + @Test + public void testAppend() throws IOException { + GTInfo info = UnitTestSupport.advancedInfo(); + GTSimpleMemStore store = new GTSimpleMemStore(info); + GridTable table = new GridTable(info, store); + + rebuildViaAppend(table); + IGTScanner scanner = scan(table); + assertEquals(3, scanner.getScannedRowBlockCount()); + assertEquals(10, scanner.getScannedRowCount()); + } + + private IGTScanner scan(GridTable table) throws IOException { + GTScanRequest req = new GTScanRequest(table.getInfo()); + IGTScanner scanner = table.scan(req); + for (GTRecord r : scanner) { + Object[] v = r.getValues(); + assertTrue(((String) v[0]).startsWith("2015-")); + assertTrue(((String) v[2]).equals("Food")); + assertTrue(((LongMutable) v[3]).get() == 10); + assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5); + System.out.println(r); + } + scanner.close(); + System.out.println("Scanned Row Block Count: " + scanner.getScannedRowBlockCount()); + System.out.println("Scanned Row Count: " + scanner.getScannedRowCount()); + return scanner; + } + + private IGTScanner scanAndAggregate(GridTable table) throws IOException { + GTScanRequest req = new GTScanRequest(table.getInfo(), null, setOf(0, 2), setOf(3, 4), new String[] { "count", "sum" }, null); + IGTScanner scanner = table.scan(req); + int i = 0; + for (GTRecord r : scanner) { + Object[] v = r.getValues(); + switch (i) { + case 0: + assertTrue(((LongMutable) v[3]).get() == 20); + assertTrue(((BigDecimal) v[4]).doubleValue() == 21.0); + break; + case 1: + assertTrue(((LongMutable) v[3]).get() == 30); + assertTrue(((BigDecimal) v[4]).doubleValue() == 31.5); + break; + case 2: + assertTrue(((LongMutable) v[3]).get() == 40); + assertTrue(((BigDecimal) v[4]).doubleValue() == 42.0); + break; + case 3: + assertTrue(((LongMutable) v[3]).get() == 10); + assertTrue(((BigDecimal) 
v[4]).doubleValue() == 10.5); + break; + default: + fail(); + } + i++; + System.out.println(r); + } + scanner.close(); + System.out.println("Scanned Row Block Count: " + scanner.getScannedRowBlockCount()); + System.out.println("Scanned Row Count: " + scanner.getScannedRowCount()); + return scanner; + } + + static GTBuilder rebuild(GridTable table) throws IOException { + GTBuilder builder = table.rebuild(); + for (GTRecord rec : UnitTestSupport.mockupData(table.getInfo(), 10)) { + builder.write(rec); + } + builder.close(); + + System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount()); + System.out.println("Written Row Count: " + builder.getWrittenRowCount()); + return builder; + } + + static void rebuildViaAppend(GridTable table) throws IOException { + List data = UnitTestSupport.mockupData(table.getInfo(), 10); + GTBuilder builder; + int i = 0; + + builder = table.append(); + builder.write(data.get(i++)); + builder.write(data.get(i++)); + builder.write(data.get(i++)); + builder.write(data.get(i++)); + builder.close(); + System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount()); + System.out.println("Written Row Count: " + builder.getWrittenRowCount()); + + builder = table.append(); + builder.write(data.get(i++)); + builder.write(data.get(i++)); + builder.write(data.get(i++)); + builder.close(); + System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount()); + System.out.println("Written Row Count: " + builder.getWrittenRowCount()); + + builder = table.append(); + builder.write(data.get(i++)); + builder.write(data.get(i++)); + builder.close(); + System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount()); + System.out.println("Written Row Count: " + builder.getWrittenRowCount()); + + builder = table.append(); + builder.write(data.get(i++)); + builder.close(); + System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount()); + 
System.out.println("Written Row Count: " + builder.getWrittenRowCount()); + } + + private static ImmutableBitSet setOf(int... values) { + BitSet set = new BitSet(); + for (int i : values) + set.set(i); + return new ImmutableBitSet(set); + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java new file mode 100644 index 0000000..f5c9645 --- /dev/null +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.gridtable; + +import static org.junit.Assert.*; +import it.uniroma3.mat.extendedset.intset.ConciseSet; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.ArrayList; + +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTInvertedIndex; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GTRowBlock; +import org.apache.kylin.gridtable.UnitTestSupport; +import org.apache.kylin.metadata.filter.ColumnTupleFilter; +import org.apache.kylin.metadata.filter.CompareTupleFilter; +import org.apache.kylin.metadata.filter.ConstantTupleFilter; +import org.apache.kylin.metadata.filter.LogicalTupleFilter; +import org.apache.kylin.metadata.filter.TupleFilter; +import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; +import org.apache.kylin.metadata.measure.LongMutable; +import org.apache.kylin.metadata.measure.serializer.StringSerializer; +import org.apache.kylin.metadata.model.DataType; +import org.apache.kylin.metadata.model.TblColRef; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class SimpleInvertedIndexTest { + + GTInfo info; + GTInvertedIndex index; + ArrayList basicFilters = Lists.newArrayList(); + ArrayList basicResults = Lists.newArrayList(); + + public SimpleInvertedIndexTest() { + + info = UnitTestSupport.advancedInfo(); + TblColRef colA = info.colRef(0); + + // block i contains value "i", the last is NULL + index = new GTInvertedIndex(info); + GTRowBlock mockBlock = GTRowBlock.allocate(info); + GTRowBlock.Writer writer = mockBlock.getWriter(); + GTRecord record = new GTRecord(info); + for (int i = 0; i < 10; i++) { + record.setValues(i < 9 ? 
"" + i : null, "", "", new LongMutable(0), new BigDecimal(0)); + for (int j = 0; j < info.getRowBlockSize(); j++) { + writer.append(record); + } + writer.readyForFlush(); + index.add(mockBlock); + + writer.clearForNext(); + } + + basicFilters.add(compare(colA, FilterOperatorEnum.ISNULL)); + basicResults.add(set(9)); + + basicFilters.add(compare(colA, FilterOperatorEnum.ISNOTNULL)); + basicResults.add(set(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)); + + basicFilters.add(compare(colA, FilterOperatorEnum.EQ, 0)); + basicResults.add(set(0)); + + basicFilters.add(compare(colA, FilterOperatorEnum.NEQ, 0)); + basicResults.add(set(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)); + + basicFilters.add(compare(colA, FilterOperatorEnum.IN, 0, 5)); + basicResults.add(set(0, 5)); + + basicFilters.add(compare(colA, FilterOperatorEnum.NOTIN, 0, 5)); + basicResults.add(set(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)); + + basicFilters.add(compare(colA, FilterOperatorEnum.LT, 3)); + basicResults.add(set(0, 1, 2)); + + basicFilters.add(compare(colA, FilterOperatorEnum.LTE, 3)); + basicResults.add(set(0, 1, 2, 3)); + + basicFilters.add(compare(colA, FilterOperatorEnum.GT, 3)); + basicResults.add(set(4, 5, 6, 7, 8)); + + basicFilters.add(compare(colA, FilterOperatorEnum.GTE, 3)); + basicResults.add(set(3, 4, 5, 6, 7, 8)); + } + + @Test + public void testBasics() { + for (int i = 0; i < basicFilters.size(); i++) { + assertEquals(basicResults.get(i), index.filter(basicFilters.get(i))); + } + } + + @Test + public void testLogicalAnd() { + for (int i = 0; i < basicFilters.size(); i++) { + for (int j = 0; j < basicFilters.size(); j++) { + LogicalTupleFilter f = logical(FilterOperatorEnum.AND, basicFilters.get(i), basicFilters.get(j)); + ConciseSet r = basicResults.get(i).clone(); + r.retainAll(basicResults.get(j)); + assertEquals(r, index.filter(f)); + } + } + } + + @Test + public void testLogicalOr() { + for (int i = 0; i < basicFilters.size(); i++) { + for (int j = 0; j < basicFilters.size(); j++) { + LogicalTupleFilter f = 
logical(FilterOperatorEnum.OR, basicFilters.get(i), basicFilters.get(j)); + ConciseSet r = basicResults.get(i).clone(); + r.addAll(basicResults.get(j)); + assertEquals(r, index.filter(f)); + } + } + } + + @Test + public void testNotEvaluable() { + ConciseSet all = set(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + + CompareTupleFilter notEvaluable = compare(info.colRef(1), FilterOperatorEnum.EQ, 0); + assertEquals(all, index.filter(notEvaluable)); + + LogicalTupleFilter or = logical(FilterOperatorEnum.OR, basicFilters.get(0), notEvaluable); + assertEquals(all, index.filter(or)); + + LogicalTupleFilter and = logical(FilterOperatorEnum.AND, basicFilters.get(0), notEvaluable); + assertEquals(basicResults.get(0), index.filter(and)); + } + + public static CompareTupleFilter compare(TblColRef col, TupleFilter.FilterOperatorEnum op, int... ids) { + CompareTupleFilter filter = new CompareTupleFilter(op); + filter.addChild(columnFilter(col)); + for (int i : ids) { + filter.addChild(constFilter(i)); + } + return filter; + } + + public static LogicalTupleFilter logical(TupleFilter.FilterOperatorEnum op, TupleFilter... filters) { + LogicalTupleFilter filter = new LogicalTupleFilter(op); + for (TupleFilter f : filters) + filter.addChild(f); + return filter; + } + + public static ColumnTupleFilter columnFilter(TblColRef col) { + return new ColumnTupleFilter(col); + } + + public static ConstantTupleFilter constFilter(int id) { + byte[] space = new byte[10]; + ByteBuffer buf = ByteBuffer.wrap(space); + StringSerializer stringSerializer = new StringSerializer(DataType.getInstance("string")); + stringSerializer.serialize("" + id, buf); + ByteArray data = new ByteArray(buf.array(), buf.arrayOffset(), buf.position()); + return new ConstantTupleFilter(data); + } + + public static ConciseSet set(int... 
ints) { + ConciseSet set = new ConciseSet(); + for (int i : ints) + set.add(i); + return set; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-job/pom.xml ---------------------------------------------------------------------- diff --git a/core-job/pom.xml b/core-job/pom.xml index b51c1cf..b619f43 100644 --- a/core-job/pom.xml +++ b/core-job/pom.xml @@ -41,8 +41,20 @@ ${project.parent.version} + + org.apache.curator + curator-framework + + + org.apache.kylin + kylin-core-common + test-jar + test + ${project.parent.version} + + junit junit test http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java new file mode 100644 index 0000000..6b814ef --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.engine; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.job.execution.DefaultChainedExecutable; + +public class BuildEngineFactory { + + private static IBatchCubingEngine defaultBatchEngine; + + public static IBatchCubingEngine defaultBatchEngine() { + if (defaultBatchEngine == null) { + KylinConfig conf = KylinConfig.getInstanceFromEnv(); + if (conf.isCubingInMem()) { + defaultBatchEngine = (IBatchCubingEngine) ClassUtil.newInstance("org.apache.kylin.engine.mr.MRBatchCubingEngine2"); + } else { + defaultBatchEngine = (IBatchCubingEngine) ClassUtil.newInstance("org.apache.kylin.engine.mr.MRBatchCubingEngine"); + } + } + return defaultBatchEngine; + } + + /** Build a new cube segment, typically its time range appends to the end of current cube. */ + public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) { + return defaultBatchEngine().createBatchCubingJob(newSegment, submitter); + } + + /** Merge multiple small segments into a big one. */ + public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) { + return defaultBatchEngine().createBatchMergeJob(mergeSegment, submitter); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java new file mode 100644 index 0000000..904f557 --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine; + +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.job.execution.DefaultChainedExecutable; + +public interface IBatchCubingEngine { + + /** Build a new cube segment, typically its time range appends to the end of current cube. */ + public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter); + + /** Merge multiple small segments into a big one. 
*/ + public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter); + + public Class getSourceInterface(); + + public Class getStorageInterface(); +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java new file mode 100644 index 0000000..0359ce9 --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/engine/IStreamingCubingEngine.java @@ -0,0 +1,8 @@ +package org.apache.kylin.engine; + +import org.apache.kylin.cube.CubeSegment; + +public interface IStreamingCubingEngine { + + public Runnable createStreamingCubingBuilder(CubeSegment seg); +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-job/src/main/java/org/apache/kylin/job/JobInstance.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/JobInstance.java b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java new file mode 100644 index 0000000..e7f5772 --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; +import com.fasterxml.jackson.annotation.JsonBackReference; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonManagedReference; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Lists; +import org.apache.kylin.common.persistence.RootPersistentEntity; +import org.apache.kylin.cube.model.CubeBuildTypeEnum; +import org.apache.kylin.job.constant.JobStatusEnum; +import org.apache.kylin.job.constant.JobStepCmdTypeEnum; +import org.apache.kylin.job.constant.JobStepStatusEnum; +import org.apache.kylin.job.engine.JobEngineConfig; + +@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) +public class JobInstance extends RootPersistentEntity implements Comparable<JobInstance> { + + public static final String JOB_WORKING_DIR_PREFIX = "kylin-"; + + public static final String YARN_APP_ID = "yarn_application_id"; + public static final String YARN_APP_URL = "yarn_application_tracking_url"; + public static final String MR_JOB_ID = "mr_job_id"; + public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written"; + public static final String SOURCE_RECORDS_COUNT = "source_records_count"; + public static final String 
SOURCE_RECORDS_SIZE = "source_records_size"; + + public static String getStepIdentity(JobInstance jobInstance, JobStep jobStep) { + return jobInstance.getRelatedCube() + "." + jobInstance.getUuid() + "." + jobStep.getSequenceID(); + } + + public static String getJobIdentity(JobInstance jobInstance) { + return jobInstance.getRelatedCube() + "." + jobInstance.getUuid(); + } + + public static String getJobWorkingDir(JobInstance jobInstance, JobEngineConfig engineConfig) { + return getJobWorkingDir(jobInstance.getUuid(), engineConfig.getHdfsWorkingDirectory()); + } + + public static String getJobWorkingDir(String jobUuid, String hdfsWorkdingDir) { + if (jobUuid == null || jobUuid.equals("")) { + throw new IllegalArgumentException("jobUuid can't be null or empty"); + } + return hdfsWorkdingDir + "/" + JOB_WORKING_DIR_PREFIX + jobUuid; + } + + @JsonProperty("name") + private String name; + + @JsonProperty("type") + private CubeBuildTypeEnum type; // java implementation + @JsonProperty("duration") + private long duration; + @JsonProperty("related_cube") + private String relatedCube; + @JsonProperty("related_segment") + private String relatedSegment; + @JsonProperty("exec_start_time") + private long execStartTime; + @JsonProperty("exec_end_time") + private long execEndTime; + @JsonProperty("mr_waiting") + private long mrWaiting = 0; + @JsonManagedReference + @JsonProperty("steps") + private List<JobStep> steps; + @JsonProperty("submitter") + private String submitter; + @JsonProperty("job_status") + private JobStatusEnum status; + + public JobStep getRunningStep() { + for (JobStep step : this.getSteps()) { + if (step.getStatus().equals(JobStepStatusEnum.RUNNING) || step.getStatus().equals(JobStepStatusEnum.WAITING)) { + return step; + } + } + + return null; + } + + @JsonProperty("progress") + public double getProgress() { + int completedStepCount = 0; + for (JobStep step : this.getSteps()) { + if (step.getStatus().equals(JobStepStatusEnum.FINISHED)) { + completedStepCount++; + } + } 
+ + return 100.0 * completedStepCount / steps.size(); + } + + public JobStatusEnum getStatus() { + return this.status; + } + + public void setStatus(JobStatusEnum status) { + this.status = status; + } + +// @JsonProperty("job_status") +// public JobStatusEnum getStatus() { +// +// // JobStatusEnum finalJobStatus; +// int compositResult = 0; +// +// // if steps status are all NEW, then job status is NEW +// // if steps status are all FINISHED, then job status is FINISHED +// // if steps status are all PENDING, then job status is PENDING +// // if steps status are FINISHED and PENDING, the job status is PENDING +// // if one of steps status is RUNNING, then job status is RUNNING +// // if one of steps status is ERROR, then job status is ERROR +// // if one of steps status is KILLED, then job status is KILLED +// // default status is RUNNING +// +// System.out.println(this.getName()); +// +// for (JobStep step : this.getSteps()) { +// //System.out.println("step: " + step.getSequenceID() + "'s status:" + step.getStatus()); +// compositResult = compositResult | step.getStatus().getCode(); +// } +// +// System.out.println(); +// +// if (compositResult == JobStatusEnum.FINISHED.getCode()) { +// return JobStatusEnum.FINISHED; +// } else if (compositResult == JobStatusEnum.NEW.getCode()) { +// return JobStatusEnum.NEW; +// } else if (compositResult == JobStatusEnum.PENDING.getCode()) { +// return JobStatusEnum.PENDING; +// } else if (compositResult == (JobStatusEnum.FINISHED.getCode() | JobStatusEnum.PENDING.getCode())) { +// return JobStatusEnum.PENDING; +// } else if ((compositResult & JobStatusEnum.ERROR.getCode()) == JobStatusEnum.ERROR.getCode()) { +// return JobStatusEnum.ERROR; +// } else if ((compositResult & JobStatusEnum.DISCARDED.getCode()) == JobStatusEnum.DISCARDED.getCode()) { +// return JobStatusEnum.DISCARDED; +// } else if ((compositResult & JobStatusEnum.RUNNING.getCode()) == JobStatusEnum.RUNNING.getCode()) { +// return JobStatusEnum.RUNNING; +// } +// 
+// return JobStatusEnum.RUNNING; +// } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public CubeBuildTypeEnum getType() { + return type; + } + + public void setType(CubeBuildTypeEnum type) { + this.type = type; + } + + public long getDuration() { + return duration; + } + + public void setDuration(long duration) { + this.duration = duration; + } + + public String getRelatedCube() { + return relatedCube; + } + + public void setRelatedCube(String relatedCube) { + this.relatedCube = relatedCube; + } + + public String getRelatedSegment() { + return relatedSegment; + } + + public void setRelatedSegment(String relatedSegment) { + this.relatedSegment = relatedSegment; + } + + /** + * @return the execStartTime + */ + public long getExecStartTime() { + return execStartTime; + } + + /** + * @param execStartTime the execStartTime to set + */ + public void setExecStartTime(long execStartTime) { + this.execStartTime = execStartTime; + } + + /** + * @return the execEndTime + */ + public long getExecEndTime() { + return execEndTime; + } + + /** + * @param execEndTime the execEndTime to set + */ + public void setExecEndTime(long execEndTime) { + this.execEndTime = execEndTime; + } + + public long getMrWaiting() { + return this.mrWaiting; + } + + public void setMrWaiting(long mrWaiting) { + this.mrWaiting = mrWaiting; + } + + public List<JobStep> getSteps() { + if (steps == null) { + steps = Lists.newArrayList(); + } + return steps; + } + + public void clearSteps() { + getSteps().clear(); + } + + public void addSteps(Collection<JobStep> steps) { + this.getSteps().addAll(steps); + } + + public void addStep(JobStep step) { + getSteps().add(step); + } + + public void addStep(int index, JobStep step) { + getSteps().add(index, step); + } + + public JobStep findStep(String stepName) { + for (JobStep step : getSteps()) { + if (stepName.equals(step.getName())) { + return step; + } + } + return null; + } + + + public String getSubmitter() 
{ + return submitter; + } + + public void setSubmitter(String submitter) { + this.submitter = submitter; + } + + + + + @JsonIgnoreProperties(ignoreUnknown = true) + public static class JobStep implements Comparable<JobStep> { + + @JsonBackReference + private JobInstance jobInstance; + + @JsonProperty("id") + private String id; + + @JsonProperty("name") + private String name; + + @JsonProperty("sequence_id") + private int sequenceID; + + @JsonProperty("exec_cmd") + private String execCmd; + + @JsonProperty("interrupt_cmd") + private String InterruptCmd; + + @JsonProperty("exec_start_time") + private long execStartTime; + @JsonProperty("exec_end_time") + private long execEndTime; + @JsonProperty("exec_wait_time") + private long execWaitTime; + + @JsonProperty("step_status") + private JobStepStatusEnum status; + + @JsonProperty("cmd_type") + private JobStepCmdTypeEnum cmdType = JobStepCmdTypeEnum.SHELL_CMD_HADOOP; + + @JsonProperty("info") + private ConcurrentHashMap<String, String> info = new ConcurrentHashMap<String, String>(); + + @JsonProperty("run_async") + private boolean runAsync = false; + + private ConcurrentHashMap<String, String> getInfo() { + return info; + } + + public void putInfo(String key, String value) { + getInfo().put(key, value); + } + + public String getInfo(String key) { + return getInfo().get(key); + } + + public void clearInfo() { + getInfo().clear(); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getSequenceID() { + return sequenceID; + } + + public void setSequenceID(int sequenceID) { + this.sequenceID = sequenceID; + } + + public String getExecCmd() { + return execCmd; + } + + public void setExecCmd(String execCmd) { + this.execCmd = execCmd; + } + + public JobStepStatusEnum getStatus() { + return status; + } + + public void setStatus(JobStepStatusEnum status) { + this.status = status; + } + + + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + /** + * @return 
the execStartTime + */ + public long getExecStartTime() { + return execStartTime; + } + + /** + * @param execStartTime the execStartTime to set + */ + public void setExecStartTime(long execStartTime) { + this.execStartTime = execStartTime; + } + + /** + * @return the execEndTime + */ + public long getExecEndTime() { + return execEndTime; + } + + /** + * @param execEndTime the execEndTime to set + */ + public void setExecEndTime(long execEndTime) { + this.execEndTime = execEndTime; + } + + public long getExecWaitTime() { + return execWaitTime; + } + + public void setExecWaitTime(long execWaitTime) { + this.execWaitTime = execWaitTime; + } + + public String getInterruptCmd() { + return InterruptCmd; + } + + public void setInterruptCmd(String interruptCmd) { + InterruptCmd = interruptCmd; + } + + public JobStepCmdTypeEnum getCmdType() { + return cmdType; + } + + public void setCmdType(JobStepCmdTypeEnum cmdType) { + this.cmdType = cmdType; + } + + /** + * @return the runAsync + */ + public boolean isRunAsync() { + return runAsync; + } + + /** + * @param runAsync the runAsync to set + */ + public void setRunAsync(boolean runAsync) { + this.runAsync = runAsync; + } + + /** + * @return the jobInstance + */ + public JobInstance getJobInstance() { + return jobInstance; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((name == null) ? 
0 : name.hashCode()); + result = prime * result + sequenceID; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + JobStep other = (JobStep) obj; + if (name == null) { + if (other.name != null) + return false; + } else if (!name.equals(other.name)) + return false; + if (sequenceID != other.sequenceID) + return false; + return true; + } + + @Override + public int compareTo(JobStep o) { + if (this.sequenceID < o.sequenceID) { + return -1; + } else if (this.sequenceID > o.sequenceID) { + return 1; + } else { + return 0; + } + } + } + + @Override + public int compareTo(JobInstance o) { + return o.lastModified<this.lastModified?-1:o.lastModified>this.lastModified?1:0; + } + +}