Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 39780200C55 for ; Thu, 13 Apr 2017 13:21:33 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 36DCF160B98; Thu, 13 Apr 2017 11:21:33 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id DC48E160BC1 for ; Thu, 13 Apr 2017 13:21:30 +0200 (CEST) Received: (qmail 53477 invoked by uid 500); 13 Apr 2017 11:21:30 -0000 Mailing-List: contact commits-help@kylin.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kylin.apache.org Delivered-To: mailing list commits@kylin.apache.org Received: (qmail 52129 invoked by uid 99); 13 Apr 2017 11:21:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Apr 2017 11:21:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 530BCE0061; Thu, 13 Apr 2017 11:21:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: kangkaisen@apache.org To: commits@kylin.apache.org Date: Thu, 13 Apr 2017 11:22:13 -0000 Message-Id: <93f62a2531fa49e88af1d4a3c5b37e4f@git.apache.org> In-Reply-To: <197879d0b91b4e4c86873e21aaf47932@git.apache.org> References: <197879d0b91b4e4c86873e21aaf47932@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [47/50] [abbrv] kylin git commit: KYLIN-2506 Refactor Global Dictionary archived-at: Thu, 13 Apr 2017 11:21:33 -0000 http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/main/java/org/apache/kylin/dict/DictNode.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictNode.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictNode.java new file mode 100644 index 0000000..dd9593a --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictNode.java @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.dict; + +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.BytesUtil; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.IdentityHashMap; +import java.util.LinkedList; + +public class DictNode { + public byte[] part; + public int id = -1; + public boolean isEndOfValue; + public ArrayList children = new ArrayList<>(); + + public int nValuesBeneath; + public DictNode parent; + public int childrenCount = 1; + + DictNode(byte[] value, boolean isEndOfValue) { + reset(value, isEndOfValue); + } + + DictNode(byte[] value, boolean isEndOfValue, ArrayList children) { + reset(value, isEndOfValue, children); + } + + void reset(byte[] value, boolean isEndOfValue) { + reset(value, isEndOfValue, new ArrayList()); + } + + void reset(byte[] value, boolean isEndOfValue, ArrayList children) { + this.part = value; + this.isEndOfValue = isEndOfValue; + clearChild(); + for (DictNode child : children) { + addChild(child); + } + this.id = -1; + } + + void clearChild() { + this.children.clear(); + int childrenCountDelta = this.childrenCount - 1; + for (DictNode p = this; p != null; p = p.parent) { + p.childrenCount -= childrenCountDelta; + } + } + + void addChild(DictNode child) { + addChild(-1, child); + } + + void addChild(int index, DictNode child) { + child.parent = this; + if (index < 0) { + this.children.add(child); + } else { + this.children.add(index, child); + } + for (DictNode p = this; p != null; p = p.parent) { + p.childrenCount += child.childrenCount; + } + } + + private DictNode removeChild(int index) { + DictNode child = children.remove(index); + child.parent = null; + for (DictNode p = this; p != null; p = p.parent) { + p.childrenCount -= child.childrenCount; + } + return child; + } + + private DictNode duplicateNode() { + DictNode newChild = new DictNode(part, false); + newChild.parent = parent; + if (parent != null) { + int index = parent.children.indexOf(this); + parent.addChild(index + 1, newChild); + } + return newChild; + } + + public byte[] firstValue() { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + DictNode p = this; + while (true) { + bytes.write(p.part, 0, p.part.length); + if (p.isEndOfValue || p.children.size() == 0) { + break; + } + p = p.children.get(0); + } + return bytes.toByteArray(); + } + + public static DictNode splitNodeTree(final DictNode splitNode) { + if (splitNode == null) { + return null; + } + DictNode current = splitNode; + DictNode p = current.parent; + while (p != null) { + int index = p.children.indexOf(current); + assert index != -1; + DictNode newParent = p.duplicateNode(); + for (int i = p.children.size() - 1; i >= index; i--) { + DictNode child = p.removeChild(i); + newParent.addChild(0, child); + } + current = newParent; + p = p.parent; + } + return current; + } + + public byte[] buildTrieBytes() { + Stats stats = Stats.stats(this); + int sizeChildOffset = stats.mbpn_sizeChildOffset; + int sizeId = stats.mbpn_sizeId; + + // write head + byte[] head; + try { + ByteArrayOutputStream byteBuf = new ByteArrayOutputStream(); + DataOutputStream headOut = new DataOutputStream(byteBuf); + headOut.write(AppendTrieDictionary.HEAD_MAGIC); + headOut.writeShort(0); // head size, will back fill + headOut.writeInt(stats.mbpn_footprint); // body size + headOut.writeInt(stats.nValues); + headOut.write(sizeChildOffset); + 
headOut.write(sizeId); + headOut.close(); + head = byteBuf.toByteArray(); + BytesUtil.writeUnsigned(head.length, head, AppendTrieDictionary.HEAD_SIZE_I, 2); + } catch (IOException e) { + throw new RuntimeException(e); // shall not happen, as we are + } + + byte[] trieBytes = new byte[stats.mbpn_footprint + head.length]; + System.arraycopy(head, 0, trieBytes, 0, head.length); + + LinkedList open = new LinkedList(); + IdentityHashMap offsetMap = new IdentityHashMap(); + + // write body + int o = head.length; + offsetMap.put(this, o); + o = build_writeNode(this, o, true, sizeChildOffset, sizeId, trieBytes); + if (this.children.isEmpty() == false) + open.addLast(this); + + while (open.isEmpty() == false) { + DictNode parent = open.removeFirst(); + build_overwriteChildOffset(offsetMap.get(parent), o - head.length, sizeChildOffset, trieBytes); + for (int i = 0; i < parent.children.size(); i++) { + DictNode c = parent.children.get(i); + boolean isLastChild = (i == parent.children.size() - 1); + offsetMap.put(c, o); + o = build_writeNode(c, o, isLastChild, sizeChildOffset, sizeId, trieBytes); + if (c.children.isEmpty() == false) + open.addLast(c); + } + } + + if (o != trieBytes.length) + throw new RuntimeException(); + return trieBytes; + } + + private void build_overwriteChildOffset(int parentOffset, int childOffset, int sizeChildOffset, byte[] trieBytes) { + int flags = (int) trieBytes[parentOffset] & (TrieDictionary.BIT_IS_LAST_CHILD | TrieDictionary.BIT_IS_END_OF_VALUE); + BytesUtil.writeUnsigned(childOffset, trieBytes, parentOffset, sizeChildOffset); + trieBytes[parentOffset] |= flags; + } + + private int build_writeNode(DictNode n, int offset, boolean isLastChild, int sizeChildOffset, int sizeId, byte[] trieBytes) { + int o = offset; + + // childOffset + if (isLastChild) + trieBytes[o] |= TrieDictionary.BIT_IS_LAST_CHILD; + if (n.isEndOfValue) + trieBytes[o] |= TrieDictionary.BIT_IS_END_OF_VALUE; + o += sizeChildOffset; + + // nValueBytes + if (n.part.length > 255) + throw new RuntimeException("Value length is " + n.part.length + " and larger than 255: " + Bytes.toStringBinary(n.part)); + BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1); + o++; + + // valueBytes + System.arraycopy(n.part, 0, trieBytes, o, n.part.length); + o += n.part.length; + + if (n.isEndOfValue) { + checkValidId(n.id); + BytesUtil.writeUnsigned(n.id, trieBytes, o, sizeId); + o += sizeId; + } + + return o; + } + + // The dict id starts from 1 to 2147483647 and 2147483648 to -2, leave 0 and -1 used for uninitialized state + private void checkValidId(int id) { + if (id == 0 || id == -1) { + throw new IllegalArgumentException("AppendTrieDictionary Id Overflow Unsigned Integer Size 4294967294"); + } + } + + @Override + public String toString() { + return String.format("DictNode[root=%s, nodes=%d, firstValue=%s]", Bytes.toStringBinary(part), childrenCount, Bytes.toStringBinary(firstValue())); + } + + static class Stats { + public interface Visitor { + void visit(DictNode n, int level); + } + + private static void traverseR(DictNode node, Visitor visitor, int level) { + visitor.visit(node, level); + for (DictNode c : node.children) + traverseR(c, visitor, level + 1); + } + + private static void traversePostOrderR(DictNode node, Visitor visitor, int level) { + for (DictNode c : node.children) + traversePostOrderR(c, visitor, level + 1); + visitor.visit(node, level); + } + + public int nValues; // number of values in total + public int nValueBytesPlain; // number of bytes for all values + // uncompressed + public int 
nValueBytesCompressed; // number of values bytes in Trie + // (compressed) + public int maxValueLength; // size of longest value in bytes + + // the trie is multi-byte-per-node + public int mbpn_nNodes; // number of nodes in trie + public int mbpn_trieDepth; // depth of trie + public int mbpn_maxFanOut; // the maximum no. children + public int mbpn_nChildLookups; // number of child lookups during lookup + // every value once + public int mbpn_nTotalFanOut; // the sum of fan outs during lookup every + // value once + public int mbpn_sizeValueTotal; // the sum of value space in all nodes + public int mbpn_sizeNoValueBytes; // size of field noValueBytes + public int mbpn_sizeChildOffset; // size of field childOffset, points to + // first child in flattened array + public int mbpn_sizeId; // size of id value, always be 4 + public int mbpn_footprint; // MBPN footprint in bytes + + /** + * out print some statistics of the trie and the dictionary built from it + */ + public static Stats stats(DictNode root) { + // calculate nEndValueBeneath + traversePostOrderR(root, new Visitor() { + @Override + public void visit(DictNode n, int level) { + n.nValuesBeneath = n.isEndOfValue ? 1 : 0; + for (DictNode c : n.children) + n.nValuesBeneath += c.nValuesBeneath; + } + }, 0); + + // run stats + final Stats s = new Stats(); + final ArrayList lenAtLvl = new ArrayList(); + traverseR(root, new Visitor() { + @Override + public void visit(DictNode n, int level) { + if (n.isEndOfValue) + s.nValues++; + s.nValueBytesPlain += n.part.length * n.nValuesBeneath; + s.nValueBytesCompressed += n.part.length; + s.mbpn_nNodes++; + if (s.mbpn_trieDepth < level + 1) + s.mbpn_trieDepth = level + 1; + if (n.children.size() > 0) { + if (s.mbpn_maxFanOut < n.children.size()) + s.mbpn_maxFanOut = n.children.size(); + int childLookups = n.nValuesBeneath - (n.isEndOfValue ? 
1 : 0); + s.mbpn_nChildLookups += childLookups; + s.mbpn_nTotalFanOut += childLookups * n.children.size(); + } + + if (level < lenAtLvl.size()) + lenAtLvl.set(level, n.part.length); + else + lenAtLvl.add(n.part.length); + int lenSoFar = 0; + for (int i = 0; i <= level; i++) + lenSoFar += lenAtLvl.get(i); + if (lenSoFar > s.maxValueLength) + s.maxValueLength = lenSoFar; + } + }, 0); + + // flatten trie footprint calculation, case of Multi-Byte-Per-DictNode + s.mbpn_sizeId = 4; + s.mbpn_sizeValueTotal = s.nValueBytesCompressed + s.nValues * s.mbpn_sizeId; + s.mbpn_sizeNoValueBytes = 1; + s.mbpn_sizeChildOffset = 5; + s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset); + while (true) { // minimize the offset size to match the footprint + int t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset - 1); + // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag + // expand t to long before *4, avoiding exceed Integer.MAX_VALUE + if (BytesUtil.sizeForValue((long) t * 4) <= s.mbpn_sizeChildOffset - 1) { + s.mbpn_sizeChildOffset--; + s.mbpn_footprint = t; + } else + break; + } + + return s; + } + + /** + * out print trie for debug + */ + public void print(DictNode root) { + print(root, System.out); + } + + public void print(DictNode root, final PrintStream out) { + traverseR(root, new Visitor() { + @Override + public void visit(DictNode n, int level) { + try { + for (int i = 0; i < level; i++) + out.print(" "); + out.print(new String(n.part, "UTF-8")); + out.print(" - "); + if (n.nValuesBeneath > 0) + out.print(n.nValuesBeneath); + if (n.isEndOfValue) + out.print("* [" + n.id + "]"); + out.print("\n"); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + } + }, 0); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/main/java/org/apache/kylin/dict/DictSlice.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictSlice.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictSlice.java new file mode 100644 index 0000000..3ca0d8f --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictSlice.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.dict; + +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.BytesUtil; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; + +public class DictSlice { + static final byte[] HEAD_MAGIC = new byte[] { 0x41, 0x70, 0x70, 0x65, 0x63, 0x64, 0x54, 0x72, 0x69, 0x65, 0x44, 0x69, 0x63, 0x74 }; // "AppendTrieDict" + static final int HEAD_SIZE_I = HEAD_MAGIC.length; + static final int BIT_IS_LAST_CHILD = 0x80; + static final int BIT_IS_END_OF_VALUE = 0x40; + + private byte[] trieBytes; + + // non-persistent part + transient private int headSize; + transient private int bodyLen; + transient private int sizeChildOffset; + + transient private int nValues; + transient private int sizeOfId; + // mask MUST be long, since childOffset maybe 5 bytes at most + transient private long childOffsetMask; + transient private int firstByteOffset; + + public DictSlice(byte[] bytes) { + this.trieBytes = bytes; + init(); + } + + private void init() { + if (BytesUtil.compareBytes(HEAD_MAGIC, 0, trieBytes, 0, HEAD_MAGIC.length) != 0) + throw new IllegalArgumentException("Wrong file type (magic does not match)"); + + try { + DataInputStream headIn = new DataInputStream(new ByteArrayInputStream(trieBytes, HEAD_SIZE_I, trieBytes.length - HEAD_SIZE_I)); + this.headSize = headIn.readShort(); + this.bodyLen = headIn.readInt(); + this.nValues = headIn.readInt(); + this.sizeChildOffset = headIn.read(); + this.sizeOfId = headIn.read(); + + this.childOffsetMask = ~(((long) (BIT_IS_LAST_CHILD | BIT_IS_END_OF_VALUE)) << ((sizeChildOffset - 1) * 8)); + this.firstByteOffset = sizeChildOffset + 1; // the offset from begin of node to its first value byte + } catch (Exception e) { + if (e instanceof RuntimeException) + throw (RuntimeException) e; + else + throw new RuntimeException(e); + } + } + + public static DictSlice deserializeFrom(DataInput in) throws IOException { + byte[] headPartial = new byte[HEAD_MAGIC.length + Short.SIZE / Byte.SIZE + Integer.SIZE / Byte.SIZE]; + in.readFully(headPartial); + + if (BytesUtil.compareBytes(HEAD_MAGIC, 0, headPartial, 0, HEAD_MAGIC.length) != 0) + throw new IllegalArgumentException("Wrong file type (magic does not match)"); + + DataInputStream headIn = new DataInputStream(// + new ByteArrayInputStream(headPartial, HEAD_SIZE_I, headPartial.length - HEAD_SIZE_I)); + int headSize = headIn.readShort(); + int bodyLen = headIn.readInt(); + headIn.close(); + + byte[] all = new byte[headSize + bodyLen]; + System.arraycopy(headPartial, 0, all, 0, headPartial.length); + in.readFully(all, headPartial.length, all.length - headPartial.length); + + return new DictSlice(all); + } + + public byte[] getFirstValue() { + int nodeOffset = headSize; + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + while (true) { + int valueLen = BytesUtil.readUnsigned(trieBytes, nodeOffset + firstByteOffset - 1, 1); + bytes.write(trieBytes, nodeOffset + firstByteOffset, valueLen); + if (checkFlag(nodeOffset, BIT_IS_END_OF_VALUE)) { + break; + } + nodeOffset = headSize + (int) (BytesUtil.readLong(trieBytes, nodeOffset, sizeChildOffset) & childOffsetMask); + if (nodeOffset == headSize) { + break; + } + } + return bytes.toByteArray(); + } + + /** + * returns a code point from [0, nValues), preserving order of value + * + * @param n -- the offset of current node + * @param inp -- input value bytes 
to lookup + * @param o -- offset in the input value bytes matched so far + * @param inpEnd -- end of input + * @param roundingFlag -- =0: return -1 if not found + * -- <0: return closest smaller if not found, return -1 + * -- >0: return closest bigger if not found, return nValues + */ + private int lookupSeqNoFromValue(int n, byte[] inp, int o, int inpEnd, int roundingFlag) { + while (true) { + // match the current node + int p = n + firstByteOffset; // start of node's value + int end = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1); // end of node's value + for (; p < end && o < inpEnd; p++, o++) { // note matching start from [0] + if (trieBytes[p] != inp[o]) { + return -1; // mismatch + } + } + + // node completely matched, is input all consumed? + boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE); + if (o == inpEnd) { + return p == end && isEndOfValue ? BytesUtil.readUnsigned(trieBytes, end, sizeOfId) : -1; + } + + // find a child to continue + int c = headSize + (int) (BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask); + if (c == headSize) // has no children + return -1; + byte inpByte = inp[o]; + int comp; + while (true) { + p = c + firstByteOffset; + comp = BytesUtil.compareByteUnsigned(trieBytes[p], inpByte); + if (comp == 0) { // continue in the matching child, reset n and loop again + n = c; + break; + } else if (comp < 0) { // try next child + if (checkFlag(c, BIT_IS_LAST_CHILD)) + return -1; + c = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1) + (checkFlag(c, BIT_IS_END_OF_VALUE) ? sizeOfId : 0); + } else { // children are ordered by their first value byte + return -1; + } + } + } + } + + private boolean checkFlag(int offset, int bit) { + return (trieBytes[offset] & bit) > 0; + } + + public int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag) { + int id = lookupSeqNoFromValue(headSize, value, offset, offset + len, roundingFlag); + return id; + } + + public DictNode rebuildTrieTree() { + return rebuildTrieTreeR(headSize, null); + } + + private DictNode rebuildTrieTreeR(int n, DictNode parent) { + DictNode root = null; + while (true) { + int p = n + firstByteOffset; + int childOffset = (int) (BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask); + int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1); + boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE); + + byte[] value = new byte[parLen]; + System.arraycopy(trieBytes, p, value, 0, parLen); + + DictNode node = new DictNode(value, isEndOfValue); + if (isEndOfValue) { + int id = BytesUtil.readUnsigned(trieBytes, p + parLen, sizeOfId); + node.id = id; + } + + if (parent == null) { + root = node; + } else { + parent.addChild(node); + } + + if (childOffset != 0) { + rebuildTrieTreeR(childOffset + headSize, node); + } + + if (checkFlag(n, BIT_IS_LAST_CHILD)) { + break; + } else { + n += firstByteOffset + parLen + (isEndOfValue ? 
sizeOfId : 0); + } + } + return root; + } + + public boolean doCheck() { + int offset = headSize; + HashSet parentSet = new HashSet<>(); + boolean lastChild = false; + + while (offset < trieBytes.length) { + if (lastChild) { + boolean contained = parentSet.remove(offset - headSize); + // Can't find parent, the data is corrupted + if (!contained) { + return false; + } + lastChild = false; + } + int p = offset + firstByteOffset; + int childOffset = (int) (BytesUtil.readLong(trieBytes, offset, sizeChildOffset) & childOffsetMask); + int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1); + boolean isEndOfValue = checkFlag(offset, BIT_IS_END_OF_VALUE); + + // Copy value overflow, the data is corrupted + if (trieBytes.length < p + parLen) { + return false; + } + + // Check id is fine + if (isEndOfValue) { + BytesUtil.readUnsigned(trieBytes, p + parLen, sizeOfId); + } + + // Record it if has children + if (childOffset != 0) { + parentSet.add(childOffset); + } + + // brothers done, move to next parent + if (checkFlag(offset, BIT_IS_LAST_CHILD)) { + lastChild = true; + } + + // move to next node + offset += firstByteOffset + parLen + (isEndOfValue ? sizeOfId : 0); + } + + // ParentMap is empty, meaning all nodes has parent, the data is correct + return parentSet.isEmpty(); + } + + @Override + public String toString() { + return String.format("DictSlice[firstValue=%s, values=%d, bytes=%d]", Bytes.toStringBinary(getFirstValue()), nValues, bodyLen); + } + + @Override + public int hashCode() { + return Arrays.hashCode(trieBytes); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof DictSlice)) { + return false; + } + DictSlice that = (DictSlice) o; + return Arrays.equals(this.trieBytes, that.trieBytes); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/main/java/org/apache/kylin/dict/DictSliceKey.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictSliceKey.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictSliceKey.java new file mode 100644 index 0000000..8fc3f78 --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictSliceKey.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.dict; + +import org.apache.kylin.common.util.Bytes; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; + +public class DictSliceKey implements Comparable { + static final DictSliceKey START_KEY = DictSliceKey.wrap(new byte[0]); + + byte[] key; + + public static DictSliceKey wrap(byte[] key) { + DictSliceKey dictKey = new DictSliceKey(); + dictKey.key = key; + return dictKey; + } + + @Override + public String toString() { + return Bytes.toStringBinary(key); + } + + @Override + public int hashCode() { + return Arrays.hashCode(key); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o instanceof DictSliceKey) { + DictSliceKey that = (DictSliceKey) o; + return Arrays.equals(this.key, that.key); + } + return false; + } + + @Override + public int compareTo(DictSliceKey that) { + return Bytes.compareTo(key, that.key); + } + + public void write(DataOutput out) throws IOException { + out.writeInt(key.length); + out.write(key); + } + + public void readFields(DataInput in) throws IOException { + key = new byte[in.readInt()]; + in.readFully(key); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java new file mode 100644 index 0000000..d9030d3 --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.dict; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.HadoopUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.TreeMap; +import java.util.TreeSet; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +public class GlobalDictHDFSStore extends GlobalDictStore { + + static final Logger logger = LoggerFactory.getLogger(GlobalDictHDFSStore.class); + static final String V1_INDEX_NAME = ".index"; + static final String V2_INDEX_NAME = ".index_v2"; + static final String VERSION_PREFIX = "version_"; + static final int BUFFER_SIZE = 8 * 1024 * 1024; + + private final Path basePath; + private final Configuration conf; + private final FileSystem fileSystem; + + protected GlobalDictHDFSStore(String baseDir) throws IOException { + super(baseDir); + this.basePath = new Path(baseDir); + this.conf = HadoopUtil.getCurrentConfiguration(); + this.fileSystem = HadoopUtil.getFileSystem(baseDir); + + if (!fileSystem.exists(basePath)) { + logger.info("Global dict at {} doesn't exist, create a new one", basePath); + fileSystem.mkdirs(basePath); + } + + migrateOldLayout(); + } + + // Previously we put slice files and index file directly in base directory, + // should migrate to the new versioned layout + private void migrateOldLayout() throws IOException { + FileStatus[] sliceFiles = fileSystem.listStatus(basePath, new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith(IndexFormatV1.SLICE_PREFIX); + } + }); + Path indexFile = new Path(basePath, V1_INDEX_NAME); + + if (fileSystem.exists(indexFile) && sliceFiles.length > 0) { // old layout + final long version = System.currentTimeMillis(); + Path tempDir = new Path(basePath, "tmp_" + VERSION_PREFIX + version); + Path versionDir = getVersionDir(version); + + logger.info("Convert global dict at {} to new layout with version {}", basePath, version); + + fileSystem.mkdirs(tempDir); + // convert to new layout + try { + // copy index and slice files to temp + FileUtil.copy(fileSystem, indexFile, fileSystem, tempDir, false, conf); + for (FileStatus sliceFile : sliceFiles) { + FileUtil.copy(fileSystem, sliceFile.getPath(), fileSystem, tempDir, false, conf); + } + // rename + fileSystem.rename(tempDir, versionDir); + // delete index and slices files in base dir + fileSystem.delete(indexFile, false); + for (FileStatus sliceFile : sliceFiles) { + fileSystem.delete(sliceFile.getPath(), true); + } + + } finally { + if (fileSystem.exists(tempDir)) { + fileSystem.delete(tempDir, true); + } + } + } + } + + @Override + void prepareForWrite(String workingDir) throws IOException { + // TODO create lock file + Path working = new Path(workingDir); + + if (fileSystem.exists(working)) { + fileSystem.delete(working, true); + logger.info("Working directory {} exits, delete it first", working); + } + + // when build dict, copy all data into working dir and work on it, avoiding suddenly server crash made data 
corrupt + Long[] versions = listAllVersions(); + if (versions.length > 0) { + Path latestVersion = getVersionDir(versions[versions.length - 1]); + FileUtil.copy(fileSystem, latestVersion, fileSystem, working, false, true, conf); + } else { + fileSystem.mkdirs(working); + } + } + + @Override + Long[] listAllVersions() throws IOException { + FileStatus[] versionDirs = fileSystem.listStatus(basePath, new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith(VERSION_PREFIX); + } + }); + TreeSet versions = new TreeSet<>(); + for (int i = 0; i < versionDirs.length; i++) { + Path path = versionDirs[i].getPath(); + versions.add(Long.parseLong(path.getName().substring(VERSION_PREFIX.length()))); + } + return versions.toArray(new Long[versions.size()]); + } + + @Override + public Path getVersionDir(long version) { + return new Path(basePath, VERSION_PREFIX + version); + } + + @Override + GlobalDictMetadata getMetadata(long version) throws IOException { + Path versionDir = getVersionDir(version); + FileStatus[] indexFiles = fileSystem.listStatus(versionDir, new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith(V1_INDEX_NAME); + } + }); + checkState(indexFiles.length == 1, "zero or more than one index file found: %s", Arrays.toString(indexFiles)); + + IndexFormat format; + String indexFile = indexFiles[0].getPath().getName(); + if (V2_INDEX_NAME.equals(indexFile)) { + format = new IndexFormatV2(fileSystem, conf); + } else if (V1_INDEX_NAME.equals(indexFile)) { + format = new IndexFormatV1(fileSystem, conf); + } else { + throw new RuntimeException("Unknown index file: " + indexFile); + } + + return format.readIndexFile(versionDir); + } + + @Override + DictSlice readSlice(String directory, String sliceFileName) { + Path path = new Path(directory, sliceFileName); + logger.info("read slice from {}", path); + try (FSDataInputStream input = fileSystem.open(path, BUFFER_SIZE)) { + return DictSlice.deserializeFrom(input); + } catch (IOException e) { + throw new RuntimeException(String.format("read slice %s failed", path), e); + } + } + + @Override + String writeSlice(String workingDir, DictSliceKey key, DictNode slice) { + //write new slice + String sliceFile = IndexFormatV2.sliceFileName(key); + Path path = new Path(workingDir, sliceFile); + + logger.info("write slice with key {} into file {}", key, path); + try (FSDataOutputStream out = fileSystem.create(path, true, BUFFER_SIZE)) { + byte[] bytes = slice.buildTrieBytes(); + out.write(bytes); + } catch (IOException e) { + throw new RuntimeException(String.format("write slice with key %s into file %s failed", key, path), e); + } + return sliceFile; + } + + @Override + void deleteSlice(String workingDir, String sliceFileName) { + Path path = new Path(workingDir, sliceFileName); + logger.info("delete slice at {}", path); + try { + if (fileSystem.exists(path)) { + fileSystem.delete(path, false); + } + } catch (IOException e) { + throw new RuntimeException(String.format("delete slice at %s failed", path), e); + } + } + + @Override + void commit(String workingDir, GlobalDictMetadata metadata) throws IOException { + Path workingPath = new Path(workingDir); + + // delete v1 index file + Path oldIndexFile = new Path(workingPath, V1_INDEX_NAME); + if (fileSystem.exists(oldIndexFile)) { + fileSystem.delete(oldIndexFile, false); + } + // write v2 index file + IndexFormat index = new IndexFormatV2(fileSystem, conf); + index.writeIndexFile(workingPath, metadata); + 
index.sanityCheck(workingPath, metadata); + + // copy working dir to newVersion dir + Path newVersionPath = new Path(basePath, VERSION_PREFIX + System.currentTimeMillis()); + fileSystem.rename(workingPath, newVersionPath); + + cleanUp(); + } + + // Check versions count, delete expired versions + private void cleanUp() throws IOException { + Long[] versions = listAllVersions(); + long timestamp = System.currentTimeMillis(); + for (int i = 0; i < versions.length - maxVersions; i++) { + if (versions[i] + versionTTL < timestamp) { + fileSystem.delete(getVersionDir(versions[i]), true); + } + } + } + + @Override + String copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException { + checkArgument(baseDir.startsWith(srcConfig.getHdfsWorkingDirectory()), "Please check why current directory {} doesn't belong to source working directory {}", baseDir, srcConfig.getHdfsWorkingDirectory()); + + final String dstBaseDir = baseDir.replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory()); + + Long[] versions = listAllVersions(); + if (versions.length == 0) { // empty dict, nothing to copy + return dstBaseDir; + } + + Path srcVersionDir = getVersionDir(versions[versions.length - 1]); + Path dstVersionDir = new Path(srcVersionDir.toString().replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory())); + FileSystem dstFS = dstVersionDir.getFileSystem(conf); + if (dstFS.exists(dstVersionDir)) { + dstFS.delete(dstVersionDir, true); + } + FileUtil.copy(fileSystem, srcVersionDir, dstFS, dstVersionDir, false, true, conf); + + return dstBaseDir; + } + + public interface IndexFormat { + GlobalDictMetadata readIndexFile(Path dir) throws IOException; + + void writeIndexFile(Path dir, GlobalDictMetadata metadata) throws IOException; + + void sanityCheck(Path dir, GlobalDictMetadata metadata) throws IOException; + } + + public static class IndexFormatV1 implements IndexFormat { + static final String SLICE_PREFIX = "cached_"; + + protected final FileSystem fs; + protected final Configuration conf; + + protected IndexFormatV1(FileSystem fs, Configuration conf) { + this.fs = fs; + this.conf = conf; + } + + @Override + public GlobalDictMetadata readIndexFile(Path dir) throws IOException { + Path indexFile = new Path(dir, V1_INDEX_NAME); + try (FSDataInputStream in = fs.open(indexFile)) { + int baseId = in.readInt(); + int maxId = in.readInt(); + int maxValueLength = in.readInt(); + int nValues = in.readInt(); + String converterName = in.readUTF(); + BytesConverter converter; + try { + converter = ClassUtil.forName(converterName, BytesConverter.class).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Fail to instantiate BytesConverter: " + converterName, e); + } + + int nSlices = in.readInt(); + TreeMap sliceFileMap = new TreeMap<>(); + for (int i = 0; i < nSlices; i++) { + DictSliceKey key = new DictSliceKey(); + key.readFields(in); + sliceFileMap.put(key, sliceFileName(key)); + } + // make sure first key is always "" + String firstFile = sliceFileMap.remove(sliceFileMap.firstKey()); + sliceFileMap.put(DictSliceKey.START_KEY, firstFile); + + return new GlobalDictMetadata(baseId, maxId, maxValueLength, nValues, converter, sliceFileMap); + } + } + + //only for test + @Override + public void writeIndexFile(Path dir, GlobalDictMetadata metadata) throws IOException { + Path indexFile = new Path(dir, V1_INDEX_NAME); + try (FSDataOutputStream out = fs.create(indexFile, true)) { + out.writeInt(metadata.baseId); + 
out.writeInt(metadata.maxId); + out.writeInt(metadata.maxValueLength); + out.writeInt(metadata.nValues); + out.writeUTF(metadata.bytesConverter.getClass().getName()); + out.writeInt(metadata.sliceFileMap.size()); + for (Map.Entry entry : metadata.sliceFileMap.entrySet()) { + entry.getKey().write(out); + } + } + } + + @Override + public void sanityCheck(Path dir, GlobalDictMetadata metadata) throws IOException { + throw new UnsupportedOperationException("sanityCheck V1 format is no longer supported"); + } + + public static String sliceFileName(DictSliceKey key) { + return SLICE_PREFIX + key; + } + } + + public static class IndexFormatV2 extends IndexFormatV1 { + static final String SLICE_PREFIX = "cached_"; + static final int MINOR_VERSION_V1 = 0x01; + + protected IndexFormatV2(FileSystem fs, Configuration conf) { + super(fs, conf); + } + + @Override + public GlobalDictMetadata readIndexFile(Path dir) throws IOException { + Path indexFile = new Path(dir, V2_INDEX_NAME); + try (FSDataInputStream in = fs.open(indexFile)) { + byte minorVersion = in.readByte(); // include a header to allow minor format changes + if (minorVersion != MINOR_VERSION_V1) { + throw new RuntimeException("Unsupported minor version " + minorVersion); + } + int baseId = in.readInt(); + int maxId = in.readInt(); + int maxValueLength = in.readInt(); + int nValues = in.readInt(); + String converterName = in.readUTF(); + BytesConverter converter; + try { + converter = ClassUtil.forName(converterName, BytesConverter.class).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Fail to instantiate BytesConverter: " + converterName, e); + } + + int nSlices = in.readInt(); + TreeMap sliceFileMap = new TreeMap<>(); + for (int i = 0; i < nSlices; i++) { + DictSliceKey key = new DictSliceKey(); + key.readFields(in); + String sliceFileName = in.readUTF(); + sliceFileMap.put(key, sliceFileName); + } + + return new GlobalDictMetadata(baseId, maxId, maxValueLength, nValues, converter, sliceFileMap); + } + } + + @Override + public void writeIndexFile(Path dir, GlobalDictMetadata metadata) throws IOException { + Path indexFile = new Path(dir, V2_INDEX_NAME); + try (FSDataOutputStream out = fs.create(indexFile, true)) { + out.writeByte(MINOR_VERSION_V1); + out.writeInt(metadata.baseId); + out.writeInt(metadata.maxId); + out.writeInt(metadata.maxValueLength); + out.writeInt(metadata.nValues); + out.writeUTF(metadata.bytesConverter.getClass().getName()); + out.writeInt(metadata.sliceFileMap.size()); + for (Map.Entry entry : metadata.sliceFileMap.entrySet()) { + entry.getKey().write(out); + out.writeUTF(entry.getValue()); + } + } + } + + @Override + public void sanityCheck(Path dir, GlobalDictMetadata metadata) throws IOException { + for (Map.Entry entry : metadata.sliceFileMap.entrySet()) { + if (!fs.exists(new Path(dir, entry.getValue()))) { + throw new RuntimeException("The slice file " + entry.getValue() + " for the key: " + entry.getKey() + " must be existed!"); + } + } + } + + public static String sliceFileName(DictSliceKey key) { + return String.format("%s%d_%d", SLICE_PREFIX, System.currentTimeMillis(), key.hashCode()); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictMetadata.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictMetadata.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictMetadata.java new file mode 100644 
index 0000000..65c80ca --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictMetadata.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.dict; + +import com.google.common.base.Preconditions; + +import java.util.NavigableMap; +import java.util.TreeMap; + +/** + * Encapsulates the metadata for a particular version of the global dictionary. + * Usually each version of a global dictionary stores its metadata in an index file. + */ +public class GlobalDictMetadata { + final int baseId; + final int maxId; + final int maxValueLength; + final int nValues; + final BytesConverter bytesConverter; + final TreeMap sliceFileMap; // slice key -> slice file name + + public GlobalDictMetadata(int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter, NavigableMap sliceFileMap) { + + Preconditions.checkNotNull(bytesConverter, "bytesConverter"); + Preconditions.checkNotNull(sliceFileMap, "sliceFileMap"); + + this.baseId = baseId; + this.maxId = maxId; + this.maxValueLength = maxValueLength; + this.nValues = nValues; + this.bytesConverter = bytesConverter; + this.sliceFileMap = new TreeMap<>(sliceFileMap); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java new file mode 100644 index 0000000..5817868 --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.dict; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; + +import java.io.IOException; + +public abstract class GlobalDictStore { + + protected final String baseDir; // base directory containing all versions of this global dict + protected final int maxVersions; + protected final int versionTTL; + + protected GlobalDictStore(String baseDir) { + this.baseDir = Preconditions.checkNotNull(baseDir, "baseDir"); + this.maxVersions = KylinConfig.getInstanceFromEnv().getAppendDictMaxVersions(); + this.versionTTL = KylinConfig.getInstanceFromEnv().getAppendDictVersionTTL(); + } + + // workingDir should be an absolute path, will create if not exists + abstract void prepareForWrite(String workingDir) throws IOException; + + /** + * @return all versions of this dictionary in ascending order + * @throws IOException on I/O error + */ + abstract Long[] listAllVersions() throws IOException; + + // return the path of specified version dir + abstract Path getVersionDir(long version); + + /** + * Get the metadata for a particular version of the dictionary. + * @param version version number + * @return GlobalDictMetadata for the specified version + * @throws IOException on I/O error + */ + abstract GlobalDictMetadata getMetadata(long version) throws IOException; + + /** + * Read a DictSlice from a slice file. + * @param workingDir directory of the slice file + * @param sliceFileName file name of the slice + * @return a DictSlice + * @throws IOException on I/O error + */ + abstract DictSlice readSlice(String workingDir, String sliceFileName); + + /** + * Write a slice with the given key to the specified directory. + * @param workingDir where to write the slice, should exist + * @param key slice key + * @param slice slice to write + * @return file name of the new written slice + * @throws IOException on I/O error + */ + abstract String writeSlice(String workingDir, DictSliceKey key, DictNode slice); + + /** + * Delete a slice with the specified file name. + * @param workingDir directory of the slice file, should exist + * @param sliceFileName file name of the slice, should exist + * @throws IOException on I/O error + */ + abstract void deleteSlice(String workingDir, String sliceFileName); + + /** + * commit the DictSlice and GlobalDictMetadata in workingDir to new versionDir + * @param workingDir where store the tmp slice and index, should exist + * @param globalDictMetadata the metadata of global dict + * @throws IOException on I/O error + */ + abstract void commit(String workingDir, GlobalDictMetadata globalDictMetadata) throws IOException; + + /** + * Copy the latest version of this dict to another meta. The source is unchanged. 
+ * @param srcConfig config of source meta + * @param dstConfig config of destination meta + * @return the new base directory for destination meta + * @throws IOException on I/O error + */ + abstract String copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException; +} http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java index cda3c2b..7921980 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java @@ -20,6 +20,7 @@ package org.apache.kylin.dict; import java.io.IOException; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Dictionary; /** @@ -28,7 +29,7 @@ import org.apache.kylin.common.util.Dictionary; * Created by sunyerui on 16/5/24. */ public class GlobalDictionaryBuilder implements IDictionaryBuilder { - AppendTrieDictionary.Builder builder; + AppendTrieDictionaryBuilder builder; int baseId; @Override @@ -36,19 +37,20 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder { if (dictInfo == null) { throw new IllegalArgumentException("GlobalDictinaryBuilder must used with an existing DictionaryInfo"); } - this.builder = AppendTrieDictionary.Builder.getInstance(dictInfo.getResourceDir()); + + int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize(); + this.builder = new AppendTrieDictionaryBuilder(dictInfo.getResourceDir(), maxEntriesPerSlice); this.baseId = baseId; } - + @Override public boolean addValue(String value) { if (value == null) return false; - builder.addValue(value); return true; } - + @Override public Dictionary build() throws IOException { return builder.build(baseId); http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java index e2af338..9da5071 100644 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java @@ -18,6 +18,7 @@ package org.apache.kylin.dict; +import static org.apache.kylin.dict.GlobalDictHDFSStore.V2_INDEX_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -29,62 +30,76 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.File; +import java.io.FileFilter; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Random; import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.junit.After; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; -/** - * Created by sunyerui on 16/4/28. - */ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase { - public static final String BASE_DIR = "file:///tmp/kylin_append_dict"; - public static final String RESOURCE_DIR = "/dict/append_dict_test"; + private static final UUID uuid = UUID.randomUUID(); + private static final String RESOURCE_DIR = "/dict/append_dict_test/" + uuid; + private static final String HDFS_DIR = "file:///tmp/kylin_append_dict"; + private static String BASE_DIR; + private static String LOCAL_BASE_DIR = "/tmp/kylin_append_dict/kylin_metadata/resources/GlobalDict" + RESOURCE_DIR + "/"; @Before - public void setUp() { + public void beforeTest() { staticCreateTestMetadata(); - System.setProperty("kylin.dictionary.append-entry-size", "50000"); - System.setProperty("kylin.env.hdfs-working-dir", BASE_DIR); + KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "50000"); + KylinConfig.getInstanceFromEnv().setProperty("kylin.env.hdfs-working-dir", HDFS_DIR); + BASE_DIR = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict" + RESOURCE_DIR + "/"; } @After - public void after() { + public void afterTest() { cleanup(); staticCleanupTestMetadata(); } - public static void cleanup() { - Path basePath = new Path(BASE_DIR); + private void cleanup() { + Path basePath = new Path(HDFS_DIR); try { HadoopUtil.getFileSystem(basePath).delete(basePath, true); - } catch (IOException e) {} + } catch (IOException e) { + } } - public static final String[] words = new String[] { "paint", "par", "part", "parts", "partition", "partitions", "party", "partie", "parties", "patient", "taste", "tar", "trie", "try", "tries", "字典", "字典树", "字母", // non-ascii characters + private static final String[] words = new String[] { "paint", "par", "part", "parts", "partition", "partitions", "party", "partie", "parties", "patient", "taste", "tar", "trie", "try", "tries", "字典", "字典树", "字母", // non-ascii characters "", // empty - "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii", - "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiipaiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii", + "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii", 
"paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiipaiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii", "paintjkjdfklajkdljfkdsajklfjklsadjkjekjrklewjrklewjklrjklewjkljkljkljkljweklrjewkljrklewjrlkjewkljrkljkljkjlkjjkljkljkljkljlkjlkjlkjljdfadfads" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" - + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk", + + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk", "paint", "tar", "try", // some dup }; + private AppendTrieDictionaryBuilder createBuilder(String resourceDir) throws IOException { + int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize(); + return new AppendTrieDictionaryBuilder(resourceDir, maxEntriesPerSlice); + } + @Test public void testStringRepeatly() throws IOException { ArrayList list = new ArrayList<>(); @@ -94,20 +109,22 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase { notfound.add("pars"); notfound.add("tri"); notfound.add("字"); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 50; i++) { testStringDictAppend(list, notfound, true); + //to speed up the test + cleanup(); } } @Test - public void englishWordsTest() throws Exception { + public void testEnglishWords() throws Exception { InputStream is = new FileInputStream("src/test/resources/dict/english-words.80 (scowl-2015.05.18).txt"); ArrayList str = loadStrings(is); testStringDictAppend(str, null, false); } @Test - public void categoryNamesTest() throws Exception { + public void testCategoryNames() throws Exception { InputStream is = new FileInputStream("src/test/resources/dict/dw_category_grouping_names.dat"); ArrayList str = loadStrings(is); testStringDictAppend(str, null, true); @@ -133,7 +150,8 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase { @Ignore("need huge key set") @Test public void testHugeKeySet() throws IOException { - 
 
-    private static void testStringDictAppend(ArrayList<String> list, ArrayList<String> notfound, boolean shuffleList) throws IOException {
+    private void testStringDictAppend(ArrayList<String> list, ArrayList<String> notfound, boolean shuffleList) throws IOException {
         Random rnd = new Random(System.currentTimeMillis());
         ArrayList<String> strList = new ArrayList<String>();
         strList.addAll(list);
@@ -162,8 +180,8 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
         }
         BytesConverter<String> converter = new StringBytesConverter();
 
-        AppendTrieDictionary.Builder b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
-        AppendTrieDictionary<String> dict = null;
+        AppendTrieDictionaryBuilder b = createBuilder(RESOURCE_DIR);
+
         TreeMap<String, Integer> checkMap = new TreeMap<>();
         int firstAppend = rnd.nextInt(strList.size() / 2);
         int secondAppend = firstAppend + rnd.nextInt((strList.size() - firstAppend) / 2);
@@ -173,7 +191,7 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
         for (; appendIndex < firstAppend; appendIndex++) {
             b.addValue(strList.get(appendIndex));
         }
-        dict = b.build(0);
+        AppendTrieDictionary<String> dict = b.build(0);
         dict.dump(System.out);
         for (; checkIndex < firstAppend; checkIndex++) {
             String str = strList.get(checkIndex);
@@ -185,13 +203,13 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
         }
 
         // reopen dict and append
-//        b = AppendTrieDictionary.Builder.create(dict);
-        b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR, dict);
+        b = createBuilder(RESOURCE_DIR);
+
         for (; appendIndex < secondAppend; appendIndex++) {
             b.addValue(strList.get(appendIndex));
         }
-        AppendTrieDictionary<String> newDict = b.build(0);
-        assert newDict == dict;
+        AppendTrieDictionary<String> newDict = b.build(0);
+        assert newDict.equals(dict);
         dict = newDict;
         dict.dump(System.out);
         checkIndex = 0;
@@ -210,12 +228,13 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
         }
 
         // reopen dict and append the rest of the strings
-        b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR, dict);
+        b = createBuilder(RESOURCE_DIR);
+
         for (; appendIndex < strList.size(); appendIndex++) {
             b.addValue(strList.get(appendIndex));
         }
         newDict = b.build(0);
-        assert newDict == dict;
+        assert newDict.equals(dict);
         dict = newDict;
         dict.dump(System.out);
         checkIndex = 0;
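
The switch from `assert newDict == dict` to `assert newDict.equals(dict)` is worth calling out: the old builder mutated and returned the same dictionary instance, while the refactored builder reads the slices back from the store and returns a fresh object, so only value equality can be asserted. Note that equals() evidently identifies the dictionary resource rather than the visible contents, since the test asserts equality across appends. A hedged illustration, written as if it lived inside this test class (createBuilder and RESOURCE_DIR are the members defined above):

    // Hedged illustration: identity no longer holds across builds.
    private void identitySketch() throws IOException {
        AppendTrieDictionary<String> first = createBuilder(RESOURCE_DIR).build(0);

        AppendTrieDictionaryBuilder b = createBuilder(RESOURCE_DIR); // re-open the same dir
        b.addValue("extra");
        AppendTrieDictionary<String> second = b.build(0);

        // (second == first) may be false after the refactor...
        // ...but both handles denote the same global dictionary:
        assert second.equals(first);
    }
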
@@ -268,7 +287,7 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
 
     @Test
     public void testMaxInteger() throws IOException {
-        AppendTrieDictionary.Builder builder = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
         builder.setMaxId(Integer.MAX_VALUE - 2);
         builder.addValue("a");
         builder.addValue("ab");
@@ -284,7 +303,7 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
     @Ignore("Only occurred when value is very long (>8000 bytes)")
     @Test
     public void testSuperLongValue() throws IOException {
-        AppendTrieDictionary.Builder builder = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
         String value = "a";
         for (int i = 0; i < 10000; i++) {
             value += "a";
@@ -299,17 +318,15 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
         dictionary.getMaxId();
     }
 
-    private static class SharedBuilderThread extends Thread {
+    private class SharedBuilderThread extends Thread {
         CountDownLatch startLatch;
         CountDownLatch finishLatch;
-        String resourcePath;
         String prefix;
         int count;
 
-        SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String resourcePath, String prefix, int count) {
+        SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String prefix, int count) {
            this.startLatch = startLatch;
            this.finishLatch = finishLatch;
-            this.resourcePath = resourcePath;
            this.prefix = prefix;
            this.count = count;
        }
@@ -317,27 +334,28 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
         @Override
         public void run() {
             try {
-                AppendTrieDictionary.Builder builder = AppendTrieDictionary.Builder.getInstance(resourcePath);
+                AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
                 startLatch.countDown();
                 for (int i = 0; i < count; i++) {
                     builder.addValue(prefix + i);
                 }
                 builder.build(0);
                 finishLatch.countDown();
-            } catch (IOException e) {}
+            } catch (IOException e) {
+            }
         }
     }
 
+    @Ignore
     @Test
     public void testSharedBuilder() throws IOException, InterruptedException {
-        String resourcePath = "shared_builder";
         final CountDownLatch startLatch = new CountDownLatch(3);
         final CountDownLatch finishLatch = new CountDownLatch(3);
 
-        AppendTrieDictionary.Builder builder = AppendTrieDictionary.Builder.getInstance(resourcePath);
-        Thread t1 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t1_", 10000);
-        Thread t2 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t2_", 10);
-        Thread t3 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t3_", 100000);
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+        Thread t1 = new SharedBuilderThread(startLatch, finishLatch, "t1_", 10000);
+        Thread t2 = new SharedBuilderThread(startLatch, finishLatch, "t2_", 10);
+        Thread t3 = new SharedBuilderThread(startLatch, finishLatch, "t3_", 100000);
         t1.start();
         t2.start();
         t3.start();
@@ -345,23 +363,228 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
         AppendTrieDictionary<String> dict = builder.build(0);
         assertTrue("AppendDictBuilder Thread too slow", finishLatch.await(3000, TimeUnit.MILLISECONDS));
         assertEquals(110010, dict.getMaxId());
-        try {
-            builder.addValue("fail");
-            fail("Builder should be closed");
-        } catch (Exception e) {}
 
-        builder = AppendTrieDictionary.Builder.getInstance(resourcePath, dict);
+        builder = createBuilder(RESOURCE_DIR);
         builder.addValue("success");
+        builder.addValue("s");
         dict = builder.build(0);
-        for (int i = 0; i < 10000; i ++) {
+        for (int i = 0; i < 10000; i++) {
             assertNotEquals(-1, dict.getIdFromValue("t1_" + i));
         }
-        for (int i = 0; i < 10; i ++) {
+        for (int i = 0; i < 10; i++) {
             assertNotEquals(-1, dict.getIdFromValue("t2_" + i));
         }
-        for (int i = 0; i < 100000; i ++) {
+        for (int i = 0; i < 100000; i++) {
             assertNotEquals(-1, dict.getIdFromValue("t3_" + i));
         }
         assertEquals(110011, dict.getIdFromValue("success"));
+        assertEquals(110012, dict.getIdFromValue("s"));
+    }
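
testSharedBuilder (now @Ignore'd, presumably because concurrent shared builders are no longer a supported pattern after the refactor) coordinates three writers with two latches: startLatch ensures every thread has constructed its builder before the main thread builds, and finishLatch bounds the wall-clock time of the whole run. The pattern, reduced to plain java.util.concurrent with no Kylin types:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public class LatchSketch {
        // Hedged sketch of the latch pattern used by SharedBuilderThread above.
        public static boolean runWriters(int threads, Runnable appendWork) throws InterruptedException {
            CountDownLatch started = new CountDownLatch(threads);
            CountDownLatch finished = new CountDownLatch(threads);
            for (int i = 0; i < threads; i++) {
                new Thread(() -> {
                    started.countDown();   // signal: this writer is set up
                    appendWork.run();      // addValue(...) loop + build(0)
                    finished.countDown();  // signal: this writer is done
                }).start();
            }
            started.await(); // all writers are set up before we proceed
            // mirrors assertTrue(finishLatch.await(3000, TimeUnit.MILLISECONDS)) above
            return finished.await(3000, TimeUnit.MILLISECONDS);
        }
    }
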
"%5Cx1A%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%7E%29%5CxEF%5CxBF%5CxBD%5Cx1B+%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5Cx13%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5B"; + + createAppendTrieDict(Arrays.asList("a", superLongValue)); + } + + @Test + public void testSuperLongValueAsFileName() throws IOException { + String superLongValue = "%5Cx1A%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%7E%29%5CxEF%5CxBF%5CxBD%5Cx1B+%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5Cx13%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5B"; + + createAppendTrieDict(Arrays.asList("a", superLongValue)); + } + + @Test + public void testIllegalFileNameValue() throws IOException { + createAppendTrieDict(Arrays.asList("::", ":")); + } + + @Test + public void testSkipAddValue() throws IOException { + createAppendTrieDict(new ArrayList()); + } + + private void createAppendTrieDict(List valueList) throws IOException { + KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "1"); + + AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); + + for (String value : valueList) { + builder.addValue(value); + } + + builder.build(0); + } + + private static class CachedFileFilter implements FileFilter { + @Override + public boolean accept(File pathname) { + return pathname.getName().startsWith("cached_"); + } + } + + private static class VersionFilter implements FileFilter { + @Override + public boolean accept(File pathname) { + return pathname.getName().startsWith(GlobalDictHDFSStore.VERSION_PREFIX); + } + } + + @Test + public void testMultiVersions() throws IOException, InterruptedException { + KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4"); + + AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); + builder.addValue("a"); + builder.addValue("b"); + builder.addValue("c"); + builder.addValue("d"); + builder.addValue("e"); + builder.addValue("f"); + AppendTrieDictionary dict = builder.build(0); + + assertEquals(2, dict.getIdFromValue("b")); + + // re-open dict, append new data + builder = createBuilder(RESOURCE_DIR); + builder.addValue("g"); + + // new data is not visible + try { + dict.getIdFromValue("g"); + fail("Value 'g' (g) not exists!"); + } catch (IllegalArgumentException e) { + + } + + // append data, and be visible for new immutable map + builder.addValue("h"); + + AppendTrieDictionary newDict = builder.build(0); + assert newDict.equals(dict); + + assertEquals(7, newDict.getIdFromValue("g")); + assertEquals(8, newDict.getIdFromValue("h")); + + // Check versions retention + File dir = new File(LOCAL_BASE_DIR); + assertEquals(2, dir.listFiles(new VersionFilter()).length); + } + + @Test + public void testVersionRetention() throws IOException, InterruptedException { + KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4"); + KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-max-versions", "1"); + KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-version-ttl", "1000"); + + AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); + builder.addValue("a"); + + //version 
 
+    @Test
+    public void testVersionRetention() throws IOException, InterruptedException {
+        KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4");
+        KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-max-versions", "1");
+        KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-version-ttl", "1000");
+
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+        builder.addValue("a");
+
+        // version 1
+        builder.build(0);
+
+        // check version retention
+        File dir = new File(LOCAL_BASE_DIR);
+        assertEquals(1, dir.listFiles(new VersionFilter()).length);
+
+        // sleep to let version 1 expire
+        Thread.sleep(1200);
+
+        // version 2
+        builder = createBuilder(RESOURCE_DIR);
+        builder.addValue("");
+        builder.build(0);
+
+        // check version retention
+        assertEquals(1, dir.listFiles(new VersionFilter()).length);
+    }
+
+    @Test
+    public void testOldDirFormat() throws IOException {
+        KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4");
+
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+        builder.addValue("a");
+        builder.addValue("b");
+        builder.addValue("c");
+        builder.addValue("d");
+        builder.addValue("e");
+        builder.addValue("f");
+        builder.build(0);
+
+        convertDirToOldFormat(BASE_DIR);
+
+        File dir = new File(LOCAL_BASE_DIR);
+        assertEquals(0, dir.listFiles(new VersionFilter()).length);
+        assertEquals(3, dir.listFiles(new CachedFileFilter()).length);
+
+        // the older dir format is converted to the new format when the builder initializes
+        builder = createBuilder(RESOURCE_DIR);
+        builder.build(0);
+
+        assertEquals(1, dir.listFiles(new VersionFilter()).length);
+    }
+
+    private void convertDirToOldFormat(String baseDir) throws IOException {
+        Path basePath = new Path(baseDir);
+        FileSystem fs = HadoopUtil.getFileSystem(basePath);
+
+        // move the version dir up to the base dir to simulate the older format
+        GlobalDictHDFSStore store = new GlobalDictHDFSStore(baseDir);
+        Long[] versions = store.listAllVersions();
+        Path versionPath = store.getVersionDir(versions[versions.length - 1]);
+        Path tmpVersionPath = new Path(versionPath.getParent().getParent(), versionPath.getName());
+        fs.rename(versionPath, tmpVersionPath);
+        fs.delete(new Path(baseDir), true);
+        fs.rename(tmpVersionPath, new Path(baseDir));
+    }
+
+    @Test
+    public void testOldIndexFormat() throws IOException {
+        KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4");
+
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+        builder.addValue("a");
+        builder.addValue("b");
+        builder.addValue("c");
+        builder.addValue("d");
+        builder.addValue("e");
+        builder.addValue("f");
+        builder.build(0);
+
+        convertIndexToOldFormat(BASE_DIR);
+
+        builder = createBuilder(RESOURCE_DIR);
+        builder.addValue("g");
+        builder.addValue("h");
+        builder.addValue("i");
+        AppendTrieDictionary<String> dict = builder.build(0);
+
+        assertEquals(1, dict.getIdFromValue("a"));
+        assertEquals(7, dict.getIdFromValue("g"));
+    }
+
+    private void convertIndexToOldFormat(String baseDir) throws IOException {
+        Path basePath = new Path(baseDir);
+        FileSystem fs = HadoopUtil.getFileSystem(basePath);
+
+        GlobalDictHDFSStore store = new GlobalDictHDFSStore(baseDir);
+        Long[] versions = store.listAllVersions();
+        GlobalDictMetadata metadata = store.getMetadata(versions[versions.length - 1]);
+
+        // convert the v2 index to the v1 index
+        Path versionPath = store.getVersionDir(versions[versions.length - 1]);
+        Path v2IndexFile = new Path(versionPath, V2_INDEX_NAME);
+
+        fs.delete(v2IndexFile, true);
+        GlobalDictHDFSStore.IndexFormat indexFormatV1 = new GlobalDictHDFSStore.IndexFormatV1(fs, HadoopUtil.getCurrentConfiguration());
+        indexFormatV1.writeIndexFile(versionPath, metadata);
+
+        // convert v2 file names to the v1 naming format
+        for (Map.Entry<AppendDictSliceKey, String> entry : metadata.sliceFileMap.entrySet()) {
+            fs.rename(new Path(versionPath, entry.getValue()), new Path(versionPath, "cached_" + entry.getKey()));
+        }
+    }
 }
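
For orientation, the two conversion helpers at the end imply the following before/after layouts under the dictionary's base directory. This is a hedged reconstruction: the literal values of GlobalDictHDFSStore.VERSION_PREFIX and V2_INDEX_NAME are not shown in this patch, and the v1 index file name is inferred only from IndexFormatV1.writeIndexFile. The counting method mirrors the VersionFilter assertions above:

    import java.io.File;
    import java.io.FileFilter;

    public class LayoutSketch {
        // Hedged reconstruction of the on-disk layouts the helpers convert between:
        //
        //   v1 (old, flat):                     v2 (new, versioned):
        //   GlobalDict<RESOURCE_DIR>/           GlobalDict<RESOURCE_DIR>/
        //       <v1 index file>                     <VERSION_PREFIX><n>/
        //       cached_<sliceKey>                       <V2_INDEX_NAME>
        //       cached_<sliceKey>                       <slice files named in sliceFileMap>

        // Count retained version directories the way the tests do.
        public static int countVersions(String localBaseDir) {
            FileFilter versionFilter = pathname -> pathname.getName().startsWith(GlobalDictHDFSStore.VERSION_PREFIX);
            File[] versions = new File(localBaseDir).listFiles(versionFilter);
            return versions == null ? 0 : versions.length;
        }
    }
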