Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 53196925C for ; Fri, 12 Dec 2014 05:59:36 +0000 (UTC) Received: (qmail 11377 invoked by uid 500); 12 Dec 2014 05:59:36 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 11287 invoked by uid 500); 12 Dec 2014 05:59:36 -0000 Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.apache.org Delivered-To: mailing list commits@tajo.apache.org Received: (qmail 10712 invoked by uid 99); 12 Dec 2014 05:59:35 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 Dec 2014 05:59:35 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 8A84AA2816C; Fri, 12 Dec 2014 05:59:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hyunsik@apache.org To: commits@tajo.apache.org Date: Fri, 12 Dec 2014 05:59:49 -0000 Message-Id: In-Reply-To: <0e8994e8365645b4ad5ae01348693af3@git.apache.org> References: <0e8994e8365645b4ad5ae01348693af3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [15/32] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java new file mode 100644 index 0000000..00112e7 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import com.google.common.base.Preconditions; +import com.google.protobuf.Message; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.datum.*; +import org.apache.tajo.util.Bytes; + +import java.io.IOException; +import java.io.OutputStream; + +@Deprecated +public class BinarySerializerDeserializer implements SerializerDeserializer { + + static final byte[] INVALID_UTF__SINGLE_BYTE = {(byte) Integer.parseInt("10111111", 2)}; + + @Override + public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) + throws IOException { + byte[] bytes; + int length = 0; + if (datum == null || datum instanceof NullDatum) { + return 0; + } + + switch (col.getDataType().getType()) { + case BOOLEAN: + case BIT: + case CHAR: + bytes = datum.asByteArray(); + length = bytes.length; + out.write(bytes, 0, length); + break; + case INT2: + length = writeShort(out, datum.asInt2()); + break; + case INT4: + length = writeVLong(out, datum.asInt4()); + break; + case INT8: + length = writeVLong(out, datum.asInt8()); + break; + case FLOAT4: + length = writeFloat(out, datum.asFloat4()); + break; + case FLOAT8: + length = writeDouble(out, datum.asFloat8()); + break; + case TEXT: { + bytes = datum.asTextBytes(); + length = datum.size(); + if (length == 0) { + bytes = INVALID_UTF__SINGLE_BYTE; + length = INVALID_UTF__SINGLE_BYTE.length; + } + out.write(bytes, 0, bytes.length); + break; + } + case BLOB: + case INET4: + case INET6: + bytes = datum.asByteArray(); + length = bytes.length; + out.write(bytes, 0, length); + break; + case PROTOBUF: + ProtobufDatum protobufDatum = (ProtobufDatum) datum; + bytes = protobufDatum.asByteArray(); + length = bytes.length; + out.write(bytes, 0, length); + break; + case NULL_TYPE: + break; + default: + throw new IOException("Does not support type"); + } + return length; + } + + @Override + public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException { + if (length == 0) return NullDatum.get(); + + Datum datum; + switch (col.getDataType().getType()) { + case BOOLEAN: + datum = DatumFactory.createBool(bytes[offset]); + break; + case BIT: + datum = DatumFactory.createBit(bytes[offset]); + break; + case CHAR: { + byte[] chars = new byte[length]; + System.arraycopy(bytes, offset, chars, 0, length); + datum = DatumFactory.createChar(chars); + break; + } + case INT2: + datum = DatumFactory.createInt2(Bytes.toShort(bytes, offset, length)); + break; + case INT4: + datum = DatumFactory.createInt4((int) Bytes.readVLong(bytes, offset)); + break; + case INT8: + datum = DatumFactory.createInt8(Bytes.readVLong(bytes, offset)); + break; + case FLOAT4: + datum = DatumFactory.createFloat4(toFloat(bytes, offset, length)); + break; + case FLOAT8: + datum = DatumFactory.createFloat8(toDouble(bytes, offset, length)); + break; + case TEXT: { + byte[] chars = new byte[length]; + System.arraycopy(bytes, offset, chars, 0, length); + + if (Bytes.equals(INVALID_UTF__SINGLE_BYTE, chars)) { + datum = DatumFactory.createText(new byte[0]); + } else { + datum = DatumFactory.createText(chars); + } + break; + } + case PROTOBUF: { + ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType().getCode()); + Message.Builder builder = factory.newBuilder(); + builder.mergeFrom(bytes, offset, length); + datum = factory.createDatum(builder); + break; + } + case INET4: + datum = DatumFactory.createInet4(bytes, offset, length); + break; + case BLOB: + datum = DatumFactory.createBlob(bytes, offset, length); + break; + default: + datum = NullDatum.get(); + } + return datum; + } + + private byte[] shortBytes = new byte[2]; + + public int writeShort(OutputStream out, short val) throws IOException { + shortBytes[0] = (byte) (val >> 8); + shortBytes[1] = (byte) val; + out.write(shortBytes, 0, 2); + return 2; + } + + public float toFloat(byte[] bytes, int offset, int length) { + Preconditions.checkArgument(length == 4); + + int val = ((bytes[offset] & 0x000000FF) << 24) + + ((bytes[offset + 1] & 0x000000FF) << 16) + + ((bytes[offset + 2] & 0x000000FF) << 8) + + (bytes[offset + 3] & 0x000000FF); + return Float.intBitsToFloat(val); + } + + private byte[] floatBytes = new byte[4]; + + public int writeFloat(OutputStream out, float f) throws IOException { + int val = Float.floatToIntBits(f); + + floatBytes[0] = (byte) (val >> 24); + floatBytes[1] = (byte) (val >> 16); + floatBytes[2] = (byte) (val >> 8); + floatBytes[3] = (byte) val; + out.write(floatBytes, 0, 4); + return floatBytes.length; + } + + public double toDouble(byte[] bytes, int offset, int length) { + Preconditions.checkArgument(length == 8); + long val = ((long) (bytes[offset] & 0x00000000000000FF) << 56) + + ((long) (bytes[offset + 1] & 0x00000000000000FF) << 48) + + ((long) (bytes[offset + 2] & 0x00000000000000FF) << 40) + + ((long) (bytes[offset + 3] & 0x00000000000000FF) << 32) + + ((long) (bytes[offset + 4] & 0x00000000000000FF) << 24) + + ((long) (bytes[offset + 5] & 0x00000000000000FF) << 16) + + ((long) (bytes[offset + 6] & 0x00000000000000FF) << 8) + + (long) (bytes[offset + 7] & 0x00000000000000FF); + return Double.longBitsToDouble(val); + } + + private byte[] doubleBytes = new byte[8]; + + public int writeDouble(OutputStream out, double d) throws IOException { + long val = Double.doubleToLongBits(d); + + doubleBytes[0] = (byte) (val >> 56); + doubleBytes[1] = (byte) (val >> 48); + doubleBytes[2] = (byte) (val >> 40); + doubleBytes[3] = (byte) (val >> 32); + doubleBytes[4] = (byte) (val >> 24); + doubleBytes[5] = (byte) (val >> 16); + doubleBytes[6] = (byte) (val >> 8); + doubleBytes[7] = (byte) val; + out.write(doubleBytes, 0, 8); + return doubleBytes.length; + } + + private byte[] vLongBytes = new byte[9]; + + public static int writeVLongToByteArray(byte[] bytes, int offset, long l) { + if (l >= -112 && l <= 127) { + bytes[offset] = (byte) l; + return 1; + } + + int len = -112; + if (l < 0) { + l ^= -1L; // take one's complement' + len = -120; + } + + long tmp = l; + while (tmp != 0) { + tmp = tmp >> 8; + len--; + } + + bytes[offset++] = (byte) len; + len = (len < -120) ? -(len + 120) : -(len + 112); + + for (int idx = len; idx != 0; idx--) { + int shiftbits = (idx - 1) * 8; + bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits); + } + return 1 + len; + } + + public int writeVLong(OutputStream out, long l) throws IOException { + int len = writeVLongToByteArray(vLongBytes, 0, l); + out.write(vLongBytes, 0, len); + return len; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java new file mode 100644 index 0000000..85c79fa --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.util.internal.PlatformDependent; +import org.apache.hadoop.classification.InterfaceStability; + +/* this class is PooledBuffer holder */ +public class BufferPool { + + private static final PooledByteBufAllocator allocator; + + private BufferPool() { + } + + static { + //TODO we need determine the default params + allocator = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred()); + + /* if you are finding memory leak, please enable this line */ + //ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); + } + + public static long maxDirectMemory() { + return PlatformDependent.maxDirectMemory(); + } + + + public synchronized static ByteBuf directBuffer(int size) { + return allocator.directBuffer(size); + } + + /** + * + * @param size the initial capacity + * @param max the max capacity + * @return allocated ByteBuf from pool + */ + public static ByteBuf directBuffer(int size, int max) { + return allocator.directBuffer(size, max); + } + + @InterfaceStability.Unstable + public static void forceRelease(ByteBuf buf) { + buf.release(buf.refCnt()); + } + + /** + * the ByteBuf will increase to writable size + * @param buf + * @param minWritableBytes required minimum writable size + */ + public static void ensureWritable(ByteBuf buf, int minWritableBytes) { + buf.ensureWritable(minWritableBytes); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java new file mode 100644 index 0000000..45fb1d8 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.hdfs.DFSInputStream; +import org.apache.hadoop.io.IOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.ScatteringByteChannel; +import java.nio.channels.spi.AbstractInterruptibleChannel; + +public class ByteBufInputChannel extends AbstractInterruptibleChannel implements ScatteringByteChannel { + + ByteBufferReadable byteBufferReadable; + ReadableByteChannel channel; + InputStream inputStream; + + public ByteBufInputChannel(InputStream inputStream) { + if (inputStream instanceof DFSInputStream && inputStream instanceof ByteBufferReadable) { + this.byteBufferReadable = (ByteBufferReadable) inputStream; + } else { + this.channel = Channels.newChannel(inputStream); + } + + this.inputStream = inputStream; + } + + @Override + public long read(ByteBuffer[] dsts, int offset, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public long read(ByteBuffer[] dsts) { + return read(dsts, 0, dsts.length); + } + + @Override + public int read(ByteBuffer dst) throws IOException { + if (byteBufferReadable != null) { + return byteBufferReadable.read(dst); + } else { + return channel.read(dst); + } + } + + @Override + protected void implCloseChannel() throws IOException { + IOUtils.cleanup(null, channel, inputStream); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java new file mode 100644 index 0000000..8841a31 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +public class DataLocation { + private String host; + private int volumeId; + + public DataLocation(String host, int volumeId) { + this.host = host; + this.volumeId = volumeId; + } + + public String getHost() { + return host; + } + + public int getVolumeId() { + return volumeId; + } + + @Override + public String toString() { + return "DataLocation{" + + "host=" + host + + ", volumeId=" + volumeId + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java new file mode 100644 index 0000000..2396349 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import java.util.ArrayList; +import java.util.List; + +public class DiskDeviceInfo { + private int id; + private String name; + + private List mountInfos = new ArrayList(); + + public DiskDeviceInfo(int id) { + this.id = id; + } + + public int getId() { + return id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public String toString() { + return id + "," + name; + } + + public void addMountPath(DiskMountInfo diskMountInfo) { + mountInfos.add(diskMountInfo); + } + + public List getMountInfos() { + return mountInfos; + } + + public void setMountInfos(List mountInfos) { + this.mountInfos = mountInfos; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskInfo.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskInfo.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskInfo.java new file mode 100644 index 0000000..22f18ba --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskInfo.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +public class DiskInfo { + private int id; + private String partitionName; + private String mountPath; + + private long capacity; + private long used; + + public DiskInfo(int id, String partitionName) { + this.id = id; + this.partitionName = partitionName; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getPartitionName() { + return partitionName; + } + + public void setPartitionName(String partitionName) { + this.partitionName = partitionName; + } + + public String getMountPath() { + return mountPath; + } + + public void setMountPath(String mountPath) { + this.mountPath = mountPath; + } + + public long getCapacity() { + return capacity; + } + + public void setCapacity(long capacity) { + this.capacity = capacity; + } + + public long getUsed() { + return used; + } + + public void setUsed(long used) { + this.used = used; + } + + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskMountInfo.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskMountInfo.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskMountInfo.java new file mode 100644 index 0000000..aadb0e7 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskMountInfo.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import com.google.common.base.Objects; + +public class DiskMountInfo implements Comparable { + private String mountPath; + + private long capacity; + private long used; + + private int deviceId; + + public DiskMountInfo(int deviceId, String mountPath) { + this.mountPath = mountPath; + } + + public String getMountPath() { + return mountPath; + } + + public void setMountPath(String mountPath) { + this.mountPath = mountPath; + } + + public long getCapacity() { + return capacity; + } + + public void setCapacity(long capacity) { + this.capacity = capacity; + } + + public long getUsed() { + return used; + } + + public void setUsed(long used) { + this.used = used; + } + + public int getDeviceId() { + return deviceId; + } + + @Override + public boolean equals(Object obj){ + if (!(obj instanceof DiskMountInfo)) return false; + + if (compareTo((DiskMountInfo) obj) == 0) return true; + else return false; + } + + @Override + public int hashCode(){ + return Objects.hashCode(mountPath); + } + + @Override + public int compareTo(DiskMountInfo other) { + String path1 = mountPath; + String path2 = other.mountPath; + + int path1Depth = "/".equals(path1) ? 0 : path1.split("/", -1).length - 1 ; + int path2Depth = "/".equals(path2) ? 0 : path2.split("/", -1).length - 1 ; + + if(path1Depth > path2Depth) { + return -1; + } else if(path1Depth < path2Depth) { + return 1; + } else { + int path1Length = path1.length(); + int path2Length = path2.length(); + + if(path1Length < path2Length) { + return 1; + } else if(path1Length > path2Length) { + return -1; + } else { + return path1.compareTo(path2); + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java new file mode 100644 index 0000000..2d68870 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.common.Util; + +import java.io.*; +import java.net.URI; +import java.util.*; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; + +public class DiskUtil { + + static String UNIX_DISK_DEVICE_PATH = "/proc/partitions"; + + public enum OSType { + OS_TYPE_UNIX, OS_TYPE_WINXP, OS_TYPE_SOLARIS, OS_TYPE_MAC + } + + static private OSType getOSType() { + String osName = System.getProperty("os.name"); + if (osName.contains("Windows") + && (osName.contains("XP") || osName.contains("2003") + || osName.contains("Vista") + || osName.contains("Windows_7") + || osName.contains("Windows 7") || osName + .contains("Windows7"))) { + return OSType.OS_TYPE_WINXP; + } else if (osName.contains("SunOS") || osName.contains("Solaris")) { + return OSType.OS_TYPE_SOLARIS; + } else if (osName.contains("Mac")) { + return OSType.OS_TYPE_MAC; + } else { + return OSType.OS_TYPE_UNIX; + } + } + + public static List getDiskDeviceInfos() throws IOException { + List deviceInfos; + + if(getOSType() == OSType.OS_TYPE_UNIX) { + deviceInfos = getUnixDiskDeviceInfos(); + setDeviceMountInfo(deviceInfos); + } else { + deviceInfos = getDefaultDiskDeviceInfos(); + } + + return deviceInfos; + } + + private static List getUnixDiskDeviceInfos() { + List infos = new ArrayList(); + + File file = new File(UNIX_DISK_DEVICE_PATH); + if(!file.exists()) { + System.out.println("No partition file:" + file.getAbsolutePath()); + return getDefaultDiskDeviceInfos(); + } + + BufferedReader reader = null; + try { + reader = new BufferedReader(new InputStreamReader(new FileInputStream(UNIX_DISK_DEVICE_PATH))); + String line = null; + + int count = 0; + Set deviceNames = new TreeSet(); + while((line = reader.readLine()) != null) { + if(count > 0 && !line.trim().isEmpty()) { + String[] tokens = line.trim().split(" +"); + if(tokens.length == 4) { + String deviceName = getDiskDeviceName(tokens[3]); + deviceNames.add(deviceName); + } + } + count++; + } + + int id = 0; + for(String eachDeviceName: deviceNames) { + DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(id++); + diskDeviceInfo.setName(eachDeviceName); + + //TODO set addtional info + // /sys/block/sda/queue + infos.add(diskDeviceInfo); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + if(reader != null) { + try { + reader.close(); + } catch (IOException e) { + } + } + } + + return infos; + } + + private static String getDiskDeviceName(String partitionName) { + byte[] bytes = partitionName.getBytes(); + + byte[] result = new byte[bytes.length]; + int length = 0; + for(int i = 0; i < bytes.length; i++, length++) { + if(bytes[i] >= '0' && bytes[i] <= '9') { + break; + } else { + result[i] = bytes[i]; + } + } + + return new String(result, 0, length); + } + + public static List getDefaultDiskDeviceInfos() { + DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(0); + diskDeviceInfo.setName("default"); + + List infos = new ArrayList(); + + infos.add(diskDeviceInfo); + + return infos; + } + + + private static void setDeviceMountInfo(List deviceInfos) throws IOException { + Map deviceMap = new HashMap(); + for(DiskDeviceInfo eachDevice: deviceInfos) { + deviceMap.put(eachDevice.getName(), eachDevice); + } + + BufferedReader mountOutput = null; + try { + Process mountProcess = Runtime.getRuntime().exec("mount"); + mountOutput = new BufferedReader(new InputStreamReader( + mountProcess.getInputStream())); + while (true) { + String line = mountOutput.readLine(); + if (line == null) { + break; + } + + int indexStart = line.indexOf(" on /"); + int indexEnd = line.indexOf(" ", indexStart + 4); + + String deviceName = line.substring(0, indexStart).trim(); + String[] deviceNameTokens = deviceName.split("/"); + if(deviceNameTokens.length == 3) { + if("dev".equals(deviceNameTokens[1])) { + String realDeviceName = getDiskDeviceName(deviceNameTokens[2]); + String mountPath = new File(line.substring(indexStart + 4, indexEnd)).getAbsolutePath(); + + DiskDeviceInfo diskDeviceInfo = deviceMap.get(realDeviceName); + if(diskDeviceInfo != null) { + diskDeviceInfo.addMountPath(new DiskMountInfo(diskDeviceInfo.getId(), mountPath)); + } + } + } + } + } catch (IOException e) { + throw e; + } finally { + if (mountOutput != null) { + mountOutput.close(); + } + } + } + + public static int getDataNodeStorageSize(){ + return getStorageDirs().size(); + } + + public static List getStorageDirs(){ + Configuration conf = new HdfsConfiguration(); + Collection dirNames = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY); + return Util.stringCollectionAsURIs(dirNames); + } + + public static void main(String[] args) throws Exception { + System.out.println("/dev/sde1".split("/").length); + for(String eachToken: "/dev/sde1".split("/")) { + System.out.println(eachToken); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java new file mode 100644 index 0000000..8b7e2e0 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * + */ +package org.apache.tajo.storage; + +import com.google.common.base.Preconditions; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.IntervalDatum; +import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.exception.UnsupportedException; + +/** + * An instance of FrameTuple is an immutable tuple. + * It contains two tuples and pretends to be one instance of Tuple for + * join qual evaluatations. + */ +public class FrameTuple implements Tuple, Cloneable { + private int size; + private int leftSize; + + private Tuple left; + private Tuple right; + + public FrameTuple() {} + + public FrameTuple(Tuple left, Tuple right) { + set(left, right); + } + + public void set(Tuple left, Tuple right) { + this.size = left.size() + right.size(); + this.left = left; + this.leftSize = left.size(); + this.right = right; + } + + @Override + public int size() { + return size; + } + + @Override + public boolean contains(int fieldId) { + Preconditions.checkArgument(fieldId < size, + "Out of field access: " + fieldId); + + if (fieldId < leftSize) { + return left.contains(fieldId); + } else { + return right.contains(fieldId - leftSize); + } + } + + @Override + public boolean isNull(int fieldid) { + return get(fieldid).isNull(); + } + + @Override + public boolean isNotNull(int fieldid) { + return !isNull(fieldid); + } + + @Override + public void clear() { + throw new UnsupportedException(); + } + + @Override + public void put(int fieldId, Datum value) { + throw new UnsupportedException(); + } + + @Override + public void put(int fieldId, Datum[] values) { + throw new UnsupportedException(); + } + + @Override + public void put(int fieldId, Tuple tuple) { + throw new UnsupportedException(); + } + + @Override + public void setOffset(long offset) { + throw new UnsupportedException(); + } + + @Override + public long getOffset() { + throw new UnsupportedException(); + } + + @Override + public void put(Datum [] values) { + throw new UnsupportedException(); + } + + @Override + public Datum get(int fieldId) { + Preconditions.checkArgument(fieldId < size, + "Out of field access: " + fieldId); + + if (fieldId < leftSize) { + return left.get(fieldId); + } else { + return right.get(fieldId - leftSize); + } + } + + @Override + public boolean getBool(int fieldId) { + return get(fieldId).asBool(); + } + + @Override + public byte getByte(int fieldId) { + return get(fieldId).asByte(); + } + + @Override + public char getChar(int fieldId) { + return get(fieldId).asChar(); + } + + @Override + public byte [] getBytes(int fieldId) { + return get(fieldId).asByteArray(); + } + + @Override + public short getInt2(int fieldId) { + return get(fieldId).asInt2(); + } + + @Override + public int getInt4(int fieldId) { + return get(fieldId).asInt4(); + } + + @Override + public long getInt8(int fieldId) { + return get(fieldId).asInt8(); + } + + @Override + public float getFloat4(int fieldId) { + return get(fieldId).asFloat4(); + } + + @Override + public double getFloat8(int fieldId) { + return get(fieldId).asFloat8(); + } + + @Override + public String getText(int fieldId) { + return get(fieldId).asChars(); + } + + @Override + public ProtobufDatum getProtobufDatum(int fieldId) { + return (ProtobufDatum) get(fieldId); + } + + @Override + public IntervalDatum getInterval(int fieldId) { + return (IntervalDatum) get(fieldId); + } + + @Override + public char [] getUnicodeChars(int fieldId) { + return get(fieldId).asUnicodeChars(); + } + + @Override + public Tuple clone() throws CloneNotSupportedException { + FrameTuple frameTuple = (FrameTuple) super.clone(); + frameTuple.set(this.left.clone(), this.right.clone()); + return frameTuple; + } + + @Override + public Datum[] getValues(){ + throw new UnsupportedException(); + } + + public String toString() { + boolean first = true; + StringBuilder str = new StringBuilder(); + str.append("("); + for(int i=0; i < size(); i++) { + if(contains(i)) { + if(first) { + first = false; + } else { + str.append(", "); + } + str.append(i) + .append("=>") + .append(get(i)); + } + } + str.append(")"); + return str.toString(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java new file mode 100644 index 0000000..bfbe478 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.IntervalDatum; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.exception.UnsupportedException; + +import java.util.Arrays; + +public class LazyTuple implements Tuple, Cloneable { + private long offset; + private Datum[] values; + private byte[][] textBytes; + private Schema schema; + private byte[] nullBytes; + private SerializerDeserializer serializeDeserialize; + + public LazyTuple(Schema schema, byte[][] textBytes, long offset) { + this(schema, textBytes, offset, NullDatum.get().asTextBytes(), new TextSerializerDeserializer()); + } + + public LazyTuple(Schema schema, byte[][] textBytes, long offset, byte[] nullBytes, SerializerDeserializer serde) { + this.schema = schema; + this.textBytes = textBytes; + this.values = new Datum[schema.size()]; + this.offset = offset; + this.nullBytes = nullBytes; + this.serializeDeserialize = serde; + } + + public LazyTuple(LazyTuple tuple) { + this.values = tuple.getValues(); + this.offset = tuple.offset; + this.schema = tuple.schema; + this.textBytes = new byte[size()][]; + this.nullBytes = tuple.nullBytes; + this.serializeDeserialize = tuple.serializeDeserialize; + } + + @Override + public int size() { + return values.length; + } + + @Override + public boolean contains(int fieldid) { + return textBytes[fieldid] != null || values[fieldid] != null; + } + + @Override + public boolean isNull(int fieldid) { + return get(fieldid).isNull(); + } + + @Override + public boolean isNotNull(int fieldid) { + return !isNull(fieldid); + } + + @Override + public void clear() { + for (int i = 0; i < values.length; i++) { + values[i] = null; + textBytes[i] = null; + } + } + + ////////////////////////////////////////////////////// + // Setter + ////////////////////////////////////////////////////// + @Override + public void put(int fieldId, Datum value) { + values[fieldId] = value; + textBytes[fieldId] = null; + } + + @Override + public void put(int fieldId, Datum[] values) { + for (int i = fieldId, j = 0; j < values.length; i++, j++) { + this.values[i] = values[j]; + } + this.textBytes = new byte[values.length][]; + } + + @Override + public void put(int fieldId, Tuple tuple) { + for (int i = fieldId, j = 0; j < tuple.size(); i++, j++) { + values[i] = tuple.get(j); + textBytes[i] = null; + } + } + + @Override + public void put(Datum[] values) { + System.arraycopy(values, 0, this.values, 0, size()); + this.textBytes = new byte[values.length][]; + } + + ////////////////////////////////////////////////////// + // Getter + ////////////////////////////////////////////////////// + @Override + public Datum get(int fieldId) { + if (values[fieldId] != null) + return values[fieldId]; + else if (textBytes.length <= fieldId) { + values[fieldId] = NullDatum.get(); // split error. (col : 3, separator: ',', row text: "a,") + } else if (textBytes[fieldId] != null) { + try { + values[fieldId] = serializeDeserialize.deserialize(schema.getColumn(fieldId), + textBytes[fieldId], 0, textBytes[fieldId].length, nullBytes); + } catch (Exception e) { + values[fieldId] = NullDatum.get(); + } + textBytes[fieldId] = null; + } else { + //non-projection + } + return values[fieldId]; + } + + @Override + public void setOffset(long offset) { + this.offset = offset; + } + + @Override + public long getOffset() { + return this.offset; + } + + @Override + public boolean getBool(int fieldId) { + return get(fieldId).asBool(); + } + + @Override + public byte getByte(int fieldId) { + return get(fieldId).asByte(); + } + + @Override + public char getChar(int fieldId) { + return get(fieldId).asChar(); + } + + @Override + public byte [] getBytes(int fieldId) { + return get(fieldId).asByteArray(); + } + + @Override + public short getInt2(int fieldId) { + return get(fieldId).asInt2(); + } + + @Override + public int getInt4(int fieldId) { + return get(fieldId).asInt4(); + } + + @Override + public long getInt8(int fieldId) { + return get(fieldId).asInt8(); + } + + @Override + public float getFloat4(int fieldId) { + return get(fieldId).asFloat4(); + } + + @Override + public double getFloat8(int fieldId) { + return get(fieldId).asFloat8(); + } + + @Override + public String getText(int fieldId) { + return get(fieldId).asChars(); + } + + @Override + public ProtobufDatum getProtobufDatum(int fieldId) { + throw new UnsupportedException(); + } + + @Override + public IntervalDatum getInterval(int fieldId) { + return (IntervalDatum) get(fieldId); + } + + @Override + public char[] getUnicodeChars(int fieldId) { + return get(fieldId).asUnicodeChars(); + } + + public String toString() { + boolean first = true; + StringBuilder str = new StringBuilder(); + str.append("("); + Datum d; + for (int i = 0; i < values.length; i++) { + d = get(i); + if (d != null) { + if (first) { + first = false; + } else { + str.append(", "); + } + str.append(i) + .append("=>") + .append(d); + } + } + str.append(")"); + return str.toString(); + } + + @Override + public int hashCode() { + return Arrays.hashCode(values); + } + + @Override + public Datum[] getValues() { + Datum[] datums = new Datum[values.length]; + for (int i = 0; i < values.length; i++) { + datums[i] = get(i); + } + return datums; + } + + @Override + public Tuple clone() throws CloneNotSupportedException { + LazyTuple lazyTuple = (LazyTuple) super.clone(); + + lazyTuple.values = getValues(); //shallow copy + lazyTuple.textBytes = new byte[size()][]; + return lazyTuple; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof Tuple) { + Tuple other = (Tuple) obj; + return Arrays.equals(getValues(), other.getValues()); + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java new file mode 100644 index 0000000..f19b61f --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.tajo.datum.*; +import org.apache.tajo.util.ClassSize; + +public class MemoryUtil { + + /** Overhead for an NullDatum */ + public static final long NULL_DATUM; + + /** Overhead for an BoolDatum */ + public static final long BOOL_DATUM; + + /** Overhead for an CharDatum */ + public static final long CHAR_DATUM; + + /** Overhead for an BitDatum */ + public static final long BIT_DATUM; + + /** Overhead for an Int2Datum */ + public static final long INT2_DATUM; + + /** Overhead for an Int4Datum */ + public static final long INT4_DATUM; + + /** Overhead for an Int8Datum */ + public static final long INT8_DATUM; + + /** Overhead for an Float4Datum */ + public static final long FLOAT4_DATUM; + + /** Overhead for an Float8Datum */ + public static final long FLOAT8_DATUM; + + /** Overhead for an TextDatum */ + public static final long TEXT_DATUM; + + /** Overhead for an BlobDatum */ + public static final long BLOB_DATUM; + + /** Overhead for an DateDatum */ + public static final long DATE_DATUM; + + /** Overhead for an TimeDatum */ + public static final long TIME_DATUM; + + /** Overhead for an TimestampDatum */ + public static final long TIMESTAMP_DATUM; + + static { + NULL_DATUM = ClassSize.estimateBase(NullDatum.class, false); + + CHAR_DATUM = ClassSize.estimateBase(CharDatum.class, false); + + BOOL_DATUM = ClassSize.estimateBase(BooleanDatum.class, false); + + BIT_DATUM = ClassSize.estimateBase(BitDatum.class, false); + + INT2_DATUM = ClassSize.estimateBase(Int2Datum.class, false); + + INT4_DATUM = ClassSize.estimateBase(Int4Datum.class, false); + + INT8_DATUM = ClassSize.estimateBase(Int8Datum.class, false); + + FLOAT4_DATUM = ClassSize.estimateBase(Float4Datum.class, false); + + FLOAT8_DATUM = ClassSize.estimateBase(Float8Datum.class, false); + + TEXT_DATUM = ClassSize.estimateBase(TextDatum.class, false); + + BLOB_DATUM = ClassSize.estimateBase(BlobDatum.class, false); + + DATE_DATUM = ClassSize.estimateBase(DateDatum.class, false); + + TIME_DATUM = ClassSize.estimateBase(TimeDatum.class, false); + + TIMESTAMP_DATUM = ClassSize.estimateBase(TimestampDatum.class, false); + } + + public static long calculateMemorySize(Tuple tuple) { + long total = ClassSize.OBJECT; + for (Datum datum : tuple.getValues()) { + switch (datum.type()) { + + case NULL_TYPE: + total += NULL_DATUM; + break; + + case BOOLEAN: + total += BOOL_DATUM; + break; + + case BIT: + total += BIT_DATUM; + break; + + case CHAR: + total += CHAR_DATUM + datum.size(); + break; + + case INT1: + case INT2: + total += INT2_DATUM; + break; + + case INT4: + total += INT4_DATUM; + break; + + case INT8: + total += INT8_DATUM; + break; + + case FLOAT4: + total += FLOAT4_DATUM; + break; + + case FLOAT8: + total += FLOAT4_DATUM; + break; + + case TEXT: + total += TEXT_DATUM + datum.size(); + break; + + case DATE: + total += DATE_DATUM; + break; + + case TIME: + total += TIME_DATUM; + break; + + case TIMESTAMP: + total += TIMESTAMP_DATUM; + break; + + default: + break; + } + } + + return total; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java new file mode 100644 index 0000000..66b3667 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.ColumnStats; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.storage.fragment.Fragment; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class MergeScanner implements Scanner { + private Configuration conf; + private TableMeta meta; + private Schema schema; + private List fragments; + private Iterator iterator; + private Fragment currentFragment; + private Scanner currentScanner; + private Tuple tuple; + private boolean projectable = false; + private boolean selectable = false; + private Schema target; + private float progress; + protected TableStats tableStats; + + public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List rawFragmentList) + throws IOException { + this(conf, schema, meta, rawFragmentList, schema); + } + + public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List rawFragmentList, + Schema target) + throws IOException { + this.conf = conf; + this.schema = schema; + this.meta = meta; + this.target = target; + + this.fragments = new ArrayList(); + + long numBytes = 0; + for (Fragment eachFileFragment: rawFragmentList) { + long fragmentLength = StorageManager.getFragmentLength((TajoConf)conf, eachFileFragment); + if (fragmentLength > 0) { + numBytes += fragmentLength; + fragments.add(eachFileFragment); + } + } + + // it should keep the input order. Otherwise, it causes wrong result of sort queries. + this.reset(); + + if (currentScanner != null) { + this.projectable = currentScanner.isProjectable(); + this.selectable = currentScanner.isSelectable(); + } + + tableStats = new TableStats(); + + tableStats.setNumBytes(numBytes); + tableStats.setNumBlocks(fragments.size()); + + for(Column eachColumn: schema.getColumns()) { + ColumnStats columnStats = new ColumnStats(eachColumn); + tableStats.addColumnStat(columnStats); + } + } + + @Override + public void init() throws IOException { + progress = 0.0f; + } + + @Override + public Tuple next() throws IOException { + if (currentScanner != null) + tuple = currentScanner.next(); + + if (tuple != null) { + return tuple; + } else { + if (currentScanner != null) { + currentScanner.close(); + TableStats scannerTableStsts = currentScanner.getInputStats(); + if (scannerTableStsts != null) { + tableStats.setReadBytes(tableStats.getReadBytes() + scannerTableStsts.getReadBytes()); + tableStats.setNumRows(tableStats.getNumRows() + scannerTableStsts.getNumRows()); + } + } + currentScanner = getNextScanner(); + if (currentScanner != null) { + tuple = currentScanner.next(); + } + } + return tuple; + } + + @Override + public void reset() throws IOException { + this.iterator = fragments.iterator(); + if (currentScanner != null) { + currentScanner.close(); + } + this.currentScanner = getNextScanner(); + } + + private Scanner getNextScanner() throws IOException { + if (iterator.hasNext()) { + currentFragment = iterator.next(); + currentScanner = StorageManager.getStorageManager((TajoConf)conf, meta.getStoreType()).getScanner(meta, schema, + currentFragment, target); + currentScanner.init(); + return currentScanner; + } else { + return null; + } + } + + @Override + public void close() throws IOException { + if(currentScanner != null) { + currentScanner.close(); + currentScanner = null; + } + iterator = null; + progress = 1.0f; + } + + @Override + public boolean isProjectable() { + return projectable; + } + + @Override + public void setTarget(Column[] targets) { + this.target = new Schema(targets); + } + + @Override + public boolean isSelectable() { + return selectable; + } + + @Override + public void setSearchCondition(Object expr) { + } + + @Override + public Schema getSchema() { + return schema; + } + + @Override + public boolean isSplittable(){ + return false; + } + + @Override + public float getProgress() { + if (currentScanner != null && iterator != null && tableStats.getNumBytes() > 0) { + TableStats scannerTableStsts = currentScanner.getInputStats(); + long currentScannerReadBytes = 0; + if (scannerTableStsts != null) { + currentScannerReadBytes = scannerTableStsts.getReadBytes(); + } + + return (float)(tableStats.getReadBytes() + currentScannerReadBytes) / (float)tableStats.getNumBytes(); + } else { + return progress; + } + } + + @Override + public TableStats getInputStats() { + return tableStats; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java new file mode 100644 index 0000000..4272228 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java @@ -0,0 +1,109 @@ +package org.apache.tajo.storage; /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.hadoop.conf.Configuration; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.storage.fragment.Fragment; + +import java.io.IOException; + +public class NullScanner implements Scanner { + protected final Configuration conf; + protected final TableMeta meta; + protected final Schema schema; + protected final Fragment fragment; + protected final int columnNum; + protected Column [] targets; + protected float progress; + protected TableStats tableStats; + + public NullScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) { + this.conf = conf; + this.meta = meta; + this.schema = schema; + this.fragment = fragment; + this.tableStats = new TableStats(); + this.columnNum = this.schema.size(); + } + + @Override + public void init() throws IOException { + progress = 0.0f; + tableStats.setNumBytes(0); + tableStats.setNumBlocks(0); + } + + @Override + public Tuple next() throws IOException { + progress = 1.0f; + return null; + } + + @Override + public void reset() throws IOException { + progress = 0.0f; + } + + @Override + public void close() throws IOException { + progress = 1.0f; + } + + @Override + public boolean isProjectable() { + return false; + } + + @Override + public void setTarget(Column[] targets) { + this.targets = targets; + } + + @Override + public boolean isSelectable() { + return true; + } + + @Override + public void setSearchCondition(Object expr) { + + } + + @Override + public boolean isSplittable() { + return true; + } + + @Override + public float getProgress() { + return progress; + } + + @Override + public TableStats getInputStats() { + return tableStats; + } + + @Override + public Schema getSchema() { + return schema; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NumericPathComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NumericPathComparator.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NumericPathComparator.java new file mode 100644 index 0000000..94d13ee --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NumericPathComparator.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.fs.Path; + +import java.util.Comparator; + +public class NumericPathComparator implements Comparator { + + @Override + public int compare(Path p1, Path p2) { + int num1 = Integer.parseInt(p1.getName()); + int num2 = Integer.parseInt(p2.getName()); + + return num1 - num2; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java new file mode 100644 index 0000000..24b6280 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.IntervalDatum; +import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.storage.exception.UnknownDataTypeException; +import org.apache.tajo.tuple.offheap.RowWriter; +import org.apache.tajo.util.BitArray; + +import java.nio.ByteBuffer; + +public class RowStoreUtil { + public static int[] getTargetIds(Schema inSchema, Schema outSchema) { + int[] targetIds = new int[outSchema.size()]; + int i = 0; + for (Column target : outSchema.getColumns()) { + targetIds[i] = inSchema.getColumnId(target.getQualifiedName()); + i++; + } + + return targetIds; + } + + public static Tuple project(Tuple in, Tuple out, int[] targetIds) { + out.clear(); + for (int idx = 0; idx < targetIds.length; idx++) { + out.put(idx, in.get(targetIds[idx])); + } + return out; + } + + public static RowStoreEncoder createEncoder(Schema schema) { + return new RowStoreEncoder(schema); + } + + public static RowStoreDecoder createDecoder(Schema schema) { + return new RowStoreDecoder(schema); + } + + public static class RowStoreDecoder { + + private Schema schema; + private BitArray nullFlags; + private int headerSize; + + private RowStoreDecoder(Schema schema) { + this.schema = schema; + nullFlags = new BitArray(schema.size()); + headerSize = nullFlags.bytesLength(); + } + + + public Tuple toTuple(byte [] bytes) { + nullFlags.clear(); + ByteBuffer bb = ByteBuffer.wrap(bytes); + Tuple tuple = new VTuple(schema.size()); + Column col; + TajoDataTypes.DataType type; + + bb.limit(headerSize); + nullFlags.fromByteBuffer(bb); + bb.limit(bytes.length); + + for (int i =0; i < schema.size(); i++) { + if (nullFlags.get(i)) { + tuple.put(i, DatumFactory.createNullDatum()); + continue; + } + + col = schema.getColumn(i); + type = col.getDataType(); + switch (type.getType()) { + case BOOLEAN: tuple.put(i, DatumFactory.createBool(bb.get())); break; + case BIT: + byte b = bb.get(); + tuple.put(i, DatumFactory.createBit(b)); + break; + + case CHAR: + byte c = bb.get(); + tuple.put(i, DatumFactory.createChar(c)); + break; + + case INT2: + short s = bb.getShort(); + tuple.put(i, DatumFactory.createInt2(s)); + break; + + case INT4: + case DATE: + int i_ = bb.getInt(); + tuple.put(i, DatumFactory.createFromInt4(type, i_)); + break; + + case INT8: + case TIME: + case TIMESTAMP: + long l = bb.getLong(); + tuple.put(i, DatumFactory.createFromInt8(type, l)); + break; + + case INTERVAL: + int month = bb.getInt(); + long milliseconds = bb.getLong(); + tuple.put(i, new IntervalDatum(month, milliseconds)); + break; + + case FLOAT4: + float f = bb.getFloat(); + tuple.put(i, DatumFactory.createFloat4(f)); + break; + + case FLOAT8: + double d = bb.getDouble(); + tuple.put(i, DatumFactory.createFloat8(d)); + break; + + case TEXT: + byte [] _string = new byte[bb.getInt()]; + bb.get(_string); + tuple.put(i, DatumFactory.createText(_string)); + break; + + case BLOB: + byte [] _bytes = new byte[bb.getInt()]; + bb.get(_bytes); + tuple.put(i, DatumFactory.createBlob(_bytes)); + break; + + case INET4: + byte [] _ipv4 = new byte[4]; + bb.get(_ipv4); + tuple.put(i, DatumFactory.createInet4(_ipv4)); + break; + case INET6: + // TODO - to be implemented + throw new UnsupportedException(type.getType().name()); + default: + throw new RuntimeException(new UnknownDataTypeException(type.getType().name())); + } + } + return tuple; + } + + public Schema getSchema() { + return schema; + } + } + + public static class RowStoreEncoder { + private Schema schema; + private BitArray nullFlags; + private int headerSize; + + private RowStoreEncoder(Schema schema) { + this.schema = schema; + nullFlags = new BitArray(schema.size()); + headerSize = nullFlags.bytesLength(); + } + + public byte[] toBytes(Tuple tuple) { + nullFlags.clear(); + int size = estimateTupleDataSize(tuple); + ByteBuffer bb = ByteBuffer.allocate(size + headerSize); + bb.position(headerSize); + Column col; + for (int i = 0; i < schema.size(); i++) { + if (tuple.isNull(i)) { + nullFlags.set(i); + continue; + } + + col = schema.getColumn(i); + switch (col.getDataType().getType()) { + case NULL_TYPE: + nullFlags.set(i); + break; + case BOOLEAN: + bb.put(tuple.get(i).asByte()); + break; + case BIT: + bb.put(tuple.get(i).asByte()); + break; + case CHAR: + bb.put(tuple.get(i).asByte()); + break; + case INT2: + bb.putShort(tuple.get(i).asInt2()); + break; + case INT4: + bb.putInt(tuple.get(i).asInt4()); + break; + case INT8: + bb.putLong(tuple.get(i).asInt8()); + break; + case FLOAT4: + bb.putFloat(tuple.get(i).asFloat4()); + break; + case FLOAT8: + bb.putDouble(tuple.get(i).asFloat8()); + break; + case TEXT: + byte[] _string = tuple.get(i).asByteArray(); + bb.putInt(_string.length); + bb.put(_string); + break; + case DATE: + bb.putInt(tuple.get(i).asInt4()); + break; + case TIME: + case TIMESTAMP: + bb.putLong(tuple.get(i).asInt8()); + break; + case INTERVAL: + IntervalDatum interval = (IntervalDatum) tuple.get(i); + bb.putInt(interval.getMonths()); + bb.putLong(interval.getMilliSeconds()); + break; + case BLOB: + byte[] bytes = tuple.get(i).asByteArray(); + bb.putInt(bytes.length); + bb.put(bytes); + break; + case INET4: + byte[] ipBytes = tuple.get(i).asByteArray(); + bb.put(ipBytes); + break; + case INET6: + bb.put(tuple.get(i).asByteArray()); + break; + default: + throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name())); + } + } + + byte[] flags = nullFlags.toArray(); + int finalPosition = bb.position(); + bb.position(0); + bb.put(flags); + + bb.position(finalPosition); + bb.flip(); + byte[] buf = new byte[bb.limit()]; + bb.get(buf); + return buf; + } + + // Note that, NULL values are treated separately + private int estimateTupleDataSize(Tuple tuple) { + int size = 0; + Column col; + + for (int i = 0; i < schema.size(); i++) { + if (tuple.isNull(i)) { + continue; + } + + col = schema.getColumn(i); + switch (col.getDataType().getType()) { + case BOOLEAN: + case BIT: + case CHAR: + size += 1; + break; + case INT2: + size += 2; + break; + case DATE: + case INT4: + case FLOAT4: + size += 4; + break; + case TIME: + case TIMESTAMP: + case INT8: + case FLOAT8: + size += 8; + break; + case INTERVAL: + size += 12; + break; + case TEXT: + case BLOB: + size += (4 + tuple.get(i).asByteArray().length); + break; + case INET4: + case INET6: + size += tuple.get(i).asByteArray().length; + break; + default: + throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name())); + } + } + + size += 100; // optimistic reservation + + return size; + } + + public Schema getSchema() { + return schema; + } + } + + public static void convert(Tuple tuple, RowWriter writer) { + writer.startRow(); + + for (int i = 0; i < writer.dataTypes().length; i++) { + if (tuple.isNull(i)) { + writer.skipField(); + continue; + } + switch (writer.dataTypes()[i].getType()) { + case BOOLEAN: + writer.putBool(tuple.getBool(i)); + break; + case INT1: + case INT2: + writer.putInt2(tuple.getInt2(i)); + break; + case INT4: + case DATE: + case INET4: + writer.putInt4(tuple.getInt4(i)); + break; + case INT8: + case TIMESTAMP: + case TIME: + writer.putInt8(tuple.getInt8(i)); + break; + case FLOAT4: + writer.putFloat4(tuple.getFloat4(i)); + break; + case FLOAT8: + writer.putFloat8(tuple.getFloat8(i)); + break; + case TEXT: + writer.putText(tuple.getBytes(i)); + break; + case INTERVAL: + writer.putInterval((IntervalDatum) tuple.getInterval(i)); + break; + case PROTOBUF: + writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(i)); + break; + case NULL_TYPE: + writer.skipField(); + break; + default: + throw new UnsupportedException("Unknown data type: " + writer.dataTypes()[i]); + } + } + writer.endRow(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java new file mode 100644 index 0000000..0356b19 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.SchemaObject; +import org.apache.tajo.catalog.statistics.TableStats; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Scanner Interface + */ +public interface Scanner extends SchemaObject, Closeable { + + void init() throws IOException; + + /** + * It returns one tuple at each call. + * + * @return retrieve null if the scanner has no more tuples. + * Otherwise it returns one tuple. + * + * @throws java.io.IOException if internal I/O error occurs during next method + */ + Tuple next() throws IOException; + + /** + * Reset the cursor. After executed, the scanner + * will retrieve the first tuple. + * + * @throws java.io.IOException if internal I/O error occurs during reset method + */ + void reset() throws IOException; + + /** + * Close scanner + * + * @throws java.io.IOException if internal I/O error occurs during close method + */ + void close() throws IOException; + + + /** + * It returns if the projection is executed in the underlying scanner layer. + * + * @return true if this scanner can project the given columns. + */ + boolean isProjectable(); + + /** + * Set target columns + * @param targets columns to be projected + */ + void setTarget(Column[] targets); + + /** + * It returns if the selection is executed in the underlying scanner layer. + * + * @return true if this scanner can filter tuples against a given condition. + */ + boolean isSelectable(); + + /** + * Set a search condition + * @param expr to be searched + * + * TODO - to be changed Object type + */ + void setSearchCondition(Object expr); + + /** + * It returns if the file is splittable. + * + * @return true if this scanner can split the a file. + */ + boolean isSplittable(); + + /** + * How much of the input has the Scanner consumed + * @return progress from 0.0 to 1.0. + */ + float getProgress(); + + TableStats getInputStats(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableScanner.java new file mode 100644 index 0000000..894e7ee --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableScanner.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import java.io.IOException; + +public interface SeekableScanner extends Scanner { + + public abstract long getNextOffset() throws IOException; + + public abstract void seek(long offset) throws IOException; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java new file mode 100644 index 0000000..564a9f5 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.datum.Datum; + +import java.io.IOException; +import java.io.OutputStream; + +@Deprecated +public interface SerializerDeserializer { + + public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException; + + public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException; + +}