From: liyang@apache.org
To: commits@kylin.incubator.apache.org
Reply-To: dev@kylin.incubator.apache.org
Mailing-List: contact commits-help@kylin.incubator.apache.org; run by ezmlm
Date: Thu, 23 Jul 2015 23:20:17 -0000
In-Reply-To: <9271c088b1be42f994d443b5b1ff7d4c@git.apache.org>
References: <9271c088b1be42f994d443b5b1ff7d4c@git.apache.org>
Subject: [09/28] incubator-kylin git commit: KYLIN-875 Split job module into 'core-job', 'engine-mr', 'source-hive', 'storage-hbase'. The old job remains as an assembly project.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
deleted file mode 100644
index 2cde011..0000000
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ /dev/null
@@ -1,661 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kylin.job.inmemcubing; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.BitSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.ImmutableBitSet; -import org.apache.kylin.common.util.MemoryBudgetController; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.cuboid.CuboidScheduler; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; -import org.apache.kylin.cube.model.HBaseColumnDesc; -import org.apache.kylin.cube.model.HBaseColumnFamilyDesc; -import org.apache.kylin.dict.Dictionary; -import org.apache.kylin.metadata.measure.DoubleMutable; -import org.apache.kylin.metadata.measure.LongMutable; -import org.apache.kylin.metadata.measure.MeasureCodec; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.storage.cube.CubeGridTable; -import org.apache.kylin.storage.gridtable.GTAggregateScanner; -import org.apache.kylin.storage.gridtable.GTBuilder; -import org.apache.kylin.storage.gridtable.GTInfo; -import org.apache.kylin.storage.gridtable.GTRecord; -import org.apache.kylin.storage.gridtable.GTScanRequest; -import org.apache.kylin.storage.gridtable.GridTable; -import org.apache.kylin.storage.gridtable.IGTScanner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - * Build a cube (many cuboids) in memory. Calculating multiple cuboids at the same time as long as memory permits. - * Assumes base cuboid fits in memory or otherwise OOM exception will occur. 
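
(Editorial illustration, not code from this commit: the class comment above describes spanning many cuboids from one base cuboid. A cuboid ID can be read as a bitmask over dimensions, and each child is derived by clearing one set bit. The real CuboidScheduler additionally enforces aggregation-group and mandatory-dimension rules, which this minimal sketch omits.)

    import java.util.ArrayList;
    import java.util.List;

    public class CuboidSpanningSketch {
        // A cuboid ID is a bitmask over dimensions; the base cuboid has all bits set.
        static List<Long> naiveChildren(long cuboidId) {
            List<Long> children = new ArrayList<Long>();
            for (long mask = Long.highestOneBit(cuboidId); mask != 0; mask >>= 1) {
                if ((cuboidId & mask) != 0) {
                    children.add(cuboidId & ~mask); // aggregate away one dimension
                }
            }
            return children;
        }

        public static void main(String[] args) {
            // With 4 dimensions, base cuboid 1111 spans 0111, 1011, 1101 and 1110.
            for (long child : naiveChildren(0b1111L)) {
                System.out.println(Long.toBinaryString(child));
            }
        }
    }
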
- */ -public class InMemCubeBuilder extends AbstractInMemCubeBuilder { - - private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class); - private static final LongMutable ONE = new LongMutable(1l); - - private final CuboidScheduler cuboidScheduler; - private final long baseCuboidId; - private final int totalCuboidCount; - private final CubeJoinedFlatTableDesc intermediateTableDesc; - private final MeasureCodec measureCodec; - private final String[] metricsAggrFuncs; - private final int[] hbaseMeasureRefIndex; - private final MeasureDesc[] measureDescs; - private final int measureCount; - - private MemoryBudgetController memBudget; - private Thread[] taskThreads; - private Throwable[] taskThreadExceptions; - private LinkedBlockingQueue taskPending; - private AtomicInteger taskCuboidCompleted = new AtomicInteger(0); - - private CuboidResult baseResult; - private Object[] totalSumForSanityCheck; - private ICuboidCollector resultCollector; - - public InMemCubeBuilder(CubeDesc cubeDesc, Map> dictionaryMap) { - super(cubeDesc, dictionaryMap); - this.cuboidScheduler = new CuboidScheduler(cubeDesc); - this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - this.totalCuboidCount = cuboidScheduler.getCuboidCount(); - this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null); - this.measureCodec = new MeasureCodec(cubeDesc.getMeasures()); - - Map measureIndexMap = Maps.newHashMap(); - List metricsAggrFuncsList = Lists.newArrayList(); - measureCount = cubeDesc.getMeasures().size(); - - List measureDescsList = Lists.newArrayList(); - hbaseMeasureRefIndex = new int[measureCount]; - int measureRef = 0; - for (HBaseColumnFamilyDesc familyDesc : cubeDesc.getHbaseMapping().getColumnFamily()) { - for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) { - for (MeasureDesc measure : hbaseColDesc.getMeasures()) { - for (int j = 0; j < measureCount; j++) { - if (cubeDesc.getMeasures().get(j).equals(measure)) { - measureDescsList.add(measure); - hbaseMeasureRefIndex[measureRef] = j; - break; - } - } - measureRef++; - } - } - } - - for (int i = 0; i < measureCount; i++) { - MeasureDesc measureDesc = measureDescsList.get(i); - metricsAggrFuncsList.add(measureDesc.getFunction().getExpression()); - measureIndexMap.put(measureDesc.getName(), i); - } - this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]); - this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]); - } - - private GridTable newGridTableByCuboidID(long cuboidID) throws IOException { - GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap); - - // Below several store implementation are very similar in performance. The ConcurrentDiskStore is the simplest. - // MemDiskStore store = new MemDiskStore(info, memBudget == null ? 
MemoryBudgetController.ZERO_BUDGET : memBudget); - // MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET); - ConcurrentDiskStore store = new ConcurrentDiskStore(info); - - GridTable gridTable = new GridTable(info, store); - return gridTable; - } - - private Pair getDimensionAndMetricColumnBitSet(long cuboidId) { - BitSet bitSet = BitSet.valueOf(new long[] { cuboidId }); - BitSet dimension = new BitSet(); - dimension.set(0, bitSet.cardinality()); - BitSet metrics = new BitSet(); - metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureCount); - return new Pair(new ImmutableBitSet(dimension), new ImmutableBitSet(metrics)); - } - - @Override - public void build(BlockingQueue> input, ICuboidWriter output) throws IOException { - ConcurrentNavigableMap result = build(input); - for (CuboidResult cuboidResult : result.values()) { - outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output); - cuboidResult.table.close(); - } - } - - ConcurrentNavigableMap build(BlockingQueue> input) throws IOException { - final ConcurrentNavigableMap result = new ConcurrentSkipListMap(); - build(input, new ICuboidCollector() { - @Override - public void collect(CuboidResult cuboidResult) { - result.put(cuboidResult.cuboidId, cuboidResult); - } - }); - return result; - } - - interface ICuboidCollector { - void collect(CuboidResult result); - } - - static class CuboidResult { - public long cuboidId; - public GridTable table; - public int nRows; - public long timeSpent; - public int aggrCacheMB; - - public CuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) { - this.cuboidId = cuboidId; - this.table = table; - this.nRows = nRows; - this.timeSpent = timeSpent; - this.aggrCacheMB = aggrCacheMB; - } - } - - private void build(BlockingQueue> input, ICuboidCollector collector) throws IOException { - long startTime = System.currentTimeMillis(); - logger.info("In Mem Cube Build start, " + cubeDesc.getName()); - - // multiple threads to compute cuboid in parallel - taskPending = new LinkedBlockingQueue<>(); - taskCuboidCompleted.set(0); - taskThreads = prepareTaskThreads(); - taskThreadExceptions = new Throwable[taskThreadCount]; - - // build base cuboid - resultCollector = collector; - totalSumForSanityCheck = null; - baseResult = createBaseCuboid(input); - if (baseResult.nRows == 0) - return; - - // plan memory budget - makeMemoryBudget(); - - // kick off N-D cuboid tasks and output - addChildTasks(baseResult); - start(taskThreads); - - // wait complete - join(taskThreads); - - long endTime = System.currentTimeMillis(); - logger.info("In Mem Cube Build end, " + cubeDesc.getName() + ", takes " + (endTime - startTime) + " ms"); - - throwExceptionIfAny(); - } - - public void abort() { - interrupt(taskThreads); - } - - private void start(Thread... threads) { - for (Thread t : threads) - t.start(); - } - - private void interrupt(Thread... threads) { - for (Thread t : threads) - t.interrupt(); - } - - private void join(Thread... 
threads) throws IOException { - try { - for (Thread t : threads) - t.join(); - } catch (InterruptedException e) { - throw new IOException("interrupted while waiting task and output complete", e); - } - } - - private void throwExceptionIfAny() throws IOException { - ArrayList errors = new ArrayList(); - for (int i = 0; i < taskThreadCount; i++) { - Throwable t = taskThreadExceptions[i]; - if (t != null) - errors.add(t); - } - if (errors.isEmpty()) { - return; - } else if (errors.size() == 1) { - Throwable t = errors.get(0); - if (t instanceof IOException) - throw (IOException) t; - else - throw new IOException(t); - } else { - for (Throwable t : errors) - logger.error("Exception during in-mem cube build", t); - throw new IOException(errors.size() + " exceptions during in-mem cube build, cause set to the first, check log for more", errors.get(0)); - } - } - - private Thread[] prepareTaskThreads() { - Thread[] result = new Thread[taskThreadCount]; - for (int i = 0; i < taskThreadCount; i++) { - result[i] = new CuboidTaskThread(i); - } - return result; - } - - public boolean isAllCuboidDone() { - return taskCuboidCompleted.get() == totalCuboidCount; - } - - private class CuboidTaskThread extends Thread { - private int id; - - CuboidTaskThread(int id) { - super("CuboidTask-" + id); - this.id = id; - } - - @Override - public void run() { - try { - while (!isAllCuboidDone()) { - CuboidTask task = null; - while (task == null && taskHasNoException()) { - task = taskPending.poll(15, TimeUnit.SECONDS); - } - // if task error occurs - if (task == null) - break; - - CuboidResult newCuboid = buildCuboid(task.parent, task.childCuboidId); - addChildTasks(newCuboid); - - if (isAllCuboidDone()) { - for (Thread t : taskThreads) { - if (t != Thread.currentThread()) - t.interrupt(); - } - } - } - } catch (Throwable ex) { - if (!isAllCuboidDone()) { - logger.error("task thread exception", ex); - taskThreadExceptions[id] = ex; - } - } - } - } - - private boolean taskHasNoException() { - for (int i = 0; i < taskThreadExceptions.length; i++) - if (taskThreadExceptions[i] != null) - return false; - return true; - } - - private void addChildTasks(CuboidResult parent) { - List children = cuboidScheduler.getSpanningCuboid(parent.cuboidId); - for (Long child : children) { - taskPending.add(new CuboidTask(parent, child)); - } - } - - private int getSystemAvailMB() { - Runtime.getRuntime().gc(); - try { - Thread.sleep(500); - } catch (InterruptedException e) { - logger.error("", e); - } - return MemoryBudgetController.getSystemAvailMB(); - } - - private void makeMemoryBudget() { - int systemAvailMB = getSystemAvailMB(); - logger.info("System avail " + systemAvailMB + " MB"); - int reserve = Math.max(reserveMemoryMB, baseResult.aggrCacheMB / 3); - logger.info("Reserve " + reserve + " MB for system basics"); - - int budget = systemAvailMB - reserve; - if (budget < baseResult.aggrCacheMB) { - // make sure we have base aggr cache as minimal - budget = baseResult.aggrCacheMB; - logger.warn("!!! 
System avail memory (" + systemAvailMB + " MB) is less than base aggr cache (" + baseResult.aggrCacheMB + " MB) + minimal reservation (" + reserve + " MB), consider increase JVM heap -Xmx"); - } - - logger.info("Memory Budget is " + budget + " MB"); - memBudget = new MemoryBudgetController(budget); - } - - private CuboidResult createBaseCuboid(BlockingQueue> input) throws IOException { - GridTable baseCuboid = newGridTableByCuboidID(baseCuboidId); - GTBuilder baseBuilder = baseCuboid.rebuild(); - IGTScanner baseInput = new InputConverter(baseCuboid.getInfo(), input); - - int mbBefore = getSystemAvailMB(); - int mbAfter = 0; - - Pair dimensionMetricsBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId); - GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null); - GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req); - - long startTime = System.currentTimeMillis(); - logger.info("Calculating cuboid " + baseCuboidId); - - int count = 0; - for (GTRecord r : aggregationScanner) { - if (mbAfter == 0) { - mbAfter = getSystemAvailMB(); - } - baseBuilder.write(r); - count++; - } - aggregationScanner.close(); - baseBuilder.close(); - - long timeSpent = System.currentTimeMillis() - startTime; - logger.info("Cuboid " + baseCuboidId + " has " + count + " rows, build takes " + timeSpent + "ms"); - - int mbBaseAggrCacheOnHeap = mbAfter == 0 ? 0 : mbBefore - mbAfter; - int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache() / MemoryBudgetController.ONE_MB); - int mbBaseAggrCache = Math.max((int) (mbBaseAggrCacheOnHeap * 1.1), mbEstimateBaseAggrCache); - mbBaseAggrCache = Math.max(mbBaseAggrCache, 10); // let it be 10 MB at least - logger.info("Base aggr cache is " + mbBaseAggrCache + " MB (heap " + mbBaseAggrCacheOnHeap + " MB, estimate " + mbEstimateBaseAggrCache + " MB)"); - - return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, mbBaseAggrCache); - } - - private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) { - if (aggrCacheMB <= 0) { - aggrCacheMB = (int) Math.ceil(1.0 * nRows / baseResult.nRows * baseResult.aggrCacheMB); - } - - CuboidResult result = new CuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB); - taskCuboidCompleted.incrementAndGet(); - - resultCollector.collect(result); - return result; - } - - private CuboidResult buildCuboid(CuboidResult parent, long cuboidId) throws IOException { - final String consumerName = "AggrCache@Cuboid " + cuboidId; - MemoryBudgetController.MemoryConsumer consumer = new MemoryBudgetController.MemoryConsumer() { - @Override - public int freeUp(int mb) { - return 0; // cannot free up on demand - } - - @Override - public String toString() { - return consumerName; - } - }; - - // reserve memory for aggregation cache, can't be larger than the parent - memBudget.reserveInsist(consumer, parent.aggrCacheMB); - try { - return aggregateCuboid(parent, cuboidId); - } finally { - memBudget.reserve(consumer, 0); - } - } - - private CuboidResult aggregateCuboid(CuboidResult parent, long cuboidId) throws IOException { - Pair columnBitSets = getDimensionAndMetricColumnBitSet(parent.cuboidId); - ImmutableBitSet parentDimensions = columnBitSets.getFirst(); - ImmutableBitSet measureColumns = columnBitSets.getSecond(); - ImmutableBitSet childDimensions = parentDimensions; - - long mask = Long.highestOneBit(parent.cuboidId); - long 
childCuboidId = cuboidId; - long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parent.cuboidId); - int index = 0; - for (int i = 0; i < parentCuboidIdActualLength; i++) { - if ((mask & parent.cuboidId) > 0) { - if ((mask & childCuboidId) == 0) { - // this dim will be aggregated - childDimensions = childDimensions.set(index, false); - } - index++; - } - mask = mask >> 1; - } - - return scanAndAggregateGridTable(parent.table, cuboidId, childDimensions, measureColumns); - } - - private CuboidResult scanAndAggregateGridTable(GridTable gridTable, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException { - long startTime = System.currentTimeMillis(); - logger.info("Calculating cuboid " + cuboidId); - - GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null); - GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req); - GridTable newGridTable = newGridTableByCuboidID(cuboidId); - GTBuilder builder = newGridTable.rebuild(); - - ImmutableBitSet allNeededColumns = aggregationColumns.or(measureColumns); - - GTRecord newRecord = new GTRecord(newGridTable.getInfo()); - int count = 0; - try { - for (GTRecord record : scanner) { - count++; - for (int i = 0; i < allNeededColumns.trueBitCount(); i++) { - int c = allNeededColumns.trueBitAt(i); - newRecord.set(i, record.get(c)); - } - builder.write(newRecord); - } - - // disable sanity check for performance - sanityCheck(scanner.getTotalSumForSanityCheck()); - } finally { - scanner.close(); - builder.close(); - } - - long timeSpent = System.currentTimeMillis() - startTime; - logger.info("Cuboid " + cuboidId + " has " + count + " rows, build takes " + timeSpent + "ms"); - - return updateCuboidResult(cuboidId, newGridTable, count, timeSpent, 0); - } - - //@SuppressWarnings("unused") - private void sanityCheck(Object[] totalSum) { - // double sum introduces error and causes result not exactly equal - for (int i = 0; i < totalSum.length; i++) { - if (totalSum[i] instanceof DoubleMutable) { - totalSum[i] = Math.round(((DoubleMutable) totalSum[i]).get()); - } - } - - if (totalSumForSanityCheck == null) { - totalSumForSanityCheck = totalSum; - return; - } - if (Arrays.equals(totalSumForSanityCheck, totalSum) == false) { - throw new IllegalStateException(); - } - } - - // =========================================================================== - - private static class CuboidTask implements Comparable { - final CuboidResult parent; - final long childCuboidId; - - CuboidTask(CuboidResult parent, long childCuboidId) { - this.parent = parent; - this.childCuboidId = childCuboidId; - } - - @Override - public int compareTo(CuboidTask o) { - long comp = this.childCuboidId - o.childCuboidId; - return comp < 0 ? -1 : (comp > 0 ? 
1 : 0); - } - } - - // ============================================================================ - - private class InputConverter implements IGTScanner { - GTInfo info; - GTRecord record; - BlockingQueue> input; - - public InputConverter(GTInfo info, BlockingQueue> input) { - this.info = info; - this.input = input; - this.record = new GTRecord(info); - } - - @Override - public Iterator iterator() { - return new Iterator() { - - List currentObject = null; - - @Override - public boolean hasNext() { - try { - currentObject = input.take(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return currentObject != null && currentObject.size() > 0; - } - - @Override - public GTRecord next() { - if (currentObject.size() == 0) - throw new IllegalStateException(); - - buildGTRecord(currentObject, record); - return record; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - - @Override - public void close() throws IOException { - } - - @Override - public GTInfo getInfo() { - return info; - } - - @Override - public int getScannedRowCount() { - return 0; - } - - @Override - public int getScannedRowBlockCount() { - return 0; - } - - private void buildGTRecord(List row, GTRecord record) { - Object[] dimensions = buildKey(row); - Object[] metricsValues = buildValue(row); - Object[] recordValues = new Object[dimensions.length + metricsValues.length]; - System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length); - System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length); - record.setValues(recordValues); - } - - private Object[] buildKey(List row) { - int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length; - Object[] key = new Object[keySize]; - - for (int i = 0; i < keySize; i++) { - key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]); - } - - return key; - } - - private Object[] buildValue(List row) { - - Object[] values = new Object[measureCount]; - MeasureDesc measureDesc = null; - - for (int position = 0; position < hbaseMeasureRefIndex.length; position++) { - int i = hbaseMeasureRefIndex[position]; - measureDesc = measureDescs[i]; - - Object value = null; - int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i]; - FunctionDesc function = cubeDesc.getMeasures().get(i).getFunction(); - if (function.isCount() || function.isHolisticCountDistinct()) { - // note for holistic count distinct, this value will be ignored - value = ONE; - } else if (flatTableIdx == null) { - value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue()); - } else if (flatTableIdx.length == 1) { - value = measureCodec.getSerializer(i).valueOf(toBytes(row.get(flatTableIdx[0]))); - } else { - - byte[] result = null; - for (int x = 0; x < flatTableIdx.length; x++) { - byte[] split = toBytes(row.get(flatTableIdx[x])); - if (result == null) { - result = Arrays.copyOf(split, split.length); - } else { - byte[] newResult = new byte[result.length + split.length]; - System.arraycopy(result, 0, newResult, 0, result.length); - System.arraycopy(split, 0, newResult, result.length, split.length); - result = newResult; - } - } - value = measureCodec.getSerializer(i).valueOf(result); - } - values[position] = value; - } - return values; - } - - private byte[] toBytes(String v) { - return v == null ? 
null : Bytes.toBytes(v); - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java deleted file mode 100644 index badb14f..0000000 --- a/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java +++ /dev/null @@ -1,679 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.inmemcubing; - -import static org.apache.kylin.common.util.MemoryBudgetController.*; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.Closeable; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.file.StandardOpenOption; -import java.util.NoSuchElementException; - -import org.apache.kylin.common.util.ImmutableBitSet; -import org.apache.kylin.common.util.MemoryBudgetController; -import org.apache.kylin.common.util.MemoryBudgetController.MemoryConsumer; -import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException; -import org.apache.kylin.storage.gridtable.GTInfo; -import org.apache.kylin.storage.gridtable.GTRecord; -import org.apache.kylin.storage.gridtable.GTRowBlock; -import org.apache.kylin.storage.gridtable.GTScanRequest; -import org.apache.kylin.storage.gridtable.IGTStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MemDiskStore implements IGTStore, Closeable { - - private static final Logger logger = LoggerFactory.getLogger(MemDiskStore.class); - private static final boolean debug = true; - - private static final int STREAM_BUFFER_SIZE = 8192; - private static final int MEM_CHUNK_SIZE_MB = 5; - - private final GTInfo info; - private final Object lock; // all public methods that read/write object states are synchronized on this lock - private final MemPart memPart; - private final DiskPart diskPart; - private final boolean delOnClose; - - private Writer ongoingWriter; - - public MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl) throws IOException { - this(info, budgetCtrl, File.createTempFile("MemDiskStore", ""), true); - } - - public MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile) throws IOException { - this(info, budgetCtrl, diskFile, false); - } - - private MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnClose) throws 
IOException { - this.info = info; - this.lock = this; - this.memPart = new MemPart(budgetCtrl); - this.diskPart = new DiskPart(diskFile); - this.delOnClose = delOnClose; - - // in case user forget to call close() - if (delOnClose) - diskFile.deleteOnExit(); - } - - @Override - public GTInfo getInfo() { - return info; - } - - @Override - public IGTStoreWriter rebuild(int shard) throws IOException { - return newWriter(0); - } - - @Override - public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException { - return newWriter(length()); - } - - private Writer newWriter(long startOffset) throws IOException { - synchronized (lock) { - if (ongoingWriter != null) - throw new IllegalStateException(); - - ongoingWriter = new Writer(startOffset); - return ongoingWriter; - } - } - - @Override - public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, ImmutableBitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException { - synchronized (lock) { - return new Reader(); - } - } - - @Override - public void close() throws IOException { - // synchronized inside the parts close() - memPart.close(); - diskPart.close(); - } - - public long length() { - synchronized (lock) { - return Math.max(memPart.tailOffset(), diskPart.tailOffset); - } - } - - @Override - public String toString() { - return "MemDiskStore@" + (info.getTableName() == null ? this.hashCode() : info.getTableName()); - } - - private class Reader implements IGTStoreScanner { - - final DataInputStream din; - long readOffset = 0; - long memRead = 0; - long diskRead = 0; - int nReadCalls = 0; - - GTRowBlock block = GTRowBlock.allocate(info); - GTRowBlock next = null; - - Reader() throws IOException { - diskPart.openRead(); - if (debug) - logger.debug(MemDiskStore.this + " read start @ " + readOffset); - - InputStream in = new InputStream() { - byte[] tmp = new byte[1]; - MemChunk memChunk; - - @Override - public int read() throws IOException { - int n = read(tmp, 0, 1); - if (n <= 0) - return -1; - else - return (int) tmp[0]; - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - synchronized (lock) { - nReadCalls++; - if (available() <= 0) - return -1; - - if (memChunk == null && memPart.headOffset() <= readOffset && readOffset < memPart.tailOffset()) { - memChunk = memPart.seekMemChunk(readOffset); - } - - int lenToGo = Math.min(available(), len); - - int nRead = 0; - while (lenToGo > 0) { - int n; - if (memChunk != null) { - if (memChunk.headOffset() > readOffset) { - memChunk = null; - continue; - } - if (readOffset >= memChunk.tailOffset()) { - memChunk = memChunk.next; - continue; - } - int chunkOffset = (int) (readOffset - memChunk.headOffset()); - n = Math.min((int) (memChunk.tailOffset() - readOffset), lenToGo); - System.arraycopy(memChunk.data, chunkOffset, b, off, n); - memRead += n; - } else { - n = diskPart.read(readOffset, b, off, lenToGo); - diskRead += n; - } - lenToGo -= n; - nRead += n; - off += n; - readOffset += n; - } - return nRead; - } - } - - @Override - public int available() throws IOException { - synchronized (lock) { - return (int) (length() - readOffset); - } - } - }; - - din = new DataInputStream(new BufferedInputStream(in, STREAM_BUFFER_SIZE)); - } - - @Override - public boolean hasNext() { - if (next != null) - return true; - - try { - if (din.available() > 0) { - block.importFrom(din); - next = block; - } - } catch (IOException e) { - throw new RuntimeException(e); - } - - return next != null; - } - - @Override - public GTRowBlock next() { 
- if (next == null) { - hasNext(); - if (next == null) - throw new NoSuchElementException(); - } - GTRowBlock r = next; - next = null; - return r; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - @Override - public void close() throws IOException { - synchronized (lock) { - din.close(); - diskPart.closeRead(); - if (debug) - logger.debug(MemDiskStore.this + " read end @ " + readOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk, " + nReadCalls + " read() calls"); - } - } - - } - - private class Writer implements IGTStoreWriter { - - final DataOutputStream dout; - long writeOffset; - long memWrite = 0; - long diskWrite = 0; - int nWriteCalls; - boolean closed = false; - - Writer(long startOffset) throws IOException { - writeOffset = 0; // TODO does not support append yet - memPart.clear(); - diskPart.clear(); - diskPart.openWrite(false); - if (debug) - logger.debug(MemDiskStore.this + " write start @ " + writeOffset); - - memPart.activateMemWrite(); - - OutputStream out = new OutputStream() { - byte[] tmp = new byte[1]; - boolean memPartActivated = true; - - @Override - public void write(int b) throws IOException { - tmp[0] = (byte) b; - write(tmp, 0, 1); - } - - @Override - public void write(byte[] bytes, int offset, int length) throws IOException { - // lock inside memPart.write() and diskPartm.write() - nWriteCalls++; - while (length > 0) { - int n; - if (memPartActivated) { - n = memPart.write(bytes, offset, length, writeOffset); - memWrite += n; - if (n == 0) { - memPartActivated = false; - } - } else { - n = diskPart.write(writeOffset, bytes, offset, length); - diskWrite += n; - } - offset += n; - length -= n; - writeOffset += n; - } - } - }; - dout = new DataOutputStream(new BufferedOutputStream(out, STREAM_BUFFER_SIZE)); - } - - @Override - public void write(GTRowBlock block) throws IOException { - block.export(dout); - } - - @Override - public void close() throws IOException { - synchronized (lock) { - if (!closed) { - dout.close(); - memPart.deactivateMemWrite(); - } - - if (memPart.asyncFlusher == null) { - assert writeOffset == diskPart.tailOffset; - diskPart.closeWrite(); - ongoingWriter = null; - if (debug) - logger.debug(MemDiskStore.this + " write end @ " + writeOffset + ", " + (memWrite) + " to mem, " + (diskWrite) + " to disk, " + nWriteCalls + " write() calls"); - } else { - // the asyncFlusher will call this close() again later - } - closed = true; - } - } - } - - private static class MemChunk { - long diskOffset; - int length; - byte[] data; - MemChunk next; - - boolean isFull() { - return length == data.length; - } - - long headOffset() { - return diskOffset; - } - - long tailOffset() { - return diskOffset + length; - } - - int freeSpace() { - return data.length - length; - } - } - - private class MemPart implements Closeable, MemoryConsumer { - - final MemoryBudgetController budgetCtrl; - - // async flush thread checks this flag out of sync block - volatile boolean writeActivated; - MemChunk firstChunk; - MemChunk lastChunk; - int chunkCount; - - Thread asyncFlusher; - MemChunk asyncFlushChunk; - long asyncFlushDiskOffset; - Throwable asyncFlushException; - - MemPart(MemoryBudgetController budgetCtrl) { - this.budgetCtrl = budgetCtrl; - } - - long headOffset() { - return firstChunk == null ? 0 : firstChunk.headOffset(); - } - - long tailOffset() { - return lastChunk == null ? 
0 : lastChunk.tailOffset(); - } - - public MemChunk seekMemChunk(long diskOffset) { - MemChunk c = firstChunk; - while (c != null && c.headOffset() <= diskOffset) { - if (diskOffset < c.tailOffset()) - break; - c = c.next; - } - return c; - } - - public int write(byte[] bytes, int offset, int length, long diskOffset) { - int needMoreMem = 0; - - synchronized (lock) { - if (writeActivated == false) - return 0; - - // write is only expected at the tail - if (diskOffset != tailOffset()) - return 0; - - if (chunkCount == 0 || lastChunk.isFull()) - needMoreMem = (chunkCount + 1) * MEM_CHUNK_SIZE_MB; - } - - // call to budgetCtrl.reserve() must be out of synchronized block, or deadlock may happen between MemoryConsumers - if (needMoreMem > 0) { - try { - budgetCtrl.reserve(this, needMoreMem); - } catch (NotEnoughBudgetException ex) { - deactivateMemWrite(); - return 0; - } - } - - synchronized (lock) { - if (needMoreMem > 0 && (chunkCount == 0 || lastChunk.isFull())) { - MemChunk chunk = new MemChunk(); - chunk.diskOffset = diskOffset; - chunk.data = new byte[ONE_MB * MEM_CHUNK_SIZE_MB - 48]; // -48 for MemChunk overhead - if (chunkCount == 0) { - firstChunk = lastChunk = chunk; - } else { - lastChunk.next = chunk; - lastChunk = chunk; - } - chunkCount++; - } - - int n = Math.min(lastChunk.freeSpace(), length); - System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n); - lastChunk.length += n; - - if (n > 0) - asyncFlush(lastChunk, diskOffset, n); - - return n; - } - } - - private void asyncFlush(MemChunk lastChunk, long diskOffset, int n) { - if (asyncFlushChunk == null) { - asyncFlushChunk = lastChunk; - asyncFlushDiskOffset = diskOffset; - } - - if (asyncFlusher == null) { - asyncFlusher = new Thread() { - public void run() { - asyncFlushException = null; - if (debug) - logger.debug(MemDiskStore.this + " async flush started @ " + asyncFlushDiskOffset); - try { - while (writeActivated) { - flushToDisk(); - Thread.sleep(10); - } - flushToDisk(); - - if (debug) - logger.debug(MemDiskStore.this + " async flush ended @ " + asyncFlushDiskOffset); - - synchronized (lock) { - asyncFlusher = null; - asyncFlushChunk = null; - if (ongoingWriter.closed) { - ongoingWriter.close(); // call writer.close() again to clean up - } - } - } catch (Throwable ex) { - asyncFlushException = ex; - } - } - }; - asyncFlusher.start(); - } - } - - private void flushToDisk() throws IOException { - byte[] data; - int offset = 0; - int length = 0; - int flushedLen = 0; - - while (true) { - data = null; - synchronized (lock) { - asyncFlushDiskOffset += flushedLen; // bytes written in last loop - // if (debug) - // logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset); - if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) { - asyncFlushChunk = asyncFlushChunk.next; - } - if (asyncFlushChunk != null) { - data = asyncFlushChunk.data; - offset = (int) (asyncFlushDiskOffset - asyncFlushChunk.headOffset()); - length = asyncFlushChunk.length - offset; - } - } - - if (data == null) - break; - - flushedLen = diskPart.write(asyncFlushDiskOffset, data, offset, length); - } - } - - @Override - public int freeUp(int mb) { - synchronized (lock) { - int mbReleased = 0; - while (chunkCount > 0 && mbReleased < mb) { - if (firstChunk == asyncFlushChunk) - break; - - mbReleased += MEM_CHUNK_SIZE_MB; - chunkCount--; - if (chunkCount == 0) { - firstChunk = lastChunk = null; - } else { - MemChunk next = firstChunk.next; - firstChunk.next = null; - firstChunk = next; - } - } - 
return mbReleased; - } - } - - public void activateMemWrite() { - if (budgetCtrl.getTotalBudgetMB() > 0) { - writeActivated = true; - if (debug) - logger.debug(MemDiskStore.this + " mem write activated"); - } - } - - public void deactivateMemWrite() { - writeActivated = false; - if (debug) - logger.debug(MemDiskStore.this + " mem write de-activated"); - } - - public void clear() { - chunkCount = 0; - firstChunk = lastChunk = null; - budgetCtrl.reserve(this, 0); - } - - @Override - public void close() throws IOException { - synchronized (lock) { - if (asyncFlushException != null) - throwAsyncException(asyncFlushException); - } - try { - asyncFlusher.join(); - } catch (NullPointerException npe) { - // that's fine, async flusher may not present - } catch (InterruptedException e) { - logger.warn("async join interrupted", e); - } - synchronized (lock) { - if (asyncFlushException != null) - throwAsyncException(asyncFlushException); - - clear(); - } - } - - private void throwAsyncException(Throwable ex) throws IOException { - if (ex instanceof IOException) - throw (IOException) ex; - else - throw new IOException(ex); - } - - @Override - public String toString() { - return MemDiskStore.this.toString(); - } - - } - - private class DiskPart implements Closeable { - final File diskFile; - FileChannel writeChannel; - FileChannel readChannel; - int readerCount = 0; // allow parallel readers - long tailOffset; - - DiskPart(File diskFile) throws IOException { - this.diskFile = diskFile; - this.tailOffset = diskFile.length(); - if (debug) - logger.debug(MemDiskStore.this + " disk file " + diskFile.getAbsolutePath()); - } - - public void openRead() throws IOException { - if (readChannel == null) { - readChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.READ); - } - readerCount++; - } - - public int read(long diskOffset, byte[] bytes, int offset, int length) throws IOException { - return readChannel.read(ByteBuffer.wrap(bytes, offset, length), diskOffset); - } - - public void closeRead() throws IOException { - closeRead(false); - } - - private void closeRead(boolean force) throws IOException { - readerCount--; - if (readerCount == 0 || force) { - if (readChannel != null) { - readChannel.close(); - readChannel = null; - } - } - } - - public void openWrite(boolean append) throws IOException { - if (append) { - writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE); - tailOffset = diskFile.length(); - } else { - diskFile.delete(); - writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE); - tailOffset = 0; - } - } - - public int write(long diskOffset, byte[] bytes, int offset, int length) throws IOException { - synchronized (lock) { - int n = writeChannel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset); - tailOffset = Math.max(diskOffset + n, tailOffset); - return n; - } - } - - public void closeWrite() throws IOException { - if (writeChannel != null) { - writeChannel.close(); - writeChannel = null; - } - } - - public void clear() throws IOException { - diskFile.delete(); - tailOffset = 0; - } - - @Override - public void close() throws IOException { - synchronized (lock) { - closeWrite(); - closeRead(true); - if (delOnClose) { - diskFile.delete(); - } - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java 
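
(Editorial aside, not part of the diff: the MemDiskStore deleted above pairs an in-memory chunk list with a disk file under a shared lock; writes fill memory chunks while an async flusher thread spills them to disk, and reads stitch the two parts back together. Below is a much-simplified sketch of that mem-then-disk idea, assuming nothing from Kylin and ignoring the budget controller and asynchronous flushing.)

    import java.io.ByteArrayOutputStream;
    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    public class MemThenDiskBuffer {
        private final ByteArrayOutputStream mem = new ByteArrayOutputStream();
        private final File spill;
        private final int memBudget;
        private long diskLen = 0;

        public MemThenDiskBuffer(int memBudgetBytes) throws IOException {
            this.memBudget = memBudgetBytes;
            this.spill = File.createTempFile("MemThenDiskBuffer", ".tmp");
            this.spill.deleteOnExit(); // in case the caller forgets to clean up
        }

        public void write(byte[] bytes) throws IOException {
            int toMem = Math.min(bytes.length, memBudget - mem.size());
            mem.write(bytes, 0, toMem); // the memory part fills first
            if (toMem < bytes.length) { // overflow goes to the disk part
                RandomAccessFile f = new RandomAccessFile(spill, "rw");
                try {
                    f.seek(diskLen);
                    f.write(bytes, toMem, bytes.length - toMem);
                    diskLen += bytes.length - toMem;
                } finally {
                    f.close();
                }
            }
        }

        public byte[] readAll() throws IOException { // stitch mem + disk back together
            byte[] all = new byte[mem.size() + (int) diskLen];
            System.arraycopy(mem.toByteArray(), 0, all, 0, mem.size());
            RandomAccessFile f = new RandomAccessFile(spill, "r");
            try {
                f.readFully(all, mem.size(), (int) diskLen);
            } finally {
                f.close();
            }
            return all;
        }
    }
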
---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java index cc68e1b..2ef3d94 100644 --- a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java +++ b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java @@ -23,10 +23,10 @@ import java.text.SimpleDateFormat; import java.util.Date; import java.util.TimeZone; +import org.apache.kylin.engine.mr.common.HadoopShellExecutable; +import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.invertedindex.IISegment; import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc; -import org.apache.kylin.job.common.HadoopShellExecutable; -import org.apache.kylin.job.common.MapReduceExecutable; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java deleted file mode 100644 index 05f8c8e..0000000 --- a/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java +++ /dev/null @@ -1,318 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.job.manager; - -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.job.dao.ExecutableDao; -import org.apache.kylin.job.dao.ExecutableOutputPO; -import org.apache.kylin.job.dao.ExecutablePO; -import org.apache.kylin.job.exception.IllegalStateTranferException; -import org.apache.kylin.job.exception.PersistentException; -import org.apache.kylin.job.execution.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; -import java.lang.reflect.Constructor; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - */ -public class ExecutableManager { - - private static final Logger logger = LoggerFactory.getLogger(ExecutableManager.class); - private static final ConcurrentHashMap CACHE = new ConcurrentHashMap(); - @SuppressWarnings("unused") - private final KylinConfig config; - - private ExecutableDao executableDao; - - public static ExecutableManager getInstance(KylinConfig config) { - ExecutableManager r = CACHE.get(config); - if (r == null) { - r = new ExecutableManager(config); - CACHE.put(config, r); - if (CACHE.size() > 1) { - logger.warn("More than one singleton exist"); - } - - } - return r; - } - - private ExecutableManager(KylinConfig config) { - logger.info("Using metadata url: " + config); - this.config = config; - this.executableDao = ExecutableDao.getInstance(config); - } - - public void addJob(AbstractExecutable executable) { - try { - executableDao.addJob(parse(executable)); - addJobOutput(executable); - } catch (PersistentException e) { - logger.error("fail to submit job:" + executable.getId(), e); - throw new RuntimeException(e); - } - } - - private void addJobOutput(AbstractExecutable executable) throws PersistentException { - ExecutableOutputPO executableOutputPO = new ExecutableOutputPO(); - executableOutputPO.setUuid(executable.getId()); - executableDao.addJobOutput(executableOutputPO); - if (executable instanceof DefaultChainedExecutable) { - for (AbstractExecutable subTask: ((DefaultChainedExecutable) executable).getTasks()) { - addJobOutput(subTask); - } - } - } - - //for ut - public void deleteJob(String jobId) { - try { - executableDao.deleteJob(jobId); - } catch (PersistentException e) { - logger.error("fail to delete job:" + jobId, e); - throw new RuntimeException(e); - } - } - - public AbstractExecutable getJob(String uuid) { - try { - return parseTo(executableDao.getJob(uuid)); - } catch (PersistentException e) { - logger.error("fail to get job:" + uuid, e); - throw new RuntimeException(e); - } - } - - public Output getOutput(String uuid) { - try { - final ExecutableOutputPO jobOutput = executableDao.getJobOutput(uuid); - Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + uuid); - return parseOutput(jobOutput); - } catch (PersistentException e) { - logger.error("fail to get job output:" + uuid, e); - throw new RuntimeException(e); - } - } - - private DefaultOutput parseOutput(ExecutableOutputPO jobOutput) { - final DefaultOutput result = new DefaultOutput(); - result.setExtra(jobOutput.getInfo()); - result.setState(ExecutableState.valueOf(jobOutput.getStatus())); - result.setVerboseMsg(jobOutput.getContent()); - 
result.setLastModified(jobOutput.getLastModified()); - return result; - } - - public Map getAllOutputs() { - try { - final List jobOutputs = executableDao.getJobOutputs(); - HashMap result = Maps.newHashMap(); - for (ExecutableOutputPO jobOutput : jobOutputs) { - result.put(jobOutput.getId(), parseOutput(jobOutput)); - } - return result; - } catch (PersistentException e) { - logger.error("fail to get all job output:", e); - throw new RuntimeException(e); - } - } - - public List getAllExecutables() { - try { - return Lists.transform(executableDao.getJobs(), new Function() { - @Nullable - @Override - public AbstractExecutable apply(ExecutablePO input) { - return parseTo(input); - } - }); - } catch (PersistentException e) { - logger.error("error get All Jobs", e); - throw new RuntimeException(e); - } - } - - public List getAllJobIds() { - try { - return executableDao.getJobIds(); - } catch (PersistentException e) { - logger.error("error get All Job Ids", e); - throw new RuntimeException(e); - } - } - - public void updateAllRunningJobsToError() { - try { - final List jobOutputs = executableDao.getJobOutputs(); - for (ExecutableOutputPO executableOutputPO : jobOutputs) { - if (executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString())) { - executableOutputPO.setStatus(ExecutableState.ERROR.toString()); - executableDao.updateJobOutput(executableOutputPO); - } - } - } catch (PersistentException e) { - logger.error("error reset job status from RUNNING to ERROR", e); - throw new RuntimeException(e); - } - } - - public void resumeJob(String jobId) { - AbstractExecutable job = getJob(jobId); - if (job == null) { - return; - } - updateJobOutput(jobId, ExecutableState.READY, null, null); - if (job instanceof DefaultChainedExecutable) { - List tasks = ((DefaultChainedExecutable) job).getTasks(); - for (AbstractExecutable task : tasks) { - if (task.getStatus() == ExecutableState.ERROR) { - updateJobOutput(task.getId(), ExecutableState.READY, null, null); - break; - } - } - } - } - - public void discardJob(String jobId) { - AbstractExecutable job = getJob(jobId); - if (job instanceof DefaultChainedExecutable) { - List tasks = ((DefaultChainedExecutable) job).getTasks(); - for (AbstractExecutable task : tasks) { - if (!task.getStatus().isFinalState()) { - updateJobOutput(task.getId(), ExecutableState.DISCARDED, null, null); - } - } - } - updateJobOutput(jobId, ExecutableState.DISCARDED, null, null); - } - - public void updateJobOutput(String jobId, ExecutableState newStatus, Map info, String output) { - try { - final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId); - Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + jobId); - ExecutableState oldStatus = ExecutableState.valueOf(jobOutput.getStatus()); - if (newStatus != null && oldStatus != newStatus) { - if (!ExecutableState.isValidStateTransfer(oldStatus, newStatus)) { - throw new IllegalStateTranferException("there is no valid state transfer from:" + oldStatus + " to:" + newStatus); - } - jobOutput.setStatus(newStatus.toString()); - } - if (info != null) { - jobOutput.setInfo(info); - } - if (output != null) { - jobOutput.setContent(output); - } - executableDao.updateJobOutput(jobOutput); - logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus); - } catch (PersistentException e) { - logger.error("error change job:" + jobId + " to " + newStatus.toString()); - throw new RuntimeException(e); - } - } - - //for migration only - //TODO delete when migration 
finished - public void resetJobOutput(String jobId, ExecutableState state, String output) { - try { - final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId); - jobOutput.setStatus(state.toString()); - if (output != null) { - jobOutput.setContent(output); - } - executableDao.updateJobOutput(jobOutput); - } catch (PersistentException e) { - throw new RuntimeException(e); - } - } - - public void addJobInfo(String id, Map info) { - if (info == null) { - return; - } - try { - ExecutableOutputPO output = executableDao.getJobOutput(id); - Preconditions.checkArgument(output != null, "there is no related output for job id:" + id); - output.getInfo().putAll(info); - executableDao.updateJobOutput(output); - } catch (PersistentException e) { - logger.error("error update job info, id:" + id + " info:" + info.toString()); - throw new RuntimeException(e); - } - } - - public void addJobInfo(String id, String key, String value) { - Map info = Maps.newHashMap(); - info.put(key, value); - addJobInfo(id, info); - } - - private static ExecutablePO parse(AbstractExecutable executable) { - ExecutablePO result = new ExecutablePO(); - result.setName(executable.getName()); - result.setUuid(executable.getId()); - result.setType(executable.getClass().getName()); - result.setParams(executable.getParams()); - if (executable instanceof ChainedExecutable) { - List tasks = Lists.newArrayList(); - for (AbstractExecutable task : ((ChainedExecutable) executable).getTasks()) { - tasks.add(parse(task)); - } - result.setTasks(tasks); - } - return result; - } - - private static AbstractExecutable parseTo(ExecutablePO executablePO) { - if (executablePO == null) { - return null; - } - String type = executablePO.getType(); - try { - Class clazz = ClassUtil.forName(type, AbstractExecutable.class); - Constructor constructor = clazz.getConstructor(); - AbstractExecutable result = constructor.newInstance(); - result.setId(executablePO.getUuid()); - result.setName(executablePO.getName()); - result.setParams(executablePO.getParams()); - List tasks = executablePO.getTasks(); - if (tasks != null && !tasks.isEmpty()) { - Preconditions.checkArgument(result instanceof ChainedExecutable); - for (ExecutablePO subTask: tasks) { - ((ChainedExecutable) result).addTask(parseTo(subTask)); - } - } - return result; - } catch (ReflectiveOperationException e) { - throw new IllegalArgumentException("cannot parse this job:" + executablePO.getId(), e); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java index 5fc445c..36feb15 100644 --- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java +++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java @@ -33,6 +33,9 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.cuboid.CuboidScheduler; +import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; +import org.apache.kylin.cube.inmemcubing.ICuboidWriter; +import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; import 
org.apache.kylin.cube.model.HBaseColumnDesc; @@ -42,18 +45,15 @@ import org.apache.kylin.dict.DictionaryGenerator; import org.apache.kylin.dict.DictionaryInfo; import org.apache.kylin.dict.DictionaryManager; import org.apache.kylin.engine.mr.HadoopUtil; -import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer; -import org.apache.kylin.job.hadoop.hbase.CubeHTableUtil; -import org.apache.kylin.job.inmemcubing.ICuboidWriter; -import org.apache.kylin.job.inmemcubing.InMemCubeBuilder; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer; +import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.ReadableTable.TableSignature; -import org.apache.kylin.storage.cube.CuboidToGridTableMapping; -import org.apache.kylin.storage.gridtable.GTRecord; import org.apache.kylin.storage.hbase.HBaseConnection; -import org.apache.kylin.storage.hbase.InMemKeyValueCreator; +import org.apache.kylin.storage.hbase.steps.CubeHTableUtil; +import org.apache.kylin.storage.hbase.steps.InMemKeyValueCreator; import org.apache.kylin.streaming.MicroStreamBatch; import org.apache.kylin.streaming.MicroStreamBatchConsumer; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java index e1ebe20..47ed52e 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java +++ b/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java @@ -28,8 +28,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import org.apache.kylin.job.hadoop.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; /** */ http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/tools/DefaultSslProtocolSocketFactory.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/DefaultSslProtocolSocketFactory.java b/job/src/main/java/org/apache/kylin/job/tools/DefaultSslProtocolSocketFactory.java deleted file mode 100644 index 89713c2..0000000 --- a/job/src/main/java/org/apache/kylin/job/tools/DefaultSslProtocolSocketFactory.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/tools/DefaultSslProtocolSocketFactory.java
----------------------------------------------------------------------

diff --git a/job/src/main/java/org/apache/kylin/job/tools/DefaultSslProtocolSocketFactory.java b/job/src/main/java/org/apache/kylin/job/tools/DefaultSslProtocolSocketFactory.java
deleted file mode 100644
index 89713c2..0000000
--- a/job/src/main/java/org/apache/kylin/job/tools/DefaultSslProtocolSocketFactory.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.tools;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.UnknownHostException;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-
-import org.apache.commons.httpclient.ConnectTimeoutException;
-import org.apache.commons.httpclient.HttpClientError;
-import org.apache.commons.httpclient.params.HttpConnectionParams;
-import org.apache.commons.httpclient.protocol.ControllerThreadSocketFactory;
-import org.apache.commons.httpclient.protocol.SecureProtocolSocketFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xduo
- *
- */
-public class DefaultSslProtocolSocketFactory implements SecureProtocolSocketFactory {
-    /** Log object for this class. */
-    private static Logger LOG = LoggerFactory.getLogger(DefaultSslProtocolSocketFactory.class);
-    private SSLContext sslcontext = null;
-
-    /**
-     * Constructor for DefaultSslProtocolSocketFactory.
-     */
-    public DefaultSslProtocolSocketFactory() {
-        super();
-    }
-
-    /**
-     * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int,java.net.InetAddress,int)
-     */
-    public Socket createSocket(String host, int port, InetAddress clientHost, int clientPort) throws IOException, UnknownHostException {
-        return getSSLContext().getSocketFactory().createSocket(host, port, clientHost, clientPort);
-    }
-
-    /**
-     * Attempts to get a new socket connection to the given host within the
-     * given time limit.
-     * <p>
-     * To circumvent the limitations of older JREs that do not support connect
-     * timeout a controller thread is executed. The controller thread attempts
-     * to create a new socket within the given limit of time. If socket
-     * constructor does not return until the timeout expires, the controller
-     * terminates and throws an {@link ConnectTimeoutException}
-     * </p>
-     *
-     * @param host
-     *            the host name/IP
-     * @param port
-     *            the port on the host
-     * @param localAddress
-     *            the local host name/IP to bind the socket to
-     * @param localPort
-     *            the port on the local machine
-     * @param params
-     *            {@link HttpConnectionParams Http connection parameters}
-     *
-     * @return Socket a new socket
-     *
-     * @throws IOException
-     *             if an I/O error occurs while creating the socket
-     * @throws UnknownHostException
-     *             if the IP address of the host cannot be determined
-     * @throws ConnectTimeoutException
-     *             DOCUMENT ME!
-     * @throws IllegalArgumentException
-     *             DOCUMENT ME!
-     */
-    public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort, final HttpConnectionParams params) throws IOException, UnknownHostException, ConnectTimeoutException {
-        if (params == null) {
-            throw new IllegalArgumentException("Parameters may not be null");
-        }
-
-        int timeout = params.getConnectionTimeout();
-
-        if (timeout == 0) {
-            return createSocket(host, port, localAddress, localPort);
-        } else {
-            // To be eventually deprecated when migrated to Java 1.4 or above
-            return ControllerThreadSocketFactory.createSocket(this, host, port, localAddress, localPort, timeout);
-        }
-    }
-
-    /**
-     * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int)
-     */
-    public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
-        return getSSLContext().getSocketFactory().createSocket(host, port);
-    }
-
-    /**
-     * @see SecureProtocolSocketFactory#createSocket(java.net.Socket,java.lang.String,int,boolean)
-     */
-    public Socket createSocket(Socket socket, String host, int port, boolean autoClose) throws IOException, UnknownHostException {
-        return getSSLContext().getSocketFactory().createSocket(socket, host, port, autoClose);
-    }
-
-    public boolean equals(Object obj) {
-        return ((obj != null) && obj.getClass().equals(DefaultX509TrustManager.class));
-    }
-
-    public int hashCode() {
-        return DefaultX509TrustManager.class.hashCode();
-    }
-
-    private static SSLContext createEasySSLContext() {
-        try {
-            SSLContext context = SSLContext.getInstance("TLS");
-            context.init(null, new TrustManager[] { new DefaultX509TrustManager(null) }, null);
-
-            return context;
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-            throw new HttpClientError(e.toString());
-        }
-    }
-
-    private SSLContext getSSLContext() {
-        if (this.sslcontext == null) {
-            this.sslcontext = createEasySSLContext();
-        }
-
-        return this.sslcontext;
-    }
-}
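The javadoc on the timeout-aware createSocket above documents a controller-thread workaround for JREs predating Java 1.4, which could not time out a blocking connect. On any modern JRE the same behavior is available directly via Socket#connect(SocketAddress, int); a minimal sketch, independent of this commit:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    public class ConnectWithTimeout {

        // Open a plain TCP socket, failing with SocketTimeoutException if the
        // connection is not established within timeoutMillis. A timeout of 0
        // means wait indefinitely, matching the HttpConnectionParams
        // convention in the deleted factory above.
        static Socket open(String host, int port, int timeoutMillis) throws IOException {
            Socket socket = new Socket();
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return socket;
        }
    }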
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/tools/DefaultX509TrustManager.java
----------------------------------------------------------------------

diff --git a/job/src/main/java/org/apache/kylin/job/tools/DefaultX509TrustManager.java b/job/src/main/java/org/apache/kylin/job/tools/DefaultX509TrustManager.java
deleted file mode 100644
index 8fc2dcd..0000000
--- a/job/src/main/java/org/apache/kylin/job/tools/DefaultX509TrustManager.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.tools;
-
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-import javax.net.ssl.X509TrustManager;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xduo
- *
- */
-public class DefaultX509TrustManager implements X509TrustManager {
-
-    /** Log object for this class. */
-    private static Logger LOG = LoggerFactory.getLogger(DefaultX509TrustManager.class);
-    private X509TrustManager standardTrustManager = null;
-
-    /**
-     * Constructor for DefaultX509TrustManager.
-     *
-     */
-    public DefaultX509TrustManager(KeyStore keystore) throws NoSuchAlgorithmException, KeyStoreException {
-        super();
-
-        TrustManagerFactory factory = TrustManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
-        factory.init(keystore);
-
-        TrustManager[] trustmanagers = factory.getTrustManagers();
-
-        if (trustmanagers.length == 0) {
-            throw new NoSuchAlgorithmException("SunX509 trust manager not supported");
-        }
-
-        this.standardTrustManager = (X509TrustManager) trustmanagers[0];
-    }
-
-    public X509Certificate[] getAcceptedIssuers() {
-        return this.standardTrustManager.getAcceptedIssuers();
-    }
-
-    public boolean isClientTrusted(X509Certificate[] certificates) {
-        return true;
-        // return this.standardTrustManager.isClientTrusted(certificates);
-    }
-
-    public boolean isServerTrusted(X509Certificate[] certificates) {
-        if ((certificates != null) && LOG.isDebugEnabled()) {
-            LOG.debug("Server certificate chain:");
-
-            for (int i = 0; i < certificates.length; i++) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("X509Certificate[" + i + "]=" + certificates[i]);
-                }
-            }
-        }
-
-        if ((certificates != null) && (certificates.length == 1)) {
-            X509Certificate certificate = certificates[0];
-
-            try {
-                certificate.checkValidity();
-            } catch (CertificateException e) {
-                LOG.error(e.toString());
-
-                return false;
-            }
-
-            return true;
-        } else {
-            return true;
-            // return this.standardTrustManager.isServerTrusted(certificates);
-        }
-    }
-
-    @Override
-    public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
-        // TODO Auto-generated method stub
-
-    }
-
-}
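The deleted trust manager obtains the platform's standard X509TrustManager in its constructor and then relaxes most of its checks. The lookup itself is the usual JSSE pattern; a self-contained sketch (passing a null KeyStore initializes the factory from the JVM's default cacerts store):

    import java.security.KeyStore;

    import javax.net.ssl.TrustManager;
    import javax.net.ssl.TrustManagerFactory;
    import javax.net.ssl.X509TrustManager;

    public class PlatformTrustManager {

        // Return the JVM's default X509TrustManager, or throw if none exists.
        static X509TrustManager get() throws Exception {
            TrustManagerFactory factory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            // A null keystore selects the platform default trust store (cacerts).
            factory.init((KeyStore) null);
            for (TrustManager tm : factory.getTrustManagers()) {
                if (tm instanceof X509TrustManager) {
                    return (X509TrustManager) tm;
                }
            }
            throw new IllegalStateException("no X509TrustManager in default factory");
        }
    }

Note the deleted code passes KeyManagerFactory.getDefaultAlgorithm() to TrustManagerFactory.getInstance, which also resolves to a usable trust manager algorithm on typical JREs; the sketch uses TrustManagerFactory's own default, which is the conventional choice.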