Return-Path: X-Original-To: apmail-kylin-commits-archive@minotaur.apache.org Delivered-To: apmail-kylin-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E535717313 for ; Wed, 24 Jun 2015 10:24:47 +0000 (UTC) Received: (qmail 24864 invoked by uid 500); 24 Jun 2015 10:24:47 -0000 Delivered-To: apmail-kylin-commits-archive@kylin.apache.org Received: (qmail 24836 invoked by uid 500); 24 Jun 2015 10:24:47 -0000 Mailing-List: contact commits-help@kylin.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kylin.incubator.apache.org Delivered-To: mailing list commits@kylin.incubator.apache.org Received: (qmail 24827 invoked by uid 99); 24 Jun 2015 10:24:47 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Jun 2015 10:24:47 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 6250718195D for ; Wed, 24 Jun 2015 10:24:47 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.77 X-Spam-Level: * X-Spam-Status: No, score=1.77 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id mtbkAYC3mfm2 for ; Wed, 24 Jun 2015 10:24:44 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id 1389421F26 for ; Wed, 24 Jun 2015 10:24:44 +0000 (UTC) Received: (qmail 24721 invoked by uid 99); 24 Jun 2015 10:24:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Jun 2015 10:24:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CE1E4DFF1B; Wed, 24 Jun 2015 10:24:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: liyang@apache.org To: commits@kylin.incubator.apache.org Date: Wed, 24 Jun 2015 10:24:43 -0000 Message-Id: <494491e5d74c4c9a870e1e7c316d22ac@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-kylin git commit: KYLIN-803 half way Repository: incubator-kylin Updated Branches: refs/heads/KYLIN-803 [created] 3c0677c5b KYLIN-803 half way Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/ba9dfed2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/ba9dfed2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/ba9dfed2 Branch: refs/heads/KYLIN-803 Commit: ba9dfed28249dcf1014d885356817ef04c51ed51 Parents: fae3d28 Author: Yang Li Authored: Wed Jun 24 10:39:23 2015 +0800 Committer: Li, Yang Committed: Wed Jun 24 16:21:43 2015 +0800 ---------------------------------------------------------------------- .../kylin/cube/cuboid/CuboidScheduler.java | 14 +- .../inmemcubing/AbstractInMemCubeBuilder.java | 64 +++++- .../kylin/job/inmemcubing/InMemCubeBuilder.java | 206 ++----------------- .../kylin/storage/gridtable/GridTable.java | 10 +- 4 files changed, 96 insertions(+), 198 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ba9dfed2/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java ---------------------------------------------------------------------- diff --git a/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java index 07be092..bebfd08 100644 --- a/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java +++ b/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java @@ -19,8 +19,6 @@ package org.apache.kylin.cube.cuboid; /** - * @author George Song (ysong1) - * */ import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -42,6 +40,18 @@ public class CuboidScheduler { this.max = (long) Math.pow(2, size) - 1; this.cache = new ConcurrentHashMap>(); } + + public int getCuboidCount() { + return getCuboidCount(Cuboid.getBaseCuboidId(cubeDef)); + } + + private int getCuboidCount(long cuboidId) { + int r = 1; + for (Long child : getSpanningCuboid(cuboidId)) { + r += getCuboidCount(child); + } + return r; + } public List getSpanningCuboid(long cuboid) { if (cuboid > max || cuboid < 0) { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ba9dfed2/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java index 034c4cd..cf7c356 100644 --- a/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java +++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java @@ -19,17 +19,26 @@ package org.apache.kylin.job.inmemcubing; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.BlockingQueue; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.dict.Dictionary; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.storage.gridtable.GTRecord; +import org.apache.kylin.storage.gridtable.GTScanRequest; +import org.apache.kylin.storage.gridtable.GridTable; +import org.apache.kylin.storage.gridtable.IGTScanner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An interface alike abstract class. Hold common tunable parameters and nothing more. */ abstract public class AbstractInMemCubeBuilder { + private static Logger logger = LoggerFactory.getLogger(AbstractInMemCubeBuilder .class); + final protected CubeDesc cubeDesc; final protected Map> dictionaryMap; @@ -72,5 +81,58 @@ abstract public class AbstractInMemCubeBuilder { }; } - abstract public void build(BlockingQueue> input, ICuboidWriter gtRecordWriter) throws IOException; + public void build(BlockingQueue> input, ICuboidWriter output) throws IOException { + TreeMap result = build(input); + for (CuboidResult cuboidResult : result.values()) { + outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output); + cuboidResult.table.close(); + } + } + + private void outputCuboid(long cuboidId, GridTable gridTable, ICuboidWriter output) throws IOException { + long startTime = System.currentTimeMillis(); + GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null); + IGTScanner scanner = gridTable.scan(req); + for (GTRecord record : scanner) { + output.write(cuboidId, record); + } + scanner.close(); + logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms"); + } + + public TreeMap build(BlockingQueue> input) throws IOException { + final TreeMap result = new TreeMap(); + ICuboidCollector collector = new ICuboidCollector() { + @Override + public void collect(CuboidResult cuboidResult) { + result.put(cuboidResult.cuboidId, cuboidResult); + } + }; + build(input, collector); + return result; + } + + abstract public void build(BlockingQueue> input, ICuboidCollector collector) throws IOException; + + public static interface ICuboidCollector { + public void collect(CuboidResult result); + } + + public static class CuboidResult { + public long cuboidId; + public GridTable table; + public int nRows; + public long timeSpent; + public int aggrCacheMB; + + public CuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) { + this.cuboidId = cuboidId; + this.table = table; + this.nRows = nRows; + this.timeSpent = timeSpent; + this.aggrCacheMB = aggrCacheMB; + } + } + + } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ba9dfed2/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java index 8d0b0fb..996e747 100644 --- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java +++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java @@ -16,21 +16,15 @@ */ package org.apache.kylin.job.inmemcubing; -import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.io.DoubleWritable; @@ -57,7 +51,6 @@ import org.apache.kylin.storage.gridtable.GTRecord; import org.apache.kylin.storage.gridtable.GTScanRequest; import org.apache.kylin.storage.gridtable.GridTable; import org.apache.kylin.storage.gridtable.IGTScanner; -import org.apache.kylin.storage.gridtable.IGTStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,8 +66,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class); private static final LongWritable ONE = new LongWritable(1l); - private final long baseCuboidId; private final CuboidScheduler cuboidScheduler; + private final long baseCuboidId; + private final int totalCuboidCount; private final CubeJoinedFlatTableDesc intermediateTableDesc; private final MeasureCodec measureCodec; private final String[] metricsAggrFuncs; @@ -88,15 +82,15 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { private TreeSet taskPending; private AtomicInteger taskCuboidCompleted = new AtomicInteger(0); - private OutputThread outputThread; - private int outputCuboidExpected; private CuboidResult baseResult; private Object[] totalSumForSanityCheck; + private ICuboidCollector resultCollector; public InMemCubeBuilder(CubeDesc cubeDesc, Map> dictionaryMap) { super(cubeDesc, dictionaryMap); this.cuboidScheduler = new CuboidScheduler(cubeDesc); this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); + this.totalCuboidCount = cuboidScheduler.getCuboidCount(); this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null); this.measureCodec = new MeasureCodec(cubeDesc.getMeasures()); @@ -153,7 +147,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { } @Override - public void build(BlockingQueue> input, ICuboidWriter output) throws IOException { + public void build(BlockingQueue> input, ICuboidCollector collector) throws IOException { long startTime = System.currentTimeMillis(); logger.info("In Mem Cube Build start, " + cubeDesc.getName()); @@ -163,14 +157,10 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { taskThreads = prepareTaskThreads(); taskThreadExceptions = new Throwable[taskThreadCount]; - // output goes in a separate thread to leverage any async-ness - outputThread = new OutputThread(output); - outputCuboidExpected = outputThread.getOutputCuboidExpected(); - // build base cuboid + resultCollector = collector; totalSumForSanityCheck = null; baseResult = createBaseCuboid(input); - taskCuboidCompleted.incrementAndGet(); if (baseResult.nRows == 0) return; @@ -180,11 +170,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { // kick off N-D cuboid tasks and output addChildTasks(baseResult); start(taskThreads); - start(outputThread); // wait complete join(taskThreads); - join(outputThread); long endTime = System.currentTimeMillis(); logger.info("In Mem Cube Build end, " + cubeDesc.getName() + ", takes " + (endTime - startTime) + " ms"); @@ -194,7 +182,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { public void abort() { interrupt(taskThreads); - interrupt(outputThread); } private void start(Thread... threads) { @@ -223,9 +210,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { if (t != null) errors.add(t); } - if (outputThread.getException() != null) { - errors.add(outputThread.getException()); - } if (errors.isEmpty()) { return; } else if (errors.size() == 1) { @@ -250,7 +234,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { } public boolean isAllCuboidDone() { - return taskCuboidCompleted.get() == outputCuboidExpected; + return taskCuboidCompleted.get() == totalCuboidCount; } private class CuboidTaskThread extends Thread { @@ -280,8 +264,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { CuboidResult newCuboid = buildCuboid(task.parent, task.childCuboidId); addChildTasks(newCuboid); - task.parent.markOneSpanningDone(); - taskCuboidCompleted.incrementAndGet(); if (isAllCuboidDone()) { for (Thread t : taskThreads) { @@ -387,8 +369,11 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { if (aggrCacheMB <= 0) { aggrCacheMB = (int) Math.ceil(1.0 * nRows / baseResult.nRows * baseResult.aggrCacheMB); } + CuboidResult result = new CuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB); - outputThread.addOutput(result); + taskCuboidCompleted.incrementAndGet(); + + resultCollector.collect(result); return result; } @@ -397,7 +382,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { MemoryBudgetController.MemoryConsumer consumer = new MemoryBudgetController.MemoryConsumer() { @Override public int freeUp(int mb) { - return 0; // cannot free up + return 0; // cannot free up on demand } @Override @@ -493,17 +478,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { } } - private void closeStore(GridTable gt) { - IGTStore store = gt.getStore(); - if (store instanceof Closeable) { - try { - ((Closeable) store).close(); - } catch (IOException e) { - logger.warn("Close " + store + " exception", e); - } - } - } - // =========================================================================== private static class CuboidTask implements Comparable { @@ -522,162 +496,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { } } - private class CuboidResult { - long cuboidId; - GridTable table; - int nRows; - @SuppressWarnings("unused") - long timeSpent; - int aggrCacheMB; - boolean outputDone; - int spanningDone; - - public CuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) { - this.cuboidId = cuboidId; - this.table = table; - this.nRows = nRows; - this.timeSpent = timeSpent; - this.aggrCacheMB = aggrCacheMB; - } - - synchronized void markOutputDone() { - outputDone = true; - closeIfAllDone(); - } - - synchronized void markOneSpanningDone() { - spanningDone++; - closeIfAllDone(); - } - - private void closeIfAllDone() { - if (outputDone && spanningDone == cuboidScheduler.getSpanningCuboid(cuboidId).size()) { - closeStore(table); - } - } - } - - // ============================================================================ - - private class OutputThread extends Thread { - private ICuboidWriter output; - private SortedMap outputSequence; // synchronized sorted map - private LinkedBlockingDeque outputPending; - private int outputCount; - private int outputCuboidExpected; - private Throwable outputThreadException; - - OutputThread(ICuboidWriter output) { - super("CuboidOutput"); - this.output = output; - this.outputSequence = prepareOutputSequence(); - this.outputPending = new LinkedBlockingDeque(); - this.outputCount = 0; - this.outputCuboidExpected = outputSequence.size(); - - if (outputOrderRequired == false) - outputSequence = null; - } - - public int getOutputCuboidExpected() { - return outputCuboidExpected; - } - - private SortedMap prepareOutputSequence() { - TreeMap result = new TreeMap(); - prepareOutputPendingRecursive(Cuboid.getBaseCuboidId(cubeDesc), result); - return Collections.synchronizedSortedMap(result); - } - - private void prepareOutputPendingRecursive(Long cuboidId, TreeMap result) { - result.put(cuboidId, cuboidId); - for (Long child : cuboidScheduler.getSpanningCuboid(cuboidId)) { - prepareOutputPendingRecursive(child, result); - } - } - - public void addOutput(CuboidResult result) { - // if output is NOT ordered - if (outputSequence == null) { - outputPending.addLast(result); - } - // if output is ordered - else { - Long cuboidId = outputSequence.get(result.cuboidId); - synchronized (cuboidId) { - outputPending.addFirst(result); - cuboidId.notify(); - } - } - } - - private CuboidResult nextOutput() throws InterruptedException { - CuboidResult result = null; - - // if output is NOT ordered - if (outputSequence == null) { - while (result == null && taskHasNoException()) { - result = outputPending.pollFirst(60, TimeUnit.SECONDS); - } - } - // if output is ordered - else { - Long nextCuboidId = outputSequence.get(outputSequence.firstKey()); - synchronized (nextCuboidId) { - while ((result = findPendingOutput(nextCuboidId)) == null && taskHasNoException()) { - nextCuboidId.wait(60000); - } - } - outputSequence.remove(result.cuboidId); - } - - return result; - } - - @Override - public void run() { - try { - while (outputCount < outputCuboidExpected) { - CuboidResult result = nextOutput(); - - // if task error occurs - if (result == null || result.table == null) - break; - - outputCuboid(result.cuboidId, result.table); - outputCount++; - result.markOutputDone(); - } - } catch (Throwable ex) { - logger.error("output thread exception", ex); - outputThreadException = ex; - } - } - - private void outputCuboid(long cuboidId, GridTable gridTable) throws IOException { - long startTime = System.currentTimeMillis(); - GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null); - IGTScanner scanner = gridTable.scan(req); - for (GTRecord record : scanner) { - output.write(cuboidId, record); - } - scanner.close(); - logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms"); - } - - private CuboidResult findPendingOutput(Long cuboidId) { - for (CuboidResult r : outputPending) { - if (r.cuboidId == cuboidId) - return r; - } - return null; - } - - public Throwable getException() { - return outputThreadException; - } - } - // ============================================================================ private class InputConverter implements IGTScanner { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ba9dfed2/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java index 20b543a..092227b 100644 --- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java +++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java @@ -1,8 +1,9 @@ package org.apache.kylin.storage.gridtable; +import java.io.Closeable; import java.io.IOException; -public class GridTable { +public class GridTable implements Closeable { final GTInfo info; final IGTStore store; @@ -50,4 +51,11 @@ public class GridTable { public IGTStore getStore() { return store; } + + @Override + public void close() throws IOException { + if (store instanceof Closeable) { + ((Closeable) store).close(); + } + } }