kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mahong...@apache.org
Subject [2/3] incubator-kylin git commit: staging
Date Tue, 25 Aug 2015 10:41:02 GMT
staging


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/747075a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/747075a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/747075a0

Branch: refs/heads/KYLIN-740
Commit: 747075a0c7ccba3ae2a4bd25bfa61183024c46a6
Parents: caf3020
Author: honma <honma@ebay.com>
Authored: Tue Aug 25 14:26:24 2015 +0800
Committer: honma <honma@ebay.com>
Committed: Tue Aug 25 14:26:24 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/BasicTest.java |    4 +
 .../kylin/storage/StorageQueryFactory.java      |    2 +-
 .../hbase/cube/v1/CubeSegmentTupleIterator.java |    9 +-
 .../storage/hbase/cube/v1/CubeStorageQuery.java |   36 +-
 .../hbase/cube/v1/filter/FuzzyRowFilterV2.java  |  597 ++++++++
 .../hbase/cube/v1/filter/UnsafeAccess.java      |  433 ++++++
 .../v1/filter/generated/FilterProtosExt.java    | 1333 ++++++++++++++++++
 .../cube/v1/filter/protobuf/FilterExt.proto     |   21 +
 .../v1/filter/TestFuzzyRowFilterEndToEnd.java   |  352 +++++
 .../cube/v1/filter/TestFuzzyRowFilterV2.java    |  247 ++++
 10 files changed, 3018 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/747075a0/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index cf2d3fc..6566a13 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -65,6 +65,10 @@ public class BasicTest {
     @Test
     public void testxx() {
         byte[] temp = new byte[] { 1, 2, 3 };
+        byte[] temp2 = new byte[] { 1, 2, 3 };
+
+        System.out.println(temp.hashCode());
+        System.out.println(temp2.hashCode());
 
         ByteBuffer buffer = ByteBuffer.allocateDirect(3);
         buffer.put((byte) 1);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/747075a0/core-storage/src/main/java/org/apache/kylin/storage/StorageQueryFactory.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageQueryFactory.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageQueryFactory.java
index ec0554b..eb6e6b1 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageQueryFactory.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageQueryFactory.java
@@ -40,7 +40,7 @@ import com.google.common.base.Preconditions;
 public class StorageQueryFactory {
 
     private final static boolean allowStorageLayerCache = true;
-    private final static String defaultCubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v2.CubeStorageQuery";
+    private final static String defaultCubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v1.CubeStorageQuery";
     private final static String defaultIIStorageQuery = "org.apache.kylin.storage.hbase.ii.InvertedIndexStorageQuery";
 
     public static IStorageQuery createQuery(IRealization realization) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/747075a0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
index 6d94206..346b92f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.kylin.common.persistence.StorageException;
@@ -44,9 +43,9 @@ import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.translate.HBaseKeyRange;
 import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
 import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
+import org.apache.kylin.storage.translate.HBaseKeyRange;
 import org.apache.kylin.storage.tuple.Tuple;
 import org.apache.kylin.storage.tuple.TupleInfo;
 import org.slf4j.Logger;
@@ -217,11 +216,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
 
             Filter filter = scan.getFilter();
             if (filter != null) {
-                // may have existed InclusiveStopFilter, see buildScan
-                FilterList filterList = new FilterList();
-                filterList.addFilter(filter);
-                filterList.addFilter(rowFilter);
-                scan.setFilter(filterList);
+                throw new RuntimeException("Scan filter not empty : " + filter);
             } else {
                 scan.setFilter(rowFilter);
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/747075a0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index 408b287..0514b45 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -29,6 +30,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.kylin.common.util.Bytes;
@@ -56,12 +58,12 @@ import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.storage.ICachableStorageQuery;
 import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
+import org.apache.kylin.storage.hbase.steps.HBaseConnection;
+import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
 import org.apache.kylin.storage.translate.ColumnValueRange;
 import org.apache.kylin.storage.translate.DerivedFilterTranslator;
 import org.apache.kylin.storage.translate.HBaseKeyRange;
-import org.apache.kylin.storage.hbase.steps.HBaseConnection;
-import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
-import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
 import org.apache.kylin.storage.tuple.TupleInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,9 +73,8 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 
-/**
- * @author xjiang, yangli9
- */
+//v1
+@SuppressWarnings("unused")
 public class CubeStorageQuery implements ICachableStorageQuery {
 
     private static final Logger logger = LoggerFactory.getLogger(CubeStorageQuery.class);
@@ -435,11 +436,19 @@ public class CubeStorageQuery implements ICachableStorageQuery {
                 scanRanges.add(rowKeyRange);
             }
 
+            //log
             sb.append(scanRanges.size() + "=>");
+
             List<HBaseKeyRange> mergedRanges = mergeOverlapRanges(scanRanges);
+
+            //log
             sb.append(mergedRanges.size() + "=>");
+
             mergedRanges = mergeTooManyRanges(mergedRanges);
+
+            //log
             sb.append(mergedRanges.size() + ", ");
+
             result.addAll(mergedRanges);
         }
 
@@ -592,7 +601,18 @@ public class CubeStorageQuery implements ICachableStorageQuery {
             byte[] stopKey = keyRange.getStopKey();
             long partitionColumnStartDate = Long.MAX_VALUE;
             long partitionColumnEndDate = 0;
-            List<Pair<byte[], byte[]>> newFuzzyKeys = new ArrayList<Pair<byte[], byte[]>>(mergeSize);
+
+            TreeSet<Pair<byte[], byte[]>> newFuzzyKeys = new TreeSet<>(new Comparator<Pair<byte[], byte[]>>() {
+                @Override
+                public int compare(Pair<byte[], byte[]> o1, Pair<byte[], byte[]> o2) {
+                    int partialResult = Bytes.compareTo(o1.getFirst(), o2.getFirst());
+                    if (partialResult != 0) {
+                        return partialResult;
+                    } else {
+                        return Bytes.compareTo(o1.getSecond(), o2.getSecond());
+                    }
+                }
+            });
             List<Collection<ColumnValueRange>> newFlatOrAndFilter = Lists.newLinkedList();
 
             boolean hasNonFuzzyRange = false;
@@ -619,7 +639,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
 
             partitionColumnStartDate = (partitionColumnStartDate == Long.MAX_VALUE) ? 0 : partitionColumnStartDate;
             partitionColumnEndDate = (partitionColumnEndDate == 0) ? Long.MAX_VALUE : partitionColumnEndDate;
-            keyRange = new HBaseKeyRange(cubeSegment, cuboid, startKey, stopKey, newFuzzyKeys, newFlatOrAndFilter, partitionColumnStartDate, partitionColumnEndDate);
+            keyRange = new HBaseKeyRange(cubeSegment, cuboid, startKey, stopKey, Lists.newArrayList(newFuzzyKeys), newFlatOrAndFilter, partitionColumnStartDate, partitionColumnEndDate);
         }
         return keyRange;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/747075a0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java
new file mode 100644
index 0000000..0877058
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v1.filter;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.kylin.storage.hbase.cube.v1.filter.generated.FilterProtosExt;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+
+
+/**
+ * This is an optimized version of the standard FuzzyRowFilter. It filters data based on fuzzy row
+ * keys and performs fast-forwards during scanning. It takes pairs (row key, fuzzy info) to match row
+ * keys, where fuzzy info is a byte array with 0 or 1 as its values:
+ * <ul>
+ * <li>0 - means that this byte in provided row key is fixed, i.e. row key's byte at same position
+ * must match</li>
+ * <li>1 - means that this byte in provided row key is NOT fixed, i.e. row key's byte at this
+ * position can be different from the one in provided row key</li>
+ * </ul>
+ * Example: Let's assume row key format is userId_actionId_year_month. Length of userId is fixed and
+ * is 4, length of actionId is 2 and year and month are 4 and 2 bytes long respectively. Let's
+ * assume that we need to fetch all users that performed certain action (encoded as "99") in Jan of
+ * any year. Then the pair (row key, fuzzy info) would be the following: row key = "????_99_????_01"
+ * (one can use any value instead of "?"), fuzzy info =
+ * "\x01\x01\x01\x01\x00\x00\x00\x00\x01\x01\x01\x01\x00\x00\x00". I.e. fuzzy info tells that the
+ * matching mask is "????_99_????_01", where "?" can be any value.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class FuzzyRowFilterV2 extends FilterBase {
+    private List<Pair<byte[], byte[]>> fuzzyKeysData;
+    private boolean done = false;
+
+    /**
+     * The index of a last successfully found matching fuzzy string (in fuzzyKeysData). We will start
+     * matching next KV with this one. If they do not match then we will return back to the one-by-one
+     * iteration over fuzzyKeysData.
+     */
+    private int lastFoundIndex = -1;
+
+    /**
+     * Row tracker (keeps all next rows after SEEK_NEXT_USING_HINT was returned)
+     */
+    private RowTracker tracker;
+
+    public FuzzyRowFilterV2(List<Pair<byte[], byte[]>> fuzzyKeysData) { // validates, then preprocesses, every (key, mask) pair
+        Pair<byte[], byte[]> p;
+        for (int i = 0; i < fuzzyKeysData.size(); i++) {
+            p = fuzzyKeysData.get(i);
+            if (p.getFirst().length != p.getSecond().length) {
+                Pair<String, String> readable =
+                        new Pair<String, String>(Bytes.toStringBinary(p.getFirst()), Bytes.toStringBinary(p
+                                .getSecond()));
+                throw new IllegalArgumentException("Fuzzy pair lengths do not match: " + readable);
+            }
+            // update mask ( 0 -> -1 (0xff), 1 -> 0); done in place on the caller's arrays, only when Unsafe is available
+            p.setSecond(preprocessMask(p.getSecond()));
+            preprocessSearchKey(p); // must run after the mask update: it tests the mask for 0 (= non-fixed)
+        }
+        this.fuzzyKeysData = fuzzyKeysData; // the caller's list is kept by reference, not copied
+        this.tracker = new RowTracker(); // created last: RowTracker's constructor reads fuzzyKeysData.size()
+    }
+
+    private void preprocessSearchKey(Pair<byte[], byte[]> p) { // zeroes the non-fixed bytes of the search key, in place
+        if (UnsafeAccess.isAvailable() == false) {
+            // Unsafe unavailable: leave the key untouched; satisfies() then delegates to satisfiesNoUnsafe()
+            return;
+        }
+        byte[] key = p.getFirst();
+        byte[] mask = p.getSecond();
+        for (int i = 0; i < mask.length; i++) {
+            // set non-fixed part of a search key to 0, so that satisfies() can compare (row & mask) with the key directly
+            if (mask[i] == 0) key[i] = 0;
+        }
+    }
+
+    /**
+     * Converts a mask from the external form (0 = fixed, 1 = non-fixed) into the form used by the
+     * Unsafe-based matcher (-1 (0xff) = fixed, 0 = non-fixed). No-op when Unsafe is unavailable.
+     * @param mask the fuzzy mask to convert; it is modified in place
+     * @return the same mask array instance that was passed in
+     */
+    private byte[] preprocessMask(byte[] mask) {
+        if (UnsafeAccess.isAvailable() == false) {
+            // keep the external form; satisfiesNoUnsafe() tests the mask for 0 (= fixed)
+            return mask;
+        }
+        if (isPreprocessedMask(mask)) return mask; // NOTE(review): a raw all-0 (all fixed) mask also passes this check, stays unconverted and is then read as all non-fixed
+        for (int i = 0; i < mask.length; i++) {
+            if (mask[i] == 0) {
+                mask[i] = -1; // 0 -> -1
+            } else if (mask[i] == 1) {
+                mask[i] = 0;// 1 -> 0
+            }
+        }
+        return mask;
+    }
+
+    private boolean isPreprocessedMask(byte[] mask) { // true when every byte is -1 or 0, i.e. the form preprocessMask() produces
+        for (int i = 0; i < mask.length; i++) {
+            if (mask[i] != -1 && mask[i] != 0) {
+                return false; // any other value (e.g. 1) means the mask has not been converted
+            }
+        }
+        return true; // also reached for an empty mask
+    }
+
+    @Override
+    public ReturnCode filterKeyValue(Cell c) { // INCLUDE when the cell's row matches any fuzzy key, otherwise ask for a seek hint
+        final int startIndex = lastFoundIndex >= 0 ? lastFoundIndex : 0; // try the fuzzy key that matched last time first
+        final int size = fuzzyKeysData.size();
+        for (int i = startIndex; i < size + startIndex; i++) {
+            final int index = i % size; // wrap around so that every fuzzy key is tried exactly once
+            Pair<byte[], byte[]> fuzzyData = fuzzyKeysData.get(index);
+            SatisfiesCode satisfiesCode =
+                    satisfies(isReversed(), c.getRowArray(), c.getRowOffset(), c.getRowLength(),
+                            fuzzyData.getFirst(), fuzzyData.getSecond());
+            if (satisfiesCode == SatisfiesCode.YES) {
+                lastFoundIndex = index;
+                return ReturnCode.INCLUDE;
+            }
+        }
+        // NOT FOUND: no fuzzy key matches this row -> seek next using hint
+        lastFoundIndex = -1;
+        return ReturnCode.SEEK_NEXT_USING_HINT;
+
+    }
+
+    @Override
+    public Cell getNextCellHint(Cell currentCell) { // returns the first possible cell of the next candidate row, or null when there is none
+        boolean result = tracker.updateTracker(currentCell);
+        if (result == false) { // the tracker holds no candidate row for any fuzzy key
+            done = true; // makes filterAllRemaining() return true from now on
+            return null;
+        }
+        byte[] nextRowKey = tracker.nextRow(); // head of the tracker's priority queue
+        return KeyValue.createFirstOnRow(nextRowKey);
+    }
+
+    /**
+     * If we have multiple fuzzy keys, the row tracker should improve overall performance. It computes
+     * the next candidate row for every fuzzy key and puts each one (bundled with its fuzzy key) into a
+     * priority queue, so that the smallest row key (the largest one for reversed scans) is always at
+     * the queue head; that head is used as the "Next Cell Hint". As scanning goes on, the number of
+     * candidate rows stays equal to the number of fuzzy keys until some fuzzy keys cannot possibly
+     * match any more.
+     */
+    private class RowTracker {
+        private final PriorityQueue<Pair<byte[], Pair<byte[], byte[]>>> nextRows; // (candidate row key, fuzzy pair it was computed from)
+        private boolean initialized = false; // set by the first updateTracker() call
+
+        RowTracker() {
+            nextRows =
+                    new PriorityQueue<Pair<byte[], Pair<byte[], byte[]>>>(fuzzyKeysData.size(),
+                            new Comparator<Pair<byte[], Pair<byte[], byte[]>>>() {
+                                @Override
+                                public int compare(Pair<byte[], Pair<byte[], byte[]>> o1,
+                                        Pair<byte[], Pair<byte[], byte[]>> o2) {
+                                    int compare = Bytes.compareTo(o1.getFirst(), o2.getFirst()); // order by candidate row key only
+                                    if (!isReversed()) {
+                                        return compare;
+                                    } else {
+                                        return -compare; // reversed scan: the largest candidate goes to the head
+                                    }
+                                }
+                            });
+        }
+
+        byte[] nextRow() { // row key at the queue head; the entry is not removed
+            if (nextRows.isEmpty()) {
+                throw new IllegalStateException(
+                        "NextRows should not be empty, make sure to call nextRow() after updateTracker() return true");
+            } else {
+                return nextRows.peek().getFirst();
+            }
+        }
+
+        boolean updateTracker(Cell currentCell) { // refreshes the candidates against currentCell; false when none is left
+            if (!initialized) { // first call: seed one candidate per fuzzy key
+                for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
+                    updateWith(currentCell, fuzzyData);
+                }
+                initialized = true;
+            } else {
+                while (!nextRows.isEmpty() && !lessThan(currentCell, nextRows.peek().getFirst())) { // replace each head for which lessThan() is false
+                    Pair<byte[], Pair<byte[], byte[]>> head = nextRows.poll();
+                    Pair<byte[], byte[]> fuzzyData = head.getSecond();
+                    updateWith(currentCell, fuzzyData); // re-compute this fuzzy key's candidate from the current cell
+                }
+            }
+            return !nextRows.isEmpty();
+        }
+
+        boolean lessThan(Cell currentCell, byte[] nextRowKey) {
+            int compareResult = // nextRowKey is the LEFT operand: a negative result means nextRowKey sorts before the current row
+                    Bytes.compareTo(nextRowKey, 0, nextRowKey.length, currentCell.getRowArray(),
+                            currentCell.getRowOffset(), currentCell.getRowLength());
+            return (!isReversed() && compareResult < 0) || (isReversed() && compareResult > 0); // NOTE(review): on a forward scan this is true when nextRowKey < current row, the opposite of what the name suggests - confirm
+        }
+
+        void updateWith(Cell currentCell, Pair<byte[], byte[]> fuzzyData) { // queues fuzzyData's next candidate row, if it has one
+            byte[] nextRowKeyCandidate =
+                    getNextForFuzzyRule(isReversed(), currentCell.getRowArray(), currentCell.getRowOffset(),
+                            currentCell.getRowLength(), fuzzyData.getFirst(), fuzzyData.getSecond());
+            if (nextRowKeyCandidate != null) { // null: no further row for this fuzzy key, so nothing is queued for it
+                nextRows.add(new Pair<byte[], Pair<byte[], byte[]>>(nextRowKeyCandidate, fuzzyData));
+            }
+        }
+
+    }
+
+    @Override
+    public boolean filterAllRemaining() {
+        return done; // set by getNextCellHint() once the row tracker has no candidate rows left
+    }
+
+    /**
+     * @return the fuzzy (key, mask) pairs of this filter serialized as a FilterProtosExt.FuzzyRowFilterV2 protobuf message
+     */
+    public byte[] toByteArray() {
+        FilterProtosExt.FuzzyRowFilterV2.Builder builder = FilterProtosExt.FuzzyRowFilterV2.newBuilder();
+        for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
+            FilterProtosExt.BytesBytesPair.Builder bbpBuilder = FilterProtosExt.BytesBytesPair.newBuilder();
+            bbpBuilder.setFirst(ByteStringer.wrap(fuzzyData.getFirst()));
+            bbpBuilder.setSecond(ByteStringer.wrap(fuzzyData.getSecond())); // the mask is written in its current (possibly preprocessed) form
+            builder.addFuzzyKeysData(bbpBuilder);
+        }
+        return builder.build().toByteArray();
+    }
+
+    public static FuzzyRowFilterV2 parseFrom(final byte[] pbBytes) throws DeserializationException { // inverse of toByteArray()
+        FilterProtosExt.FuzzyRowFilterV2 proto;
+        try {
+            proto = FilterProtosExt.FuzzyRowFilterV2.parseFrom(pbBytes);
+        } catch (InvalidProtocolBufferException e) {
+            throw new DeserializationException(e); // the protobuf parse failure is kept as the cause
+        }
+        int count = proto.getFuzzyKeysDataCount();
+        ArrayList<Pair<byte[], byte[]>> fuzzyKeysData = new ArrayList<Pair<byte[], byte[]>>(count);
+        for (int i = 0; i < count; ++i) {
+            FilterProtosExt.BytesBytesPair current = proto.getFuzzyKeysData(i);
+            byte[] keyBytes = current.getFirst().toByteArray();
+            byte[] keyMeta = current.getSecond().toByteArray();
+            fuzzyKeysData.add(new Pair<byte[], byte[]>(keyBytes, keyMeta));
+        }
+        return new FuzzyRowFilterV2(fuzzyKeysData); // the constructor re-checks the lengths and skips masks that are already preprocessed
+    }
+
+    @Override
+    public String toString() { // lists every fuzzy pair as {key:mask} using Bytes.toStringBinary
+        final StringBuilder sb = new StringBuilder();
+        sb.append("FuzzyRowFilter"); // NOTE(review): prints "FuzzyRowFilter", not the class name FuzzyRowFilterV2
+        sb.append("{fuzzyKeysData=");
+        for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
+            sb.append('{').append(Bytes.toStringBinary(fuzzyData.getFirst())).append(":");
+            sb.append(Bytes.toStringBinary(fuzzyData.getSecond())).append('}');
+        }
+        sb.append("}, "); // NOTE(review): the returned string ends with a trailing ", "
+        return sb.toString();
+    }
+
+    // Utility methods
+
+    static enum SatisfiesCode { // outcome of testing one row against one fuzzy (key, mask) pair
+        /** row satisfies fuzzy rule */
+        YES,
+        /** row doesn't satisfy fuzzy rule, but a greater row that does may exist (the Unsafe path of satisfies() returns this for every mismatch) */
+        NEXT_EXISTS,
+        /** row doesn't satisfy fuzzy rule and there's no greater row that does (in the code shown here only satisfiesNoUnsafe() returns it) */
+        NO_NEXT
+    }
+
+    @VisibleForTesting
+    static SatisfiesCode satisfies(byte[] row, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
+        return satisfies(false, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta); // test convenience: forward scan over the whole row array
+    }
+
+    @VisibleForTesting
+    static SatisfiesCode satisfies(boolean reverse, byte[] row, byte[] fuzzyKeyBytes,
+            byte[] fuzzyKeyMeta) {
+        return satisfies(reverse, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta); // test convenience: whole row array, caller-chosen direction
+    }
+
+    static SatisfiesCode satisfies(boolean reverse, byte[] row, int offset, int length,
+            byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
+
+        if (UnsafeAccess.isAvailable() == false) { // byte-by-byte fallback; the code below relies on the preprocessed key and mask
+            return satisfiesNoUnsafe(reverse, row, offset, length, fuzzyKeyBytes, fuzzyKeyMeta); // the only use of 'reverse' in this method
+        }
+
+        if (row == null) {
+            // no row to test: report a match and let the scan proceed
+            return SatisfiesCode.YES;
+        }
+        length = Math.min(length, fuzzyKeyBytes.length); // only the common prefix of row and fuzzy key is compared
+        int numWords = length / Bytes.SIZEOF_LONG;
+
+        int j = numWords << 3; // numWords * SIZEOF_LONG: number of bytes covered by whole 8-byte words
+
+        for (int i = 0; i < j; i += Bytes.SIZEOF_LONG) {
+            long fuzzyBytes = UnsafeAccess.toLong(fuzzyKeyBytes, i);
+            long fuzzyMeta = UnsafeAccess.toLong(fuzzyKeyMeta, i);
+            long rowValue = UnsafeAccess.toLong(row, offset + i);
+            if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
+                // mismatch within this 8-byte word. We always return NEXT_EXISTS
+                return SatisfiesCode.NEXT_EXISTS;
+            }
+        }
+
+        int off = j; // the remaining tail (fewer than 8 bytes) is compared as at most one int, one short and one byte
+
+        if (length - off >= Bytes.SIZEOF_INT) {
+            int fuzzyBytes = UnsafeAccess.toInt(fuzzyKeyBytes, off);
+            int fuzzyMeta = UnsafeAccess.toInt(fuzzyKeyMeta, off);
+            int rowValue = UnsafeAccess.toInt(row, offset + off);
+            if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
+                // mismatch within these 4 bytes. We always return NEXT_EXISTS
+                return SatisfiesCode.NEXT_EXISTS;
+            }
+            off += Bytes.SIZEOF_INT;
+        }
+
+        if (length - off >= Bytes.SIZEOF_SHORT) {
+            short fuzzyBytes = UnsafeAccess.toShort(fuzzyKeyBytes, off);
+            short fuzzyMeta = UnsafeAccess.toShort(fuzzyKeyMeta, off);
+            short rowValue = UnsafeAccess.toShort(row, offset + off);
+            if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
+                // We always return NEXT_EXISTS
+                // even if it does not (in this case getNextForFuzzyRule
+                // will return null)
+                return SatisfiesCode.NEXT_EXISTS;
+            }
+            off += Bytes.SIZEOF_SHORT;
+        }
+
+        if (length - off >= Bytes.SIZEOF_BYTE) {
+            int fuzzyBytes = fuzzyKeyBytes[off] & 0xff;
+            int fuzzyMeta = fuzzyKeyMeta[off] & 0xff;
+            int rowValue = row[offset + off] & 0xff;
+            if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
+                // mismatch in the last byte. We always return NEXT_EXISTS
+                return SatisfiesCode.NEXT_EXISTS;
+            }
+        }
+        return SatisfiesCode.YES;
+    }
+
+    static SatisfiesCode satisfiesNoUnsafe(boolean reverse, byte[] row, int offset, int length,
+            byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
+        if (row == null) {
+            // no row to test: report a match and let the scan proceed
+            return SatisfiesCode.YES;
+        }
+
+        Order order = Order.orderFor(reverse);
+        boolean nextRowKeyCandidateExists = false;
+
+        for (int i = 0; i < fuzzyKeyMeta.length && i < length; i++) {
+            // First, checking if this position is fixed (mask byte 0 on this path) and not equals the given one
+            boolean byteAtPositionFixed = fuzzyKeyMeta[i] == 0;
+            boolean fixedByteIncorrect = byteAtPositionFixed && fuzzyKeyBytes[i] != row[i + offset];
+            if (fixedByteIncorrect) {
+                // an earlier non-fixed position was flagged as increasable, so a bigger matching row exists
+                if (nextRowKeyCandidateExists) {
+                    return SatisfiesCode.NEXT_EXISTS;
+                }
+
+                // If this row byte is less than fixed then there's a byte array bigger than
+                // this row and which satisfies the fuzzy rule. Otherwise there's no such byte array:
+                // this row is simply bigger than any byte array that satisfies the fuzzy rule
+                boolean rowByteLessThanFixed = (row[i + offset] & 0xFF) < (fuzzyKeyBytes[i] & 0xFF);
+                if (rowByteLessThanFixed && !reverse) {
+                    return SatisfiesCode.NEXT_EXISTS;
+                } else if (!rowByteLessThanFixed && reverse) {
+                    return SatisfiesCode.NEXT_EXISTS;
+                } else {
+                    return SatisfiesCode.NO_NEXT;
+                }
+            }
+
+            // Second, checking if this position is not fixed and byte value is not the biggest. In this
+            // case there's a byte array bigger than this row and which satisfies the fuzzy rule. To get
+            // bigger byte array that satisfies the rule we need to just increase this byte
+            // (see the code of getNextForFuzzyRule below) by one.
+            // Note: if non-fixed byte is already at biggest value, it can't be increased, so it proves nothing.
+            // NOTE(review): the test below reads fuzzyKeyBytes[i], not the row's byte - confirm intended.
+            if (fuzzyKeyMeta[i] == 1 && !order.isMax(fuzzyKeyBytes[i])) {
+                nextRowKeyCandidateExists = true;
+            }
+        }
+        return SatisfiesCode.YES;
+    }
+
+    @VisibleForTesting
+    static byte[] getNextForFuzzyRule(byte[] row, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
+        return getNextForFuzzyRule(false, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta); // test convenience: forward scan over the whole row array
+    }
+
+    @VisibleForTesting
+    static byte[] getNextForFuzzyRule(boolean reverse, byte[] row, byte[] fuzzyKeyBytes,
+            byte[] fuzzyKeyMeta) {
+        return getNextForFuzzyRule(reverse, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta); // test convenience: whole row array, caller-chosen direction
+    }
+
+    /** Abstracts directional comparisons based on scan direction: ASC for forward scans, DESC for reversed scans. */
+    private enum Order {
+        ASC {
+            public boolean lt(int lhs, int rhs) {
+                return lhs < rhs;
+            }
+
+            public boolean gt(int lhs, int rhs) {
+                return lhs > rhs;
+            }
+
+            public byte inc(byte val) {
+                // TODO: what about over/underflow? As written, 0xff wraps around to 0x00.
+                return (byte) (val + 1);
+            }
+
+            public boolean isMax(byte val) {
+                return val == (byte) 0xff;
+            }
+
+            public byte min() {
+                return 0;
+            }
+        },
+        DESC {
+            public boolean lt(int lhs, int rhs) {
+                return lhs > rhs;
+            }
+
+            public boolean gt(int lhs, int rhs) {
+                return lhs < rhs;
+            }
+
+            public byte inc(byte val) {
+                // TODO: what about over/underflow? As written, 0x00 wraps around to 0xff.
+                return (byte) (val - 1);
+            }
+
+            public boolean isMax(byte val) {
+                return val == 0;
+            }
+
+            public byte min() {
+                return (byte) 0xFF;
+            }
+        };
+
+        public static Order orderFor(boolean reverse) { // reverse == true selects DESC
+            return reverse ? DESC : ASC;
+        }
+
+        /** Returns true when {@code lhs} comes before {@code rhs} in this ordering ({@code <} for ASC, {@code >} for DESC). */
+        public abstract boolean lt(int lhs, int rhs);
+
+        /** Returns true when {@code lhs} comes after {@code rhs} in this ordering ({@code >} for ASC, {@code <} for DESC). */
+        public abstract boolean gt(int lhs, int rhs);
+
+        /** Returns {@code val} moved one step forward in this ordering (+1 for ASC, -1 for DESC). */
+        public abstract byte inc(byte val);
+
+        /** Returns true when {@code val} is the last value in this ordering (0xff for ASC, 0 for DESC). */
+        public abstract boolean isMax(byte val);
+
+        /** Returns the first value in this ordering (0 for ASC, 0xff for DESC). */
+        public abstract byte min();
+    }
+
+    /**
+     * Computes the next row key, in scan order, that can satisfy a single fuzzy rule.
+     *
+     * NOTE(review): the loop reads {@code row[offset + i]} for every index it visits, and the
+     * result is as long as {@code fuzzyKeyBytes} when that is longer than {@code length} --
+     * confirm callers never pass a rule longer than the remaining row bytes.
+     *
+     * @param reverse true to search in descending order, false for ascending order
+     * @param row array holding the current row key
+     * @param offset index in {@code row} at which the row key starts
+     * @param length length of the row key; used to size the result
+     * @param fuzzyKeyBytes key bytes of the rule; the result starts as a copy of them
+     * @param fuzzyKeyMeta mask of the rule as tested here: 0 marks a "non-fixed" position and -1
+     *        a "fixed" one; positions beyond its end are treated as "non-fixed"
+     * @return greater byte array than given (row) which satisfies the fuzzy rule if it exists, null
+     *         otherwise ("greater" means later in scan order, so smaller when reverse is true)
+     */
+    @VisibleForTesting
+    static byte[] getNextForFuzzyRule(boolean reverse, byte[] row, int offset, int length,
+            byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
+        // To find out the next "smallest" byte array that satisfies fuzzy rule and "greater" than
+        // the given one we do the following:
+        // 1. setting values on all "fixed" positions to the values from fuzzyKeyBytes
+        // 2. if during the first step given row did not increase, then we increase the value at
+        // the last "non-fixed" position seen so far (where it is not maximum already)
+
+        // It is easier to perform this by using fuzzyKeyBytes copy and setting "non-fixed" position
+        // values than otherwise.
+        byte[] result =
+                Arrays.copyOf(fuzzyKeyBytes, length > fuzzyKeyBytes.length ? length : fuzzyKeyBytes.length);
+        if (reverse && length > fuzzyKeyBytes.length) {
+            // we need trailing 0xff's instead of trailing 0x00's
+            for (int i = fuzzyKeyBytes.length; i < result.length; i++) {
+                result[i] = (byte) 0xFF;
+            }
+        }
+        int toInc = -1;
+        final Order order = Order.orderFor(reverse);
+
+        boolean increased = false;
+        for (int i = 0; i < result.length; i++) {
+            if (i >= fuzzyKeyMeta.length || fuzzyKeyMeta[i] == 0 /* non-fixed */) {
+                result[i] = row[offset + i];
+                if (!order.isMax(row[offset + i])) {
+                    // this is "non-fixed" position and is not at max value, hence we can increase it
+                    toInc = i;
+                }
+            } else if (i < fuzzyKeyMeta.length && fuzzyKeyMeta[i] == -1 /* fixed */) {
+                if (order.lt((row[i + offset] & 0xFF), (fuzzyKeyBytes[i] & 0xFF))) {
+                    // if setting value for any fixed position increased the original array,
+                    // we are OK
+                    increased = true;
+                    break;
+                }
+
+                if (order.gt((row[i + offset] & 0xFF), (fuzzyKeyBytes[i] & 0xFF))) {
+                    // if setting value for any fixed position makes array "smaller", then just stop:
+                    // in case we found some non-fixed position to increase we will do it, otherwise
+                    // there's no "next" row key that satisfies fuzzy rule and "greater" than given row
+                    break;
+                }
+            }
+        }
+
+        if (!increased) {
+            if (toInc < 0) {
+                return null;
+            }
+            result[toInc] = order.inc(result[toInc]);
+
+            // Resetting all "non-fixed" positions to the right of the one we increased to order.min()
+            // (0x00 forward, 0xFF reverse) so that found "next" row key is the smallest possible
+            for (int i = toInc + 1; i < result.length; i++) {
+                if (i >= fuzzyKeyMeta.length || fuzzyKeyMeta[i] == 0 /* non-fixed */) {
+                    result[i] = order.min();
+                }
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Compares the {@code fuzzyKeysData} lists of both filters pairwise and in order, applying
+     * {@code Bytes.equals} to the first and to the second array of every pair. Lists of different
+     * size are never equal.
+     *
+     * @param o the filter to compare with; anything that is not a FuzzyRowFilterV2 yields false
+     * @return true if and only if the fields of the filter that are serialized are equal to the
+     *         corresponding fields in other. Used for testing.
+     */
+    boolean areSerializedFieldsEqual(Filter o) {
+        if (o == this) return true;
+        if (!(o instanceof FuzzyRowFilterV2)) return false;
+
+        FuzzyRowFilterV2 other = (FuzzyRowFilterV2) o;
+        if (this.fuzzyKeysData.size() != other.fuzzyKeysData.size()) return false;
+        for (int i = 0; i < fuzzyKeysData.size(); ++i) {
+            Pair<byte[], byte[]> thisData = this.fuzzyKeysData.get(i);
+            Pair<byte[], byte[]> otherData = other.fuzzyKeysData.get(i);
+            if (!(Bytes.equals(thisData.getFirst(), otherData.getFirst()) && Bytes.equals(
+                    thisData.getSecond(), otherData.getSecond()))) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/747075a0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/UnsafeAccess.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/UnsafeAccess.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/UnsafeAccess.java
new file mode 100644
index 0000000..34328ef
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/UnsafeAccess.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v1.filter;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class UnsafeAccess {
+
+    private static final Log LOG = LogFactory.getLog(UnsafeAccess.class);
+
+    static final Unsafe theUnsafe; // null when the reflective lookup in the static block failed
+
+    /** The offset to the first element in a byte array; -1 when {@code theUnsafe} is null. */
+    static final long BYTE_ARRAY_BASE_OFFSET;
+
+    static final boolean littleEndian = ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
+    static {
+        theUnsafe = (Unsafe) AccessController.doPrivileged(new PrivilegedAction<Object>() {
+            @Override
+            public Object run() {
+                try {
+                    Field f = Unsafe.class.getDeclaredField("theUnsafe");
+                    f.setAccessible(true);
+                    return f.get(null); // static field, hence the null receiver
+                } catch (Throwable e) { // any failure is only logged; run() then returns null
+                    LOG.warn("sun.misc.Unsafe is not accessible", e);
+                }
+                return null;
+            }
+        });
+
+        if (theUnsafe != null) {
+            BYTE_ARRAY_BASE_OFFSET = theUnsafe.arrayBaseOffset(byte[].class);
+        } else {
+            BYTE_ARRAY_BASE_OFFSET = -1; // sentinel: no valid base offset without Unsafe
+        }
+    }
+
+    private UnsafeAccess() { // not instantiable: every member of this class is static
+    }
+
+    /**
+     * Reports whether {@code theUnsafe} was obtained during class initialization. The other
+     * methods of this class dereference {@code theUnsafe} without a null check, so callers should
+     * consult this method before using them.
+     * @return true when sun's Unsafe is available in the running JVM, i.e. theUnsafe is not null.
+     */
+    public static boolean isAvailable() {
+        return theUnsafe != null;
+    }
+
+    // APIs to read primitive data from a byte[] using Unsafe way
+    /**
+     * Converts a byte array to a short value considering it was written in big-endian format.
+     * The value is fetched with a single Unsafe read and byte-swapped when the platform is
+     * little-endian. This method itself does not validate {@code offset} against the array length.
+     * @param bytes byte array
+     * @param offset offset into array of the first byte to read
+     * @return the short value
+     */
+    public static short toShort(byte[] bytes, int offset) {
+        if (littleEndian) {
+            return Short.reverseBytes(theUnsafe.getShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET));
+        } else {
+            return theUnsafe.getShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET);
+        }
+    }
+
+    /**
+     * Converts a byte array to an int value considering it was written in big-endian format.
+     * The value is fetched with a single Unsafe read and byte-swapped when the platform is
+     * little-endian. This method itself does not validate {@code offset} against the array length.
+     * @param bytes byte array
+     * @param offset offset into array of the first byte to read
+     * @return the int value
+     */
+    public static int toInt(byte[] bytes, int offset) {
+        if (littleEndian) {
+            return Integer.reverseBytes(theUnsafe.getInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET));
+        } else {
+            return theUnsafe.getInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET);
+        }
+    }
+
+    /**
+     * Converts a byte array to a long value considering it was written in big-endian format.
+     * The value is fetched with a single Unsafe read and byte-swapped when the platform is
+     * little-endian. This method itself does not validate {@code offset} against the array length.
+     * @param bytes byte array
+     * @param offset offset into array of the first byte to read
+     * @return the long value
+     */
+    public static long toLong(byte[] bytes, int offset) {
+        if (littleEndian) {
+            return Long.reverseBytes(theUnsafe.getLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET));
+        } else {
+            return theUnsafe.getLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET);
+        }
+    }
+
+    // APIs to write primitive data to a byte[] using Unsafe way
+    /**
+     * Put a short value out to the specified byte array position in big-endian format.
+     * On a little-endian platform the value is byte-swapped before the single Unsafe write.
+     * This method itself does not validate {@code offset} against the array length.
+     * @param bytes the byte array
+     * @param offset position in the array
+     * @param val short to write out
+     * @return incremented offset, i.e. {@code offset + Bytes.SIZEOF_SHORT}
+     */
+    public static int putShort(byte[] bytes, int offset, short val) {
+        if (littleEndian) {
+            val = Short.reverseBytes(val);
+        }
+        theUnsafe.putShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val);
+        return offset + Bytes.SIZEOF_SHORT;
+    }
+
+    /**
+     * Put an int value out to the specified byte array position in big-endian format.
+     * On a little-endian platform the value is byte-swapped before the single Unsafe write.
+     * This method itself does not validate {@code offset} against the array length.
+     * @param bytes the byte array
+     * @param offset position in the array
+     * @param val int to write out
+     * @return incremented offset, i.e. {@code offset + Bytes.SIZEOF_INT}
+     */
+    public static int putInt(byte[] bytes, int offset, int val) {
+        if (littleEndian) {
+            val = Integer.reverseBytes(val);
+        }
+        theUnsafe.putInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val);
+        return offset + Bytes.SIZEOF_INT;
+    }
+
+    /**
+     * Put a long value out to the specified byte array position in big-endian format.
+     * On a little-endian platform the value is byte-swapped before the single Unsafe write.
+     * This method itself does not validate {@code offset} against the array length.
+     * @param bytes the byte array
+     * @param offset position in the array
+     * @param val long to write out
+     * @return incremented offset, i.e. {@code offset + Bytes.SIZEOF_LONG}
+     */
+    public static int putLong(byte[] bytes, int offset, long val) {
+        if (littleEndian) {
+            val = Long.reverseBytes(val);
+        }
+        theUnsafe.putLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val);
+        return offset + Bytes.SIZEOF_LONG;
+    }
+
+    // APIs to read primitive data from a ByteBuffer using Unsafe way
+    /**
+     * Reads a short value at the given buffer's offset considering it was written in big-endian
+     * format. Delegates to {@code getAsShort} and byte-swaps the result on a little-endian
+     * platform.
+     *
+     * @param buf the buffer to read from, direct or array-backed
+     * @param offset offset from the start of the buffer; the buffer's position is not consulted
+     * @return short value at offset
+     */
+    public static short toShort(ByteBuffer buf, int offset) {
+        if (littleEndian) {
+            return Short.reverseBytes(getAsShort(buf, offset));
+        }
+        return getAsShort(buf, offset);
+    }
+
+    /**
+     * Reads bytes at the given offset as a short value, without any byte-order conversion (the
+     * caller {@code toShort} swaps the bytes on little-endian platforms).
+     * @param buf the buffer to read from; a direct buffer is read through its memory address,
+     *        any other buffer through {@code buf.array()} plus {@code buf.arrayOffset()}
+     * @param offset offset from the start of the buffer; the buffer's position is not consulted
+     * @return short value at offset
+     */
+    static short getAsShort(ByteBuffer buf, int offset) {
+        if (buf.isDirect()) {
+            return theUnsafe.getShort(((DirectBuffer) buf).address() + offset);
+        }
+        return theUnsafe.getShort(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset);
+    }
+
+    /**
+     * Reads an int value at the given buffer's offset considering it was written in big-endian
+     * format. Delegates to {@code getAsInt} and byte-swaps the result on a little-endian
+     * platform.
+     *
+     * @param buf the buffer to read from, direct or array-backed
+     * @param offset offset from the start of the buffer; the buffer's position is not consulted
+     * @return int value at offset
+     */
+    public static int toInt(ByteBuffer buf, int offset) {
+        if (littleEndian) {
+            return Integer.reverseBytes(getAsInt(buf, offset));
+        }
+        return getAsInt(buf, offset);
+    }
+
+    /**
+     * Reads bytes at the given offset as an int value, without any byte-order conversion (the
+     * caller {@code toInt} swaps the bytes on little-endian platforms).
+     * @param buf the buffer to read from; a direct buffer is read through its memory address,
+     *        any other buffer through {@code buf.array()} plus {@code buf.arrayOffset()}
+     * @param offset offset from the start of the buffer; the buffer's position is not consulted
+     * @return int value at offset
+     */
+    static int getAsInt(ByteBuffer buf, int offset) {
+        if (buf.isDirect()) {
+            return theUnsafe.getInt(((DirectBuffer) buf).address() + offset);
+        }
+        return theUnsafe.getInt(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset);
+    }
+
+    /**
+     * Reads a long value at the given buffer's offset considering it was written in big-endian
+     * format. Delegates to {@code getAsLong} and byte-swaps the result on a little-endian
+     * platform.
+     *
+     * @param buf the buffer to read from, direct or array-backed
+     * @param offset offset from the start of the buffer; the buffer's position is not consulted
+     * @return long value at offset
+     */
+    public static long toLong(ByteBuffer buf, int offset) {
+        if (littleEndian) {
+            return Long.reverseBytes(getAsLong(buf, offset));
+        }
+        return getAsLong(buf, offset);
+    }
+
+    /**
+     * Reads bytes at the given offset as a long value, without any byte-order conversion (the
+     * caller {@code toLong} swaps the bytes on little-endian platforms).
+     * @param buf the buffer to read from; a direct buffer is read through its memory address,
+     *        any other buffer through {@code buf.array()} plus {@code buf.arrayOffset()}
+     * @param offset offset from the start of the buffer; the buffer's position is not consulted
+     * @return long value at offset
+     */
+    static long getAsLong(ByteBuffer buf, int offset) {
+        if (buf.isDirect()) {
+            return theUnsafe.getLong(((DirectBuffer) buf).address() + offset);
+        }
+        return theUnsafe.getLong(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset);
+    }
+
+    /**
+     * Put an int value out to the specified ByteBuffer offset in big-endian format.
+     * On a little-endian platform the value is byte-swapped first. A direct buffer is written
+     * through its memory address, any other buffer through its backing array. The buffer's
+     * position is neither consulted nor changed by this method.
+     * @param buf the ByteBuffer to write to
+     * @param offset offset in the ByteBuffer, counted from the start of the buffer
+     * @param val int to write out
+     * @return incremented offset, i.e. {@code offset + Bytes.SIZEOF_INT}
+     */
+    public static int putInt(ByteBuffer buf, int offset, int val) {
+        if (littleEndian) {
+            val = Integer.reverseBytes(val);
+        }
+        if (buf.isDirect()) {
+            theUnsafe.putInt(((DirectBuffer) buf).address() + offset, val);
+        } else {
+            theUnsafe.putInt(buf.array(), offset + buf.arrayOffset() + BYTE_ARRAY_BASE_OFFSET, val);
+        }
+        return offset + Bytes.SIZEOF_INT;
+    }
+
+    // APIs to copy data. This will be direct memory location copy and will be much faster
+    /**
+     * Copies {@code length} bytes starting at {@code srcOffset} of the given array into the given
+     * buffer with one {@code Unsafe.copyMemory} call. This method performs no range checks of its
+     * own and does not consult or change the buffer's position.
+     * @param src the array to copy from
+     * @param srcOffset index in {@code src} of the first byte to copy
+     * @param dest the buffer to copy into, direct or array-backed
+     * @param destOffset offset in {@code dest}, counted from the start of the buffer
+     * @param length number of bytes to copy
+     */
+    public static void copy(byte[] src, int srcOffset, ByteBuffer dest, int destOffset, int length) {
+        long destAddress = destOffset;
+        Object destBase = null; // stays null for a direct buffer: destAddress is then absolute
+        if (dest.isDirect()) {
+            destAddress = destAddress + ((DirectBuffer) dest).address();
+        } else {
+            destAddress = destAddress + BYTE_ARRAY_BASE_OFFSET + dest.arrayOffset();
+            destBase = dest.array();
+        }
+        long srcAddress = srcOffset + BYTE_ARRAY_BASE_OFFSET;
+        theUnsafe.copyMemory(src, srcAddress, destBase, destAddress, length);
+    }
+
+    /**
+     * Copies specified number of bytes from given offset of {@code src} ByteBuffer to the
+     * {@code dest} array with one {@code Unsafe.copyMemory} call. This method performs no range
+     * checks of its own and does not consult or change the buffer's position.
+     *
+     * @param src the buffer to copy from, direct or array-backed
+     * @param srcOffset offset in {@code src}, counted from the start of the buffer
+     * @param dest the array to copy into
+     * @param destOffset index in {@code dest} of the first byte to write
+     * @param length number of bytes to copy
+     */
+    public static void copy(ByteBuffer src, int srcOffset, byte[] dest, int destOffset, int length) {
+        long srcAddress = srcOffset;
+        Object srcBase = null; // stays null for a direct buffer: srcAddress is then absolute
+        if (src.isDirect()) {
+            srcAddress = srcAddress + ((DirectBuffer) src).address();
+        } else {
+            srcAddress = srcAddress + BYTE_ARRAY_BASE_OFFSET + src.arrayOffset();
+            srcBase = src.array();
+        }
+        long destAddress = destOffset + BYTE_ARRAY_BASE_OFFSET;
+        theUnsafe.copyMemory(srcBase, srcAddress, dest, destAddress, length);
+    }
+
+    /**
+     * Copies specified number of bytes from given offset of {@code src} buffer into the {@code dest}
+     * buffer with one {@code Unsafe.copyMemory} call. Each buffer may independently be direct or
+     * array-backed. This method performs no range checks of its own and does not consult or
+     * change either buffer's position.
+     *
+     * @param src the buffer to copy from
+     * @param srcOffset offset in {@code src}, counted from the start of the buffer
+     * @param dest the buffer to copy into
+     * @param destOffset offset in {@code dest}, counted from the start of the buffer
+     * @param length number of bytes to copy
+     */
+    public static void copy(ByteBuffer src, int srcOffset, ByteBuffer dest, int destOffset, int length) {
+        long srcAddress, destAddress;
+        Object srcBase = null, destBase = null; // a null base means the address is absolute
+        if (src.isDirect()) {
+            srcAddress = srcOffset + ((DirectBuffer) src).address();
+        } else {
+            srcAddress = srcOffset + src.arrayOffset() + BYTE_ARRAY_BASE_OFFSET;
+            srcBase = src.array();
+        }
+        if (dest.isDirect()) {
+            destAddress = destOffset + ((DirectBuffer) dest).address();
+        } else {
+            destAddress = destOffset + BYTE_ARRAY_BASE_OFFSET + dest.arrayOffset();
+            destBase = dest.array();
+        }
+        theUnsafe.copyMemory(srcBase, srcAddress, destBase, destAddress, length);
+    }
+
+    // APIs to add primitives to BBs
+    /**
+     * Put a short value out to the specified BB position in big-endian format.
+     * On a little-endian platform the value is byte-swapped first. A direct buffer is written
+     * through its memory address, any other buffer through its backing array. The buffer's
+     * own position is neither consulted nor changed by this method.
+     * @param buf the byte buffer
+     * @param offset position in the buffer, counted from the start of the buffer
+     * @param val short to write out
+     * @return incremented offset, i.e. {@code offset + Bytes.SIZEOF_SHORT}
+     */
+    public static int putShort(ByteBuffer buf, int offset, short val) {
+        if (littleEndian) {
+            val = Short.reverseBytes(val);
+        }
+        if (buf.isDirect()) {
+            theUnsafe.putShort(((DirectBuffer) buf).address() + offset, val);
+        } else {
+            theUnsafe.putShort(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, val);
+        }
+        return offset + Bytes.SIZEOF_SHORT;
+    }
+
+    /**
+     * Put a long value out to the specified BB position in big-endian format.
+     * On a little-endian platform the value is byte-swapped first. A direct buffer is written
+     * through its memory address, any other buffer through its backing array. The buffer's
+     * own position is neither consulted nor changed by this method.
+     * @param buf the byte buffer
+     * @param offset position in the buffer, counted from the start of the buffer
+     * @param val long to write out
+     * @return incremented offset, i.e. {@code offset + Bytes.SIZEOF_LONG}
+     */
+    public static int putLong(ByteBuffer buf, int offset, long val) {
+        if (littleEndian) {
+            val = Long.reverseBytes(val);
+        }
+        if (buf.isDirect()) {
+            theUnsafe.putLong(((DirectBuffer) buf).address() + offset, val);
+        } else {
+            theUnsafe.putLong(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, val);
+        }
+        return offset + Bytes.SIZEOF_LONG;
+    }
+
+    /**
+     * Put a byte value out to the specified BB position. No byte-order handling is involved
+     * because a single byte is written as is. A direct buffer is written through its memory
+     * address, any other buffer through its backing array.
+     * @param buf the byte buffer
+     * @param offset position in the buffer, counted from the start of the buffer
+     * @param b byte to write out
+     * @return incremented offset, i.e. {@code offset + 1}
+     */
+    public static int putByte(ByteBuffer buf, int offset, byte b) {
+        if (buf.isDirect()) {
+            theUnsafe.putByte(((DirectBuffer) buf).address() + offset, b);
+        } else {
+            theUnsafe.putByte(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, b);
+        }
+        return offset + 1;
+    }
+
+    /**
+     * Returns the byte at the given offset. A direct buffer is read through its memory address,
+     * any other buffer through its backing array; the buffer's position is not consulted.
+     * @param buf the buffer to read
+     * @param offset the offset at which the byte has to be read, counted from the buffer start
+     * @return the byte at the given offset
+     */
+    public static byte toByte(ByteBuffer buf, int offset) {
+        if (buf.isDirect()) {
+            return theUnsafe.getByte(((DirectBuffer) buf).address() + offset);
+        } else {
+            return theUnsafe.getByte(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset);
+        }
+    }
+}


Mime
View raw message