Return-Path: X-Original-To: apmail-asterixdb-commits-archive@minotaur.apache.org Delivered-To: apmail-asterixdb-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 44C8118594 for ; Sat, 5 Dec 2015 00:50:31 +0000 (UTC) Received: (qmail 4891 invoked by uid 500); 5 Dec 2015 00:50:31 -0000 Delivered-To: apmail-asterixdb-commits-archive@asterixdb.apache.org Received: (qmail 4855 invoked by uid 500); 5 Dec 2015 00:50:31 -0000 Mailing-List: contact commits-help@asterixdb.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.incubator.apache.org Delivered-To: mailing list commits@asterixdb.incubator.apache.org Received: (qmail 4840 invoked by uid 99); 5 Dec 2015 00:50:30 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 05 Dec 2015 00:50:30 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 635131A2948 for ; Sat, 5 Dec 2015 00:50:30 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.226 X-Spam-Level: * X-Spam-Status: No, score=1.226 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.554] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id qiV-AaZUy-Pf for ; Sat, 5 Dec 2015 00:50:18 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 4ABB5441C4 for ; Sat, 5 Dec 2015 00:50:17 +0000 (UTC) Received: (qmail 2260 invoked by uid 99); 5 Dec 2015 00:50:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 05 Dec 2015 00:50:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 12C19E2C3C; Sat, 5 Dec 2015 00:50:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ildar@apache.org To: commits@asterixdb.incubator.apache.org Date: Sat, 05 Dec 2015 00:50:20 -0000 Message-Id: <2f7f80228ad8414e849e83340eab6b6b@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [7/8] incubator-asterixdb-hyracks git commit: Checkpointing: more or less stable version Checkpointing: more or less stable version Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/e05a1921 Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/e05a1921 Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/e05a1921 Branch: refs/heads/statistics Commit: e05a19219e067cc85ab6315f9e213597502c3b9e Parents: cf029cf Author: Ildar Absalyamov Authored: Fri Dec 4 11:04:18 2015 -0800 Committer: Ildar Absalyamov Committed: Fri Dec 4 11:04:18 2015 -0800 ---------------------------------------------------------------------- hyracks/hyrack-storage-am-statistics/pom.xml | 32 ++ .../am/statistics/common/StatisticsFactory.java | 37 ++ .../storage/am/statistics/common/Synopsis.java | 22 ++ .../am/statistics/common/SynopsisType.java | 7 + .../am/statistics/sketch/GroupCountSketch.java | 90 +++++ .../am/statistics/sketch/HashGenerator.java | 33 ++ .../am/statistics/sketch/QuickSelect.java | 47 +++ .../storage/am/statistics/sketch/Sketch.java | 24 ++ .../am/statistics/sketch/SketchSynopsis.java | 83 +++++ .../statistics/wavelet/WaveletCoefficient.java | 85 +++++ .../am/statistics/wavelet/WaveletSynopsis.java | 291 +++++++++++++++ .../am/statistics/common/StatisticsFactory.java | 37 ++ .../storage/am/statistics/common/Synopsis.java | 22 ++ .../am/statistics/common/SynopsisType.java | 7 + .../am/statistics/sketch/GroupCountSketch.java | 90 +++++ .../am/statistics/sketch/HashGenerator.java | 33 ++ .../am/statistics/sketch/QuickSelect.java | 47 +++ .../storage/am/statistics/sketch/Sketch.java | 24 ++ .../am/statistics/sketch/SketchSynopsis.java | 83 +++++ .../statistics/wavelet/WaveletCoefficient.java | 85 +++++ .../am/statistics/wavelet/WaveletSynopsis.java | 291 +++++++++++++++ .../org/apache/hyracks/data/std/api/IMath.java | 26 -- .../marshalling/ByteSerializerDeserializer.java | 53 +++ .../dataflow/common/util/SerdeUtils.java | 11 +- .../lsm/btree/LSMBTreeOperatorTestHelper.java | 2 +- .../api/IOrdinalPrimitiveValueProvider.java | 24 ++ .../IOrdinalPrimitiveValueProviderFactory.java | 26 ++ .../api/IPrimitiveIntegerValueProvider.java | 30 -- .../IPrimitiveIntegerValueProviderFactory.java | 26 -- ...bleIntegerPrimitiveValueProviderFactory.java | 52 --- .../am/common/statistics/StatisticsFactory.java | 31 -- .../storage/am/common/statistics/Synopsis.java | 22 -- .../am/common/statistics/SynopsisType.java | 7 - .../statistics/sketch/GroupCountSketch.java | 90 ----- .../common/statistics/sketch/HashGenerator.java | 33 -- .../common/statistics/sketch/QuickSelect.java | 47 --- .../am/common/statistics/sketch/Sketch.java | 24 -- .../statistics/sketch/SketchSynopsis.java | 83 ----- .../statistics/wavelet/WaveletCoefficient.java | 83 ----- .../statistics/wavelet/WaveletSynopsis.java | 263 -------------- hyracks/hyracks-storage-am-lsm-btree/pom.xml | 15 +- .../AbstractLSMBTreeDataflowHelper.java | 12 +- .../AbstractLSMBTreeDataflowHelperFactory.java | 12 +- .../dataflow/ExternalBTreeDataflowHelper.java | 17 +- .../ExternalBTreeDataflowHelperFactory.java | 9 +- .../ExternalBTreeWithBuddyDataflowHelper.java | 13 +- ...rnalBTreeWithBuddyDataflowHelperFactory.java | 10 +- .../btree/dataflow/LSMBTreeDataflowHelper.java | 14 +- .../dataflow/LSMBTreeDataflowHelperFactory.java | 10 +- .../am/lsm/btree/impls/ExternalBTree.java | 4 +- .../storage/am/lsm/btree/impls/LSMBTree.java | 59 ++- .../lsm/btree/impls/LSMBTreeDiskComponent.java | 2 +- .../impls/LSMBTreeDiskComponentFactory.java | 2 +- .../am/lsm/btree/util/LSMBTreeUtils.java | 12 +- ...ytePrimitiveIntegerValueProviderFactory.java | 57 --- ...gerPrimitiveIntegerValueProviderFactory.java | 57 --- ...ongPrimitiveIntegerValueProviderFactory.java | 57 --- ...ortPrimitiveIntegerValueProviderFactory.java | 57 --- .../BytePrimitiveValueProviderFactory.java | 55 +++ .../IntegerPrimitiveValueProviderFactory.java | 15 +- .../LongPrimitiveValueProviderFactory.java | 55 +++ .../ShortPrimitiveValueProviderFactory.java | 55 +++ hyracks/hyracks-storage-am-statistics/pom.xml | 32 ++ .../am/statistics/common/StatisticsFactory.java | 31 ++ .../storage/am/statistics/common/Synopsis.java | 22 ++ .../am/statistics/common/SynopsisType.java | 7 + .../common/TypeTraitsDomainUtils.java | 26 ++ .../am/statistics/sketch/GroupCountSketch.java | 90 +++++ .../am/statistics/sketch/HashGenerator.java | 33 ++ .../am/statistics/sketch/QuickSelect.java | 47 +++ .../storage/am/statistics/sketch/Sketch.java | 24 ++ .../am/statistics/sketch/SketchSynopsis.java | 83 +++++ .../statistics/wavelet/WaveletCoefficient.java | 91 +++++ .../am/statistics/wavelet/WaveletSynopsis.java | 363 +++++++++++++++++++ .../am/statistics/wavelet/WaveletTuple.java | 47 +++ hyracks/pom.xml | 1 + 76 files changed, 2779 insertions(+), 1120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/pom.xml ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/pom.xml b/hyracks/hyrack-storage-am-statistics/pom.xml new file mode 100644 index 0000000..4df9e0a --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/pom.xml @@ -0,0 +1,32 @@ + + 4.0.0 + hyracks-storage-am-statistics + + + org.apache.hyracks + hyracks + 0.2.17-SNAPSHOT + + + + + Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + A business-friendly OSS license + + + + + + org.apache.hyracks + hyracks-storage-am-common + 0.2.17-SNAPSHOT + jar + compile + + + http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/StatisticsFactory.java ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/StatisticsFactory.java b/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/StatisticsFactory.java new file mode 100644 index 0000000..5507a94 --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/StatisticsFactory.java @@ -0,0 +1,37 @@ +package org.apache.hyracks.storage.am.statistics.common; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.data.std.api.IPointableFactory; +import org.apache.hyracks.data.std.api.NumericPointable; +import org.apache.hyracks.data.std.api.OrdinalPointable; +import org.apache.hyracks.storage.am.statistics.wavelet.WaveletSynopsis; +import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.file.IFileMapProvider; + +public class StatisticsFactory { + private final IBufferCache bufferCache; + private final IFileMapProvider fileMapProvider; + private final int[] statisticsKeyFields; + private final IPointableFactory keyPointableFactory; + private final IPointableFactory valuePointableFactory; + + public StatisticsFactory(IBufferCache bufferCache, IFileMapProvider fileMapProvider, int[] statisticsKeyFields, + IPointableFactory keyPointableFactory, + IPointableFactory valuePointableFactory) { + this.bufferCache = bufferCache; + this.fileMapProvider = fileMapProvider; + this.statisticsKeyFields = statisticsKeyFields; + this.keyPointableFactory = keyPointableFactory; + this.valuePointableFactory = valuePointableFactory; + } + + public WaveletSynopsis createWaveletStatistics(FileReference file) throws HyracksDataException { + return new WaveletSynopsis(bufferCache, fileMapProvider, file, statisticsKeyFields, 10, keyPointableFactory, + valuePointableFactory); + } + + public int[] getStatisticsFields() { + return statisticsKeyFields; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/Synopsis.java ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/Synopsis.java b/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/Synopsis.java new file mode 100644 index 0000000..12dbfee --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/Synopsis.java @@ -0,0 +1,22 @@ +package org.apache.hyracks.storage.am.statistics.common; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.storage.am.common.api.IIndexBulkLoader; +import org.apache.hyracks.storage.am.common.impls.AbstractFileManager; +import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.file.IFileMapProvider; + +public abstract class Synopsis extends AbstractFileManager { + + public Synopsis(IBufferCache bufferCache, IFileMapProvider fileMapProvider, FileReference file) { + super(bufferCache, fileMapProvider, file); + } + + public abstract void addElement(K key, V value); + + public abstract IIndexBulkLoader createBuilder() throws HyracksDataException; + + public abstract int getNumPages() throws HyracksDataException; + +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/SynopsisType.java ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/SynopsisType.java b/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/SynopsisType.java new file mode 100644 index 0000000..045b398 --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/SynopsisType.java @@ -0,0 +1,7 @@ +package org.apache.hyracks.storage.am.statistics.common; + +public enum SynopsisType { + None, + Wavelet, + Sketch +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/GroupCountSketch.java ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/GroupCountSketch.java b/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/GroupCountSketch.java new file mode 100644 index 0000000..0fdeab3 --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/GroupCountSketch.java @@ -0,0 +1,90 @@ +package org.apache.hyracks.storage.am.statistics.sketch; + +public class GroupCountSketch extends Sketch { + + private final int levels; + private final int depth; + private final int buckets; + private final int subbuckets; + private final int fanoutLog; + private final double counters[][][][]; + private final long[][] hashSeeds; + + public GroupCountSketch(int levels, int depth, int width, int fanoutLog/*double prob, double accuracy*/) { + this.levels = levels; + this.depth = depth; + this.buckets = width; + this.fanoutLog = fanoutLog; + this.subbuckets = width * width; + + counters = new double[this.levels][this.depth][this.buckets][this.subbuckets]; + hashSeeds = new long[this.depth][8]; + initSeeds(this.depth, 8, hashSeeds); + } + + public void update(long item, double diff) { + int i, j, h, f, mult; + long group; + + for (i = 0; i < depth; i++) { + mult = HashGenerator.fourwise(this.hashSeeds[i][4], this.hashSeeds[i][5], this.hashSeeds[i][6], + this.hashSeeds[i][7], item); + + f = HashGenerator.hash31(this.hashSeeds[i][2], this.hashSeeds[i][3], item); + f = f % (this.subbuckets); + + for (j = 0, group = item; j < levels; j++, group >>= fanoutLog) { + + h = HashGenerator.hash31(this.hashSeeds[i][0], this.hashSeeds[i][1], group); + h = h % (this.buckets); + + if ((mult & 1) == 1) + this.counters[j][i][h][f] += diff; + else + this.counters[j][i][h][f] -= diff; + } + } + } + + public double count(int group, int level) { + int h, f, mult; + double[] estimates = new double[depth]; + + for (int i = 0; i < depth; i++) { + h = HashGenerator.hash31(this.hashSeeds[i][0], this.hashSeeds[i][1], group); + h = h % (this.buckets); + + f = HashGenerator.hash31(this.hashSeeds[i][2], this.hashSeeds[i][3], group); + f = f % (this.subbuckets); + + mult = HashGenerator.fourwise(this.hashSeeds[i][4], this.hashSeeds[i][5], this.hashSeeds[i][6], + this.hashSeeds[i][7], group); + if ((mult & 1) == 1) + estimates[i] += this.counters[level][i][h][f]; + else + estimates[i] -= this.counters[level][i][h][f]; + } + + return getMedian(estimates, depth); + } + + public double energyEst(int group, int level) { + // estimate the F2 moment of the vector (sum of squares) + + int i, j; + double z; + + double estimates[] = new double[depth]; + for (i = 0; i < depth; i++) { + int h = HashGenerator.hash31(this.hashSeeds[i][0], this.hashSeeds[i][1], group); + h = h % (this.buckets); + z = 0; + for (j = 0; j < this.subbuckets; j++) { + z += Math.pow(this.counters[level][i][h][j], 2.0); + } + estimates[i] = z; + } + + return getMedian(estimates, depth); + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/HashGenerator.java ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/HashGenerator.java b/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/HashGenerator.java new file mode 100644 index 0000000..9ad1caa --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/HashGenerator.java @@ -0,0 +1,33 @@ +package org.apache.hyracks.storage.am.statistics.sketch; + +public class HashGenerator { + + private static int MOD = 2147483647; + private static int HL = 31; + + public static int hash31(long a, long b, long x) { + + long result; + + // return a hash of x using a and b mod (2^31 - 1) + // may need to do another mod afterwards, or drop high bits + // depending on d, number of bad guys + // 2^31 - 1 = 2147483647 + + // result = ((long long) a)*((long long) x)+((long long) b); + result = (a * x) + b; + result = ((result >> HL) + result) & MOD; + + return (int) result; + } + + public static int fourwise(long a, long b, long c, long d, long x) { + int result; + + // returns values that are 4-wise independent by repeated calls + // to the pairwise independent routine. + + result = hash31(hash31(hash31(x, a, b), x, c), x, d); + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/QuickSelect.java ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/QuickSelect.java b/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/QuickSelect.java new file mode 100644 index 0000000..80c461d --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/QuickSelect.java @@ -0,0 +1,47 @@ +package org.apache.hyracks.storage.am.statistics.sketch; + +import java.util.Random; + +public class QuickSelect { + + private static Random rand = new Random(); + + private static int partition(double[] arr, int left, int right, int pivot) { + double pivotVal = arr[pivot]; + swap(arr, pivot, right); + int storeIndex = left; + for (int i = left; i < right; i++) { + if (arr[i] < pivotVal) { + swap(arr, i, storeIndex); + storeIndex++; + } + } + swap(arr, right, storeIndex); + return storeIndex; + } + + public static double select(double[] arr, int n) { + int left = 0; + int right = arr.length - 1; + while (right >= left) { + int pivotIndex = partition(arr, left, right, rand.nextInt(right - left + 1) + left); + if (pivotIndex == n) { + return arr[pivotIndex]; + } else if (pivotIndex < n) { + left = pivotIndex + 1; + } else { + right = pivotIndex - 1; + } + } + return 0; + } + + private static void swap(double[] arr, int i1, int i2) { + if (i1 != i2) { + double temp = arr[i1]; + arr[i1] = arr[i2]; + arr[i2] = temp; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/Sketch.java ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/Sketch.java b/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/Sketch.java new file mode 100644 index 0000000..65cc731 --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/Sketch.java @@ -0,0 +1,24 @@ +package org.apache.hyracks.storage.am.statistics.sketch; + +import java.util.Random; + +public abstract class Sketch { + + protected void initSeeds(int k, int m, long[][] hashSeeds) { + Random prng = new Random(); + + int j, i; + for (i = 0; i < k; i++) { + for (j = 0; j < m; j++) { + hashSeeds[i][j] = Math.abs(prng.nextLong()); //(int) prng.genInt(); + // initialise the hash functions + // prng_int() should return a random integer + // uniformly distributed in the range 0..2^31 + } + } + } + + protected static double getMedian(double[] data, int length) { + return QuickSelect.select(data, data.length / 2); + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/SketchSynopsis.java ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/SketchSynopsis.java b/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/SketchSynopsis.java new file mode 100644 index 0000000..fff8ce8 --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/SketchSynopsis.java @@ -0,0 +1,83 @@ +package org.apache.hyracks.storage.am.statistics.sketch; + +//public class SketchSynopsis extends Synopsis implements IIndexBulkLoader { +// +// private final int levelNum; +// private final int fanoutLog; +// private final double epsilon; +// private final GroupCountSketch gcSketch; +// private final IBufferCache bufferCache; +// private final IFileMapProvider fileMapProvider; +// private final FileReference file; +// private final int[] keyFields; +// private final int fileId = -1; +// private final boolean isActivated = false; +// +// public SketchSynopsis(IBufferCache bufferCache, IFileMapProvider fileMapProvider, FileReference file, +// int[] keyFields, int domainSize, double delta, double epsilon, int fanOut) { +// super(bufferCache, fileMapProvider, file); +// this.keyFields = keyFields; +// this.fanoutLog = (int) (Math.log(fanOut) / Math.log(2.0)); +// this.levelNum = domainSize / fanoutLog; +// this.epsilon = epsilon; +// final int depth = (int) Math.ceil(Math.log(1 / delta)); +// final int width = (int) Math.ceil(1 / epsilon); +// gcSketch = new GroupCountSketch(this.levelNum + 1, depth, width, fanoutLog); +// } +// +// public void update(long item, double diff) { +// //translate position to coefficient +// item += 1 << (levelNum * fanoutLog); +// //transform update into wavelet domain +// long div = 1; +// for (int i = 0; i < levelNum; i++) { +// // Long coeffIdx = (long) ((1 << ((levelNum - i) * fanoutLog)) + item); +// item >>= (fanoutLog - 1); +// int sign = (item & 1) == 0 ? 1 : -1; +// item >>= 1; +// double normCoeff = WaveletCoefficient.getNormalizationCoefficient(levelNum * fanoutLog, +// (i + 1) * fanoutLog); +// div = (1 << ((i + 1) * fanoutLog)); +// +// gcSketch.update(item, diff * sign / (normCoeff * div)); +// } +// gcSketch.update(0, diff / div); +// } +// +// @Override +// public void add(ITupleReference tuple) throws IndexException, HyracksDataException { +// // TODO Auto-generated method stub +// +// } +// +// @Override +// public void end() throws IndexException, HyracksDataException { +// // TODO Auto-generated method stub +// +// } +// +// @Override +// public void create() { +// // TODO Auto-generated method stub +// +// } +// +// @Override +// public void activate() { +// // TODO Auto-generated method stub +// +// } +// +// @Override +// public IIndexBulkLoader createBuilder(long numElements) { +// // TODO Auto-generated method stub +// return null; +// } +// +// @Override +// public void addElement(Object key, Object value) { +// // TODO Auto-generated method stub +// +// } +// +//} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/wavelet/WaveletCoefficient.java ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/wavelet/WaveletCoefficient.java b/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/wavelet/WaveletCoefficient.java new file mode 100644 index 0000000..c3a6186 --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/wavelet/WaveletCoefficient.java @@ -0,0 +1,85 @@ +package org.apache.hyracks.storage.am.statistics.wavelet; + +import java.util.Map.Entry; +import java.util.Objects; + +import org.apache.hyracks.data.std.api.IComparable; +import org.apache.hyracks.data.std.api.INumeric; +import org.apache.hyracks.data.std.api.IPointable; + +public class WaveletCoefficient*/, V extends IPointable & IComparable /* extends IMath*/> + implements Entry, Comparable> { + + public V value; + public int level; + public K index; + + public WaveletCoefficient() { + } + + public WaveletCoefficient(V value, int level, K index) { + this.value = value; + this.level = level; + this.index = index; + } + + @Override + public K getKey() { + return index; + } + + @Override + public V getValue() { + return value; + } + + @Override + public V setValue(V value) { + this.value = value; + return this.value; + } + + public void setLevel(int level) { + this.level = level; + } + + public void setIndex(K index) { + this.index = index; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof WaveletCoefficient)) + return false; + @SuppressWarnings("unchecked") + WaveletCoefficient triple = (WaveletCoefficient) o; + return triple.value.equals(value) && triple.level == level && triple.index.equals(index); + } + + @Override + public int hashCode() { + return Objects.hash(value, level, index); + } + + public static Double getNormalizationCoefficient(int maxLevel, int level) { + return (1 << ((maxLevel - level) / 2)) * ((((maxLevel - level) % 2) == 0) ? 1 : Math.sqrt(2)); + } + + public static int getLevel(K coeffPointable, int maxLevel) { + long coeffIdx = coeffPointable.longValue(); + if (coeffIdx == 0) + return maxLevel; + int level = -1; + while (coeffIdx > 0) { + coeffIdx = coeffIdx >> 1; + level++; + } + return maxLevel - level; + } + + @Override + // default comparator based on coefficient value + public int compareTo(WaveletCoefficient o) { + return value.compareTo(o.getValue()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/wavelet/WaveletSynopsis.java ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/wavelet/WaveletSynopsis.java b/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/wavelet/WaveletSynopsis.java new file mode 100644 index 0000000..cd92489 --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/src/main/java/org/apache/hyracks/storage/am/statistics/wavelet/WaveletSynopsis.java @@ -0,0 +1,291 @@ +package org.apache.hyracks.storage.am.statistics.wavelet; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.PriorityQueue; +import java.util.Stack; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.data.std.api.IPointableFactory; +import org.apache.hyracks.data.std.api.NumericPointable; +import org.apache.hyracks.data.std.api.OrdinalPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer; +import org.apache.hyracks.storage.am.common.api.IIndexBulkLoader; +import org.apache.hyracks.storage.am.common.api.IndexException; +import org.apache.hyracks.storage.am.statistics.common.Synopsis; +import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.buffercache.ICachedPage; +import org.apache.hyracks.storage.common.file.BufferedFileHandle; +import org.apache.hyracks.storage.common.file.IFileMapProvider; + +public class WaveletSynopsis + /*, V extends IMath>*/ extends Synopsis { + + private final static int METADATA_PAGE_ID = 0; + private final static int NUM_PAGES_OFFSET = 0; + private final static int NUM_ELEMENTS_OFFSET = NUM_PAGES_OFFSET + 4; + + private final int[] waveletFields; + //private final IPrimitiveIntegerValueProvider waveletFieldValueProvider; + private final IPointableFactory keyPointableFactory; + private final IPointableFactory valuePointableFactory; + + private final PriorityQueue> coefficients; + private final long threshold; + + private final int numPages; + + public WaveletSynopsis(IBufferCache bufferCache, IFileMapProvider fileMapProvider, FileReference file, + int[] keyFields, int threshold, IPointableFactory keyPointableFactory, + IPointableFactory valuePointableFactory) { + super(bufferCache, fileMapProvider, file); + this.waveletFields = keyFields; + this.keyPointableFactory = keyPointableFactory; + this.valuePointableFactory = valuePointableFactory; + this.threshold = threshold; + this.coefficients = new PriorityQueue<>(threshold); + this.numPages = (int) Math.ceil(threshold * (4 + 8) / (double) bufferCache.getPageSize()); + } + + @Override + public int getNumPages() throws HyracksDataException { + if (!isActivated) { + throw new HyracksDataException("The synopsis is not activated."); + } + return numPages; + } + + private void initWaveletSynopsisMetaData() throws HyracksDataException { + ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), true); + metaPage.acquireWriteLatch(); + try { + metaPage.getBuffer().putInt(NUM_PAGES_OFFSET, 0); + metaPage.getBuffer().putLong(NUM_ELEMENTS_OFFSET, 0L); + } finally { + metaPage.releaseWriteLatch(true); + bufferCache.unpin(metaPage); + } + } + + @Override + // Adds a new coefficient to the transform, subject to thresholding + public void addElement(OrdinalPointable index, NumericPointable value) { + WaveletCoefficient newCoeff; + if (coefficients.size() < threshold) + newCoeff = new WaveletCoefficient(value, 0, index); + else { + newCoeff = coefficients.poll(); + newCoeff.setValue(value); + newCoeff.setIndex(index); + } + coefficients.add(newCoeff); + + } + + @Override + public IIndexBulkLoader createBuilder() throws HyracksDataException { + return new SparseTransformBuilder(); + } + + public class SparseTransformBuilder implements IIndexBulkLoader { + private final Stack> avgStack; + private final Pair curr = Pair.of(keyPointableFactory.createPointable(), + valuePointableFactory.createPointable()); + private long transformPos; + private final long domainEnd; + private int lastLevel; + private final int maxLevel; + private final List> borderTuples; + private final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage(); + + public SparseTransformBuilder() throws HyracksDataException { + avgStack = new Stack<>(); + borderTuples = new ArrayList<>(2); + // set initial transform position to minimal value for key doimain + transformPos = curr.getKey().minDomainValue(); + domainEnd = curr.getKey().maxDomainValue(); + + DoubleSerializerDeserializer.INSTANCE.serialize(1.0, abvs.getDataOutput()); + curr.getRight().set(abvs.getByteArray(), bytesWritten, abvs.getLength() - bytesWritten); + bytesWritten = abvs.getLength(); + + maxLevel = curr.getKey().maxLevel(); + lastLevel = 0; + + persistWaveletSynopsisMetaData(); + } + + private void persistWaveletSynopsisMetaData() throws HyracksDataException { + ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), false); + metaPage.acquireWriteLatch(); + try { + metaPage.getBuffer().putInt(NUM_PAGES_OFFSET, numPages); + metaPage.getBuffer().putLong(NUM_ELEMENTS_OFFSET, threshold); + } finally { + metaPage.releaseWriteLatch(true); + bufferCache.unpin(metaPage); + } + } + + // method computes number of levels (log base 2 of the distance) between two points + private int computeLogDistance(long x, long y) { + return (int) Math.floor(Math.log(x - y + 1) / Math.log(2)); + } + + @Override + public void add(ITupleReference tuple) throws IndexException, HyracksDataException { + // if (waveletFields.length > 1) + // throw new HyracksDataException("Wavelet synopsis does not support composite keys"); + curr.getKey().set(tuple.getFieldData(waveletFields[0]), tuple.getFieldStart(waveletFields[0]), + tuple.getFieldLength(waveletFields[0])); + + while (transformPos.compareTo(curr.getKey()) <= 0) { + // current position is a left border of dyadic range + if (curr.getKey().longValue() == transformPos) { + borderTuples.add(Pair.of(0, (double) curr.getValue())); + return; + } + int newLevel = computeLogDistance(curr.getKey().longValue(), transformPos); + long levelRightBorder = 1l << newLevel; + //add first dummy average + if (avgStack.isEmpty()) { + avgStack.push(new WaveletCoefficient(0.0, maxLevel, 0)); + lastLevel = newLevel; + } + + // current position is a right border of dyadic range + if (curr.getKey() == transformPos + levelRightBorder - 1 /*&& curr.position != domainMax*/) + borderTuples.add(Pair.of((int) (levelRightBorder - 1), (double) curr.getValue())); + + WaveletCoefficient newCoeff; + WaveletCoefficient topCoeff = avgStack.peek(); + if (newLevel >= lastLevel) { + topCoeff = computeDyadicRange(lastLevel, maxLevel, topCoeff, borderTuples); + newCoeff = topCoeff; + do { + WaveletCoefficient oldCoeff = avgStack.pop(); + //skip first dummy coefficient + if (oldCoeff.index > 0) + newCoeff = average(oldCoeff, newCoeff, waveletFieldValueProvider.maxLevel( + tuple.getFieldData(waveletFields[0]), tuple.getFieldStart(waveletFields[0]))); + } while (!avgStack.isEmpty() && avgStack.peek().level == newCoeff.level); + } else { + newCoeff = computeDyadicRange(newLevel, maxLevel, topCoeff, borderTuples); + topCoeff = newCoeff; + } + avgStack.push(newCoeff); + transformPos += 1l << topCoeff.level; + lastLevel = newCoeff.level; + + borderTuples.clear(); + } + } + + private WaveletCoefficient average( + WaveletCoefficient oldCoeff, + WaveletCoefficient newCoeff, int maxLevel) { + Integer coeffIdx = oldCoeff.index >> 1; + addElement(coeffIdx, (oldCoeff.value - newCoeff.value) + / (2.0 * WaveletCoefficient.getNormalizationCoefficient(maxLevel, oldCoeff.level + 1))); + WaveletCoefficient topCoeff = new WaveletCoefficient( + (oldCoeff.value + newCoeff.value) / 2.0, oldCoeff.level + 1, coeffIdx); + return topCoeff; + } + + private WaveletCoefficient computeDyadicRange(int level, int maxLevel, + WaveletCoefficient topCoeff, + List> borderTuples) { + //short circuit coefficient computation for 0 + if (borderTuples.isEmpty()) { + Integer coeffIdx = ((topCoeff.index + 1) << (topCoeff.level - level)); + return new WaveletCoefficient(0.0, level, coeffIdx); + } + + Map newCoefs = new HashMap<>(); + Double avg = 0.0; + Integer coeffIdx = -1; + for (int i = 1; i <= level; i++) { + newCoefs.clear(); + avg = 0.0; + for (int j = 0; j < borderTuples.size(); j++) { + Pair item = borderTuples.get(j); + coeffIdx = ((topCoeff.index + 1) << (topCoeff.level - i)) + (item.getKey() >> 1); + Double newValue = item.getValue() / 2.0; + Double oldValue = newCoefs.containsKey(coeffIdx) ? newCoefs.get(coeffIdx) : 0; + if ((item.getKey() & 0x1) == 1) { + newCoefs.put(coeffIdx, (oldValue - newValue)); + } else { + newCoefs.put(coeffIdx, (oldValue + newValue)); + } + avg += newValue; + borderTuples.set(j, Pair.of(item.getKey() >> 1, newValue)); + } + + for (Entry e : newCoefs.entrySet()) + addElement(e.getKey(), e.getValue() / WaveletCoefficient.getNormalizationCoefficient(maxLevel, i)); + } + return new WaveletCoefficient(avg, level, coeffIdx); + } + + @Override + public void end() throws IndexException, HyracksDataException { + //assert(avgStack.size() == 1); + if (curr.getKey() != domainEnd) { + + } + WaveletCoefficient topCoeff = avgStack.pop(); + topCoeff.index = 0; + addElement(topCoeff.index, topCoeff.value); + + persistStatistics(); + } + + private void persistStatistics() throws HyracksDataException { + List> persistCoefficients = new ArrayList<>( + coefficients); + // sorting coefficients according their keys + Collections.sort(persistCoefficients, + new Comparator>() { + @Override + public int compare(WaveletCoefficient o1, + WaveletCoefficient o2) { + return o1.getKey().compareTo(o2.getKey()); + } + }); + Iterator> it = persistCoefficients.iterator(); + int currentPageId = 1; + while (currentPageId <= numPages) { + ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, currentPageId), true); + ByteBuffer buffer = page.getBuffer(); + page.acquireWriteLatch(); + try { + while (it.hasNext() && (buffer.limit() - buffer.position()) >= 4 + 8) { + WaveletCoefficient coeff = it.next(); + buffer.put(coeff.getKey().getByteArray(), coeff.getKey().getStartOffset(), + coeff.getKey().getLength()); + buffer.put(coeff.getValue().getByteArray(), coeff.getValue().getStartOffset(), + coeff.getValue().getLength()); + } + } finally { + page.releaseWriteLatch(true); + bufferCache.unpin(page); + } + ++currentPageId; + } + + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/StatisticsFactory.java ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/StatisticsFactory.java b/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/StatisticsFactory.java new file mode 100644 index 0000000..5507a94 --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/StatisticsFactory.java @@ -0,0 +1,37 @@ +package org.apache.hyracks.storage.am.statistics.common; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.data.std.api.IPointableFactory; +import org.apache.hyracks.data.std.api.NumericPointable; +import org.apache.hyracks.data.std.api.OrdinalPointable; +import org.apache.hyracks.storage.am.statistics.wavelet.WaveletSynopsis; +import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.file.IFileMapProvider; + +public class StatisticsFactory { + private final IBufferCache bufferCache; + private final IFileMapProvider fileMapProvider; + private final int[] statisticsKeyFields; + private final IPointableFactory keyPointableFactory; + private final IPointableFactory valuePointableFactory; + + public StatisticsFactory(IBufferCache bufferCache, IFileMapProvider fileMapProvider, int[] statisticsKeyFields, + IPointableFactory keyPointableFactory, + IPointableFactory valuePointableFactory) { + this.bufferCache = bufferCache; + this.fileMapProvider = fileMapProvider; + this.statisticsKeyFields = statisticsKeyFields; + this.keyPointableFactory = keyPointableFactory; + this.valuePointableFactory = valuePointableFactory; + } + + public WaveletSynopsis createWaveletStatistics(FileReference file) throws HyracksDataException { + return new WaveletSynopsis(bufferCache, fileMapProvider, file, statisticsKeyFields, 10, keyPointableFactory, + valuePointableFactory); + } + + public int[] getStatisticsFields() { + return statisticsKeyFields; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/Synopsis.java ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/Synopsis.java b/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/Synopsis.java new file mode 100644 index 0000000..12dbfee --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/Synopsis.java @@ -0,0 +1,22 @@ +package org.apache.hyracks.storage.am.statistics.common; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.storage.am.common.api.IIndexBulkLoader; +import org.apache.hyracks.storage.am.common.impls.AbstractFileManager; +import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.file.IFileMapProvider; + +public abstract class Synopsis extends AbstractFileManager { + + public Synopsis(IBufferCache bufferCache, IFileMapProvider fileMapProvider, FileReference file) { + super(bufferCache, fileMapProvider, file); + } + + public abstract void addElement(K key, V value); + + public abstract IIndexBulkLoader createBuilder() throws HyracksDataException; + + public abstract int getNumPages() throws HyracksDataException; + +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/SynopsisType.java ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/SynopsisType.java b/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/SynopsisType.java new file mode 100644 index 0000000..045b398 --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/common/SynopsisType.java @@ -0,0 +1,7 @@ +package org.apache.hyracks.storage.am.statistics.common; + +public enum SynopsisType { + None, + Wavelet, + Sketch +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/GroupCountSketch.java ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/GroupCountSketch.java b/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/GroupCountSketch.java new file mode 100644 index 0000000..0fdeab3 --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/GroupCountSketch.java @@ -0,0 +1,90 @@ +package org.apache.hyracks.storage.am.statistics.sketch; + +public class GroupCountSketch extends Sketch { + + private final int levels; + private final int depth; + private final int buckets; + private final int subbuckets; + private final int fanoutLog; + private final double counters[][][][]; + private final long[][] hashSeeds; + + public GroupCountSketch(int levels, int depth, int width, int fanoutLog/*double prob, double accuracy*/) { + this.levels = levels; + this.depth = depth; + this.buckets = width; + this.fanoutLog = fanoutLog; + this.subbuckets = width * width; + + counters = new double[this.levels][this.depth][this.buckets][this.subbuckets]; + hashSeeds = new long[this.depth][8]; + initSeeds(this.depth, 8, hashSeeds); + } + + public void update(long item, double diff) { + int i, j, h, f, mult; + long group; + + for (i = 0; i < depth; i++) { + mult = HashGenerator.fourwise(this.hashSeeds[i][4], this.hashSeeds[i][5], this.hashSeeds[i][6], + this.hashSeeds[i][7], item); + + f = HashGenerator.hash31(this.hashSeeds[i][2], this.hashSeeds[i][3], item); + f = f % (this.subbuckets); + + for (j = 0, group = item; j < levels; j++, group >>= fanoutLog) { + + h = HashGenerator.hash31(this.hashSeeds[i][0], this.hashSeeds[i][1], group); + h = h % (this.buckets); + + if ((mult & 1) == 1) + this.counters[j][i][h][f] += diff; + else + this.counters[j][i][h][f] -= diff; + } + } + } + + public double count(int group, int level) { + int h, f, mult; + double[] estimates = new double[depth]; + + for (int i = 0; i < depth; i++) { + h = HashGenerator.hash31(this.hashSeeds[i][0], this.hashSeeds[i][1], group); + h = h % (this.buckets); + + f = HashGenerator.hash31(this.hashSeeds[i][2], this.hashSeeds[i][3], group); + f = f % (this.subbuckets); + + mult = HashGenerator.fourwise(this.hashSeeds[i][4], this.hashSeeds[i][5], this.hashSeeds[i][6], + this.hashSeeds[i][7], group); + if ((mult & 1) == 1) + estimates[i] += this.counters[level][i][h][f]; + else + estimates[i] -= this.counters[level][i][h][f]; + } + + return getMedian(estimates, depth); + } + + public double energyEst(int group, int level) { + // estimate the F2 moment of the vector (sum of squares) + + int i, j; + double z; + + double estimates[] = new double[depth]; + for (i = 0; i < depth; i++) { + int h = HashGenerator.hash31(this.hashSeeds[i][0], this.hashSeeds[i][1], group); + h = h % (this.buckets); + z = 0; + for (j = 0; j < this.subbuckets; j++) { + z += Math.pow(this.counters[level][i][h][j], 2.0); + } + estimates[i] = z; + } + + return getMedian(estimates, depth); + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/HashGenerator.java ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/HashGenerator.java b/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/HashGenerator.java new file mode 100644 index 0000000..9ad1caa --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/HashGenerator.java @@ -0,0 +1,33 @@ +package org.apache.hyracks.storage.am.statistics.sketch; + +public class HashGenerator { + + private static int MOD = 2147483647; + private static int HL = 31; + + public static int hash31(long a, long b, long x) { + + long result; + + // return a hash of x using a and b mod (2^31 - 1) + // may need to do another mod afterwards, or drop high bits + // depending on d, number of bad guys + // 2^31 - 1 = 2147483647 + + // result = ((long long) a)*((long long) x)+((long long) b); + result = (a * x) + b; + result = ((result >> HL) + result) & MOD; + + return (int) result; + } + + public static int fourwise(long a, long b, long c, long d, long x) { + int result; + + // returns values that are 4-wise independent by repeated calls + // to the pairwise independent routine. + + result = hash31(hash31(hash31(x, a, b), x, c), x, d); + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/QuickSelect.java ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/QuickSelect.java b/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/QuickSelect.java new file mode 100644 index 0000000..80c461d --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/QuickSelect.java @@ -0,0 +1,47 @@ +package org.apache.hyracks.storage.am.statistics.sketch; + +import java.util.Random; + +public class QuickSelect { + + private static Random rand = new Random(); + + private static int partition(double[] arr, int left, int right, int pivot) { + double pivotVal = arr[pivot]; + swap(arr, pivot, right); + int storeIndex = left; + for (int i = left; i < right; i++) { + if (arr[i] < pivotVal) { + swap(arr, i, storeIndex); + storeIndex++; + } + } + swap(arr, right, storeIndex); + return storeIndex; + } + + public static double select(double[] arr, int n) { + int left = 0; + int right = arr.length - 1; + while (right >= left) { + int pivotIndex = partition(arr, left, right, rand.nextInt(right - left + 1) + left); + if (pivotIndex == n) { + return arr[pivotIndex]; + } else if (pivotIndex < n) { + left = pivotIndex + 1; + } else { + right = pivotIndex - 1; + } + } + return 0; + } + + private static void swap(double[] arr, int i1, int i2) { + if (i1 != i2) { + double temp = arr[i1]; + arr[i1] = arr[i2]; + arr[i2] = temp; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/Sketch.java ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/Sketch.java b/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/Sketch.java new file mode 100644 index 0000000..65cc731 --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/Sketch.java @@ -0,0 +1,24 @@ +package org.apache.hyracks.storage.am.statistics.sketch; + +import java.util.Random; + +public abstract class Sketch { + + protected void initSeeds(int k, int m, long[][] hashSeeds) { + Random prng = new Random(); + + int j, i; + for (i = 0; i < k; i++) { + for (j = 0; j < m; j++) { + hashSeeds[i][j] = Math.abs(prng.nextLong()); //(int) prng.genInt(); + // initialise the hash functions + // prng_int() should return a random integer + // uniformly distributed in the range 0..2^31 + } + } + } + + protected static double getMedian(double[] data, int length) { + return QuickSelect.select(data, data.length / 2); + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/SketchSynopsis.java ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/SketchSynopsis.java b/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/SketchSynopsis.java new file mode 100644 index 0000000..fff8ce8 --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/sketch/SketchSynopsis.java @@ -0,0 +1,83 @@ +package org.apache.hyracks.storage.am.statistics.sketch; + +//public class SketchSynopsis extends Synopsis implements IIndexBulkLoader { +// +// private final int levelNum; +// private final int fanoutLog; +// private final double epsilon; +// private final GroupCountSketch gcSketch; +// private final IBufferCache bufferCache; +// private final IFileMapProvider fileMapProvider; +// private final FileReference file; +// private final int[] keyFields; +// private final int fileId = -1; +// private final boolean isActivated = false; +// +// public SketchSynopsis(IBufferCache bufferCache, IFileMapProvider fileMapProvider, FileReference file, +// int[] keyFields, int domainSize, double delta, double epsilon, int fanOut) { +// super(bufferCache, fileMapProvider, file); +// this.keyFields = keyFields; +// this.fanoutLog = (int) (Math.log(fanOut) / Math.log(2.0)); +// this.levelNum = domainSize / fanoutLog; +// this.epsilon = epsilon; +// final int depth = (int) Math.ceil(Math.log(1 / delta)); +// final int width = (int) Math.ceil(1 / epsilon); +// gcSketch = new GroupCountSketch(this.levelNum + 1, depth, width, fanoutLog); +// } +// +// public void update(long item, double diff) { +// //translate position to coefficient +// item += 1 << (levelNum * fanoutLog); +// //transform update into wavelet domain +// long div = 1; +// for (int i = 0; i < levelNum; i++) { +// // Long coeffIdx = (long) ((1 << ((levelNum - i) * fanoutLog)) + item); +// item >>= (fanoutLog - 1); +// int sign = (item & 1) == 0 ? 1 : -1; +// item >>= 1; +// double normCoeff = WaveletCoefficient.getNormalizationCoefficient(levelNum * fanoutLog, +// (i + 1) * fanoutLog); +// div = (1 << ((i + 1) * fanoutLog)); +// +// gcSketch.update(item, diff * sign / (normCoeff * div)); +// } +// gcSketch.update(0, diff / div); +// } +// +// @Override +// public void add(ITupleReference tuple) throws IndexException, HyracksDataException { +// // TODO Auto-generated method stub +// +// } +// +// @Override +// public void end() throws IndexException, HyracksDataException { +// // TODO Auto-generated method stub +// +// } +// +// @Override +// public void create() { +// // TODO Auto-generated method stub +// +// } +// +// @Override +// public void activate() { +// // TODO Auto-generated method stub +// +// } +// +// @Override +// public IIndexBulkLoader createBuilder(long numElements) { +// // TODO Auto-generated method stub +// return null; +// } +// +// @Override +// public void addElement(Object key, Object value) { +// // TODO Auto-generated method stub +// +// } +// +//} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/wavelet/WaveletCoefficient.java ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/wavelet/WaveletCoefficient.java b/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/wavelet/WaveletCoefficient.java new file mode 100644 index 0000000..c3a6186 --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/wavelet/WaveletCoefficient.java @@ -0,0 +1,85 @@ +package org.apache.hyracks.storage.am.statistics.wavelet; + +import java.util.Map.Entry; +import java.util.Objects; + +import org.apache.hyracks.data.std.api.IComparable; +import org.apache.hyracks.data.std.api.INumeric; +import org.apache.hyracks.data.std.api.IPointable; + +public class WaveletCoefficient*/, V extends IPointable & IComparable /* extends IMath*/> + implements Entry, Comparable> { + + public V value; + public int level; + public K index; + + public WaveletCoefficient() { + } + + public WaveletCoefficient(V value, int level, K index) { + this.value = value; + this.level = level; + this.index = index; + } + + @Override + public K getKey() { + return index; + } + + @Override + public V getValue() { + return value; + } + + @Override + public V setValue(V value) { + this.value = value; + return this.value; + } + + public void setLevel(int level) { + this.level = level; + } + + public void setIndex(K index) { + this.index = index; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof WaveletCoefficient)) + return false; + @SuppressWarnings("unchecked") + WaveletCoefficient triple = (WaveletCoefficient) o; + return triple.value.equals(value) && triple.level == level && triple.index.equals(index); + } + + @Override + public int hashCode() { + return Objects.hash(value, level, index); + } + + public static Double getNormalizationCoefficient(int maxLevel, int level) { + return (1 << ((maxLevel - level) / 2)) * ((((maxLevel - level) % 2) == 0) ? 1 : Math.sqrt(2)); + } + + public static int getLevel(K coeffPointable, int maxLevel) { + long coeffIdx = coeffPointable.longValue(); + if (coeffIdx == 0) + return maxLevel; + int level = -1; + while (coeffIdx > 0) { + coeffIdx = coeffIdx >> 1; + level++; + } + return maxLevel - level; + } + + @Override + // default comparator based on coefficient value + public int compareTo(WaveletCoefficient o) { + return value.compareTo(o.getValue()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/wavelet/WaveletSynopsis.java ---------------------------------------------------------------------- diff --git a/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/wavelet/WaveletSynopsis.java b/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/wavelet/WaveletSynopsis.java new file mode 100644 index 0000000..cd92489 --- /dev/null +++ b/hyracks/hyrack-storage-am-statistics/storage.am.statistics/src/main/java/org/apache/hyracks/storage/am/statistics/wavelet/WaveletSynopsis.java @@ -0,0 +1,291 @@ +package org.apache.hyracks.storage.am.statistics.wavelet; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.PriorityQueue; +import java.util.Stack; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.data.std.api.IPointableFactory; +import org.apache.hyracks.data.std.api.NumericPointable; +import org.apache.hyracks.data.std.api.OrdinalPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer; +import org.apache.hyracks.storage.am.common.api.IIndexBulkLoader; +import org.apache.hyracks.storage.am.common.api.IndexException; +import org.apache.hyracks.storage.am.statistics.common.Synopsis; +import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.buffercache.ICachedPage; +import org.apache.hyracks.storage.common.file.BufferedFileHandle; +import org.apache.hyracks.storage.common.file.IFileMapProvider; + +public class WaveletSynopsis + /*, V extends IMath>*/ extends Synopsis { + + private final static int METADATA_PAGE_ID = 0; + private final static int NUM_PAGES_OFFSET = 0; + private final static int NUM_ELEMENTS_OFFSET = NUM_PAGES_OFFSET + 4; + + private final int[] waveletFields; + //private final IPrimitiveIntegerValueProvider waveletFieldValueProvider; + private final IPointableFactory keyPointableFactory; + private final IPointableFactory valuePointableFactory; + + private final PriorityQueue> coefficients; + private final long threshold; + + private final int numPages; + + public WaveletSynopsis(IBufferCache bufferCache, IFileMapProvider fileMapProvider, FileReference file, + int[] keyFields, int threshold, IPointableFactory keyPointableFactory, + IPointableFactory valuePointableFactory) { + super(bufferCache, fileMapProvider, file); + this.waveletFields = keyFields; + this.keyPointableFactory = keyPointableFactory; + this.valuePointableFactory = valuePointableFactory; + this.threshold = threshold; + this.coefficients = new PriorityQueue<>(threshold); + this.numPages = (int) Math.ceil(threshold * (4 + 8) / (double) bufferCache.getPageSize()); + } + + @Override + public int getNumPages() throws HyracksDataException { + if (!isActivated) { + throw new HyracksDataException("The synopsis is not activated."); + } + return numPages; + } + + private void initWaveletSynopsisMetaData() throws HyracksDataException { + ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), true); + metaPage.acquireWriteLatch(); + try { + metaPage.getBuffer().putInt(NUM_PAGES_OFFSET, 0); + metaPage.getBuffer().putLong(NUM_ELEMENTS_OFFSET, 0L); + } finally { + metaPage.releaseWriteLatch(true); + bufferCache.unpin(metaPage); + } + } + + @Override + // Adds a new coefficient to the transform, subject to thresholding + public void addElement(OrdinalPointable index, NumericPointable value) { + WaveletCoefficient newCoeff; + if (coefficients.size() < threshold) + newCoeff = new WaveletCoefficient(value, 0, index); + else { + newCoeff = coefficients.poll(); + newCoeff.setValue(value); + newCoeff.setIndex(index); + } + coefficients.add(newCoeff); + + } + + @Override + public IIndexBulkLoader createBuilder() throws HyracksDataException { + return new SparseTransformBuilder(); + } + + public class SparseTransformBuilder implements IIndexBulkLoader { + private final Stack> avgStack; + private final Pair curr = Pair.of(keyPointableFactory.createPointable(), + valuePointableFactory.createPointable()); + private long transformPos; + private final long domainEnd; + private int lastLevel; + private final int maxLevel; + private final List> borderTuples; + private final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage(); + + public SparseTransformBuilder() throws HyracksDataException { + avgStack = new Stack<>(); + borderTuples = new ArrayList<>(2); + // set initial transform position to minimal value for key doimain + transformPos = curr.getKey().minDomainValue(); + domainEnd = curr.getKey().maxDomainValue(); + + DoubleSerializerDeserializer.INSTANCE.serialize(1.0, abvs.getDataOutput()); + curr.getRight().set(abvs.getByteArray(), bytesWritten, abvs.getLength() - bytesWritten); + bytesWritten = abvs.getLength(); + + maxLevel = curr.getKey().maxLevel(); + lastLevel = 0; + + persistWaveletSynopsisMetaData(); + } + + private void persistWaveletSynopsisMetaData() throws HyracksDataException { + ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, METADATA_PAGE_ID), false); + metaPage.acquireWriteLatch(); + try { + metaPage.getBuffer().putInt(NUM_PAGES_OFFSET, numPages); + metaPage.getBuffer().putLong(NUM_ELEMENTS_OFFSET, threshold); + } finally { + metaPage.releaseWriteLatch(true); + bufferCache.unpin(metaPage); + } + } + + // method computes number of levels (log base 2 of the distance) between two points + private int computeLogDistance(long x, long y) { + return (int) Math.floor(Math.log(x - y + 1) / Math.log(2)); + } + + @Override + public void add(ITupleReference tuple) throws IndexException, HyracksDataException { + // if (waveletFields.length > 1) + // throw new HyracksDataException("Wavelet synopsis does not support composite keys"); + curr.getKey().set(tuple.getFieldData(waveletFields[0]), tuple.getFieldStart(waveletFields[0]), + tuple.getFieldLength(waveletFields[0])); + + while (transformPos.compareTo(curr.getKey()) <= 0) { + // current position is a left border of dyadic range + if (curr.getKey().longValue() == transformPos) { + borderTuples.add(Pair.of(0, (double) curr.getValue())); + return; + } + int newLevel = computeLogDistance(curr.getKey().longValue(), transformPos); + long levelRightBorder = 1l << newLevel; + //add first dummy average + if (avgStack.isEmpty()) { + avgStack.push(new WaveletCoefficient(0.0, maxLevel, 0)); + lastLevel = newLevel; + } + + // current position is a right border of dyadic range + if (curr.getKey() == transformPos + levelRightBorder - 1 /*&& curr.position != domainMax*/) + borderTuples.add(Pair.of((int) (levelRightBorder - 1), (double) curr.getValue())); + + WaveletCoefficient newCoeff; + WaveletCoefficient topCoeff = avgStack.peek(); + if (newLevel >= lastLevel) { + topCoeff = computeDyadicRange(lastLevel, maxLevel, topCoeff, borderTuples); + newCoeff = topCoeff; + do { + WaveletCoefficient oldCoeff = avgStack.pop(); + //skip first dummy coefficient + if (oldCoeff.index > 0) + newCoeff = average(oldCoeff, newCoeff, waveletFieldValueProvider.maxLevel( + tuple.getFieldData(waveletFields[0]), tuple.getFieldStart(waveletFields[0]))); + } while (!avgStack.isEmpty() && avgStack.peek().level == newCoeff.level); + } else { + newCoeff = computeDyadicRange(newLevel, maxLevel, topCoeff, borderTuples); + topCoeff = newCoeff; + } + avgStack.push(newCoeff); + transformPos += 1l << topCoeff.level; + lastLevel = newCoeff.level; + + borderTuples.clear(); + } + } + + private WaveletCoefficient average( + WaveletCoefficient oldCoeff, + WaveletCoefficient newCoeff, int maxLevel) { + Integer coeffIdx = oldCoeff.index >> 1; + addElement(coeffIdx, (oldCoeff.value - newCoeff.value) + / (2.0 * WaveletCoefficient.getNormalizationCoefficient(maxLevel, oldCoeff.level + 1))); + WaveletCoefficient topCoeff = new WaveletCoefficient( + (oldCoeff.value + newCoeff.value) / 2.0, oldCoeff.level + 1, coeffIdx); + return topCoeff; + } + + private WaveletCoefficient computeDyadicRange(int level, int maxLevel, + WaveletCoefficient topCoeff, + List> borderTuples) { + //short circuit coefficient computation for 0 + if (borderTuples.isEmpty()) { + Integer coeffIdx = ((topCoeff.index + 1) << (topCoeff.level - level)); + return new WaveletCoefficient(0.0, level, coeffIdx); + } + + Map newCoefs = new HashMap<>(); + Double avg = 0.0; + Integer coeffIdx = -1; + for (int i = 1; i <= level; i++) { + newCoefs.clear(); + avg = 0.0; + for (int j = 0; j < borderTuples.size(); j++) { + Pair item = borderTuples.get(j); + coeffIdx = ((topCoeff.index + 1) << (topCoeff.level - i)) + (item.getKey() >> 1); + Double newValue = item.getValue() / 2.0; + Double oldValue = newCoefs.containsKey(coeffIdx) ? newCoefs.get(coeffIdx) : 0; + if ((item.getKey() & 0x1) == 1) { + newCoefs.put(coeffIdx, (oldValue - newValue)); + } else { + newCoefs.put(coeffIdx, (oldValue + newValue)); + } + avg += newValue; + borderTuples.set(j, Pair.of(item.getKey() >> 1, newValue)); + } + + for (Entry e : newCoefs.entrySet()) + addElement(e.getKey(), e.getValue() / WaveletCoefficient.getNormalizationCoefficient(maxLevel, i)); + } + return new WaveletCoefficient(avg, level, coeffIdx); + } + + @Override + public void end() throws IndexException, HyracksDataException { + //assert(avgStack.size() == 1); + if (curr.getKey() != domainEnd) { + + } + WaveletCoefficient topCoeff = avgStack.pop(); + topCoeff.index = 0; + addElement(topCoeff.index, topCoeff.value); + + persistStatistics(); + } + + private void persistStatistics() throws HyracksDataException { + List> persistCoefficients = new ArrayList<>( + coefficients); + // sorting coefficients according their keys + Collections.sort(persistCoefficients, + new Comparator>() { + @Override + public int compare(WaveletCoefficient o1, + WaveletCoefficient o2) { + return o1.getKey().compareTo(o2.getKey()); + } + }); + Iterator> it = persistCoefficients.iterator(); + int currentPageId = 1; + while (currentPageId <= numPages) { + ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, currentPageId), true); + ByteBuffer buffer = page.getBuffer(); + page.acquireWriteLatch(); + try { + while (it.hasNext() && (buffer.limit() - buffer.position()) >= 4 + 8) { + WaveletCoefficient coeff = it.next(); + buffer.put(coeff.getKey().getByteArray(), coeff.getKey().getStartOffset(), + coeff.getKey().getLength()); + buffer.put(coeff.getValue().getByteArray(), coeff.getValue().getStartOffset(), + coeff.getValue().getLength()); + } + } finally { + page.releaseWriteLatch(true); + bufferCache.unpin(page); + } + ++currentPageId; + } + + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IMath.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IMath.java b/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IMath.java deleted file mode 100644 index b36f7e9..0000000 --- a/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IMath.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.apache.hyracks.data.std.api; - -public interface IMath { - - public T and(Number mask); - - public T shiftRight(Number positions); - - public T shiftLeft(Number positions); - - public T add(Number summand); - - public T add(T pointableSummand); - - public T sub(Number subtrahend); - - public T sub(T pointableSubtrahend); - - public T div(Number subtrahend); - - public T div(T pointableSubtrahend); - - public T mult(Number subtrahend); - - public T mult(T pointableSubtrahend); -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/marshalling/ByteSerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/marshalling/ByteSerializerDeserializer.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/marshalling/ByteSerializerDeserializer.java new file mode 100644 index 0000000..eff6263 --- /dev/null +++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/marshalling/ByteSerializerDeserializer.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.dataflow.common.data.marshalling; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class ByteSerializerDeserializer implements ISerializerDeserializer { + private static final long serialVersionUID = 1L; + + public static final ByteSerializerDeserializer INSTANCE = new ByteSerializerDeserializer(); + + private ByteSerializerDeserializer() { + } + + @Override + public Byte deserialize(DataInput in) throws HyracksDataException { + try { + return in.readByte(); + } catch (IOException e) { + throw new HyracksDataException(e); + } + } + + @Override + public void serialize(Byte instance, DataOutput out) throws HyracksDataException { + try { + out.writeByte(instance.intValue()); + } catch (IOException e) { + throw new HyracksDataException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/SerdeUtils.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/SerdeUtils.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/SerdeUtils.java index 99c438e..c6cc7fa 100644 --- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/SerdeUtils.java +++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/SerdeUtils.java @@ -25,6 +25,7 @@ import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; import org.apache.hyracks.data.std.primitive.BooleanPointable; +import org.apache.hyracks.data.std.primitive.BytePointable; import org.apache.hyracks.data.std.primitive.DoublePointable; import org.apache.hyracks.data.std.primitive.FloatPointable; import org.apache.hyracks.data.std.primitive.IntegerPointable; @@ -32,6 +33,7 @@ import org.apache.hyracks.data.std.primitive.LongPointable; import org.apache.hyracks.data.std.primitive.ShortPointable; import org.apache.hyracks.data.std.primitive.UTF8StringPointable; import org.apache.hyracks.dataflow.common.data.marshalling.BooleanSerializerDeserializer; +import org.apache.hyracks.dataflow.common.data.marshalling.ByteSerializerDeserializer; import org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer; import org.apache.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer; import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer; @@ -78,6 +80,9 @@ public class SerdeUtils { } public static ITypeTraits serdeToTypeTrait(ISerializerDeserializer serde) { + if (serde instanceof ByteSerializerDeserializer) { + return BytePointable.TYPE_TRAITS; + } if (serde instanceof ShortSerializerDeserializer) { return ShortPointable.TYPE_TRAITS; } @@ -112,7 +117,8 @@ public class SerdeUtils { return f.createBinaryComparator(); } - public static IBinaryComparatorFactory[] serdesToComparatorFactories(ISerializerDeserializer[] serdes, int numSerdes) { + public static IBinaryComparatorFactory[] serdesToComparatorFactories(ISerializerDeserializer[] serdes, + int numSerdes) { IBinaryComparatorFactory[] comparatorsFactories = new IBinaryComparatorFactory[numSerdes]; for (int i = 0; i < numSerdes; i++) { comparatorsFactories[i] = serdeToComparatorFactory(serdes[i]); @@ -121,6 +127,9 @@ public class SerdeUtils { } public static IBinaryComparatorFactory serdeToComparatorFactory(ISerializerDeserializer serde) { + if (serde instanceof ByteSerializerDeserializer) { + return PointableBinaryComparatorFactory.of(BytePointable.FACTORY); + } if (serde instanceof ShortSerializerDeserializer) { return PointableBinaryComparatorFactory.of(ShortPointable.FACTORY); } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/e05a1921/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java ---------------------------------------------------------------------- diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java index cbee47e..4cf6bf4 100644 --- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java +++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java @@ -48,7 +48,7 @@ public class LSMBTreeOperatorTestHelper extends LSMTreeOperatorTestHelper { return new LSMBTreeDataflowHelperFactory(virtualBufferCacheProvider, new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES, ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE, - DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE, true, null, null, null, null, true, false, null); + DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE, true, null, null, null, null, true, false, null, null); } }