tajo-commits mailing list archives

From hj...@apache.org
Subject [01/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project structure.
Date Wed, 03 Dec 2014 05:30:16 GMT
Repository: tajo
Updated Branches:
  refs/heads/hbase_storage 87c957e43 -> dfd7f996d


http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
new file mode 100644
index 0000000..517e00e
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.parquet;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.junit.Test;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TajoSchemaConverter}.
+ */
+public class TestSchemaConverter {
+  private static final String ALL_PARQUET_SCHEMA =
+      "message table_schema {\n" +
+      "  optional boolean myboolean;\n" +
+      "  optional int32 myint;\n" +
+      "  optional int64 mylong;\n" +
+      "  optional float myfloat;\n" +
+      "  optional double mydouble;\n" +
+      "  optional binary mybytes;\n" +
+      "  optional binary mystring (UTF8);\n" +
+      "  optional fixed_len_byte_array(1) myfixed;\n" +
+      "}\n";
+
+  private static final String CONVERTED_ALL_PARQUET_SCHEMA =
+      "message table_schema {\n" +
+      "  optional boolean myboolean;\n" +
+      "  optional int32 mybit;\n" +
+      "  optional binary mychar (UTF8);\n" +
+      "  optional int32 myint2;\n" +
+      "  optional int32 myint4;\n" +
+      "  optional int64 myint8;\n" +
+      "  optional float myfloat4;\n" +
+      "  optional double myfloat8;\n" +
+      "  optional binary mytext (UTF8);\n" +
+      "  optional binary myblob;\n" +
+      // NULL_TYPE fields are not encoded.
+      "  optional binary myinet4;\n" +
+      "  optional binary myprotobuf;\n" +
+      "}\n";
+
+  private Schema createAllTypesSchema() {
+    List<Column> columns = new ArrayList<Column>();
+    columns.add(new Column("myboolean", Type.BOOLEAN));
+    columns.add(new Column("mybit", Type.BIT));
+    columns.add(new Column("mychar", Type.CHAR));
+    columns.add(new Column("myint2", Type.INT2));
+    columns.add(new Column("myint4", Type.INT4));
+    columns.add(new Column("myint8", Type.INT8));
+    columns.add(new Column("myfloat4", Type.FLOAT4));
+    columns.add(new Column("myfloat8", Type.FLOAT8));
+    columns.add(new Column("mytext", Type.TEXT));
+    columns.add(new Column("myblob", Type.BLOB));
+    columns.add(new Column("mynull", Type.NULL_TYPE));
+    columns.add(new Column("myinet4", Type.INET4));
+    columns.add(new Column("myprotobuf", Type.PROTOBUF));
+    Column[] columnsArray = new Column[columns.size()];
+    columnsArray = columns.toArray(columnsArray);
+    return new Schema(columnsArray);
+  }
+
+  private Schema createAllTypesConvertedSchema() {
+    List<Column> columns = new ArrayList<Column>();
+    columns.add(new Column("myboolean", Type.BOOLEAN));
+    columns.add(new Column("myint", Type.INT4));
+    columns.add(new Column("mylong", Type.INT8));
+    columns.add(new Column("myfloat", Type.FLOAT4));
+    columns.add(new Column("mydouble", Type.FLOAT8));
+    columns.add(new Column("mybytes", Type.BLOB));
+    columns.add(new Column("mystring", Type.TEXT));
+    columns.add(new Column("myfixed", Type.BLOB));
+    Column[] columnsArray = new Column[columns.size()];
+    columnsArray = columns.toArray(columnsArray);
+    return new Schema(columnsArray);
+  }
+
+  private void testTajoToParquetConversion(
+      Schema tajoSchema, String schemaString) throws Exception {
+    TajoSchemaConverter converter = new TajoSchemaConverter();
+    MessageType schema = converter.convert(tajoSchema);
+    MessageType expected = MessageTypeParser.parseMessageType(schemaString);
+    assertEquals("converting " + schema + " to " + schemaString,
+                 expected.toString(), schema.toString());
+  }
+
+  private void testParquetToTajoConversion(
+      Schema tajoSchema, String schemaString) throws Exception {
+    TajoSchemaConverter converter = new TajoSchemaConverter();
+    Schema schema = converter.convert(
+        MessageTypeParser.parseMessageType(schemaString));
+    assertEquals("converting " + schemaString + " to " + tajoSchema,
+                 tajoSchema.toString(), schema.toString());
+  }
+
+  @Test
+  public void testAllTypesToParquet() throws Exception {
+    Schema schema = createAllTypesSchema();
+    testTajoToParquetConversion(schema, CONVERTED_ALL_PARQUET_SCHEMA);
+  }
+
+  @Test
+  public void testAllTypesToTajo() throws Exception {
+    Schema schema = createAllTypesConvertedSchema();
+    testParquetToTajoConversion(schema, ALL_PARQUET_SCHEMA);
+  }
+}

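For reference alongside the new parquet test, here is a minimal usage sketch (not part of this commit) of the TajoSchemaConverter that TestSchemaConverter exercises. It relies only on the convert() overloads and the Schema/Column constructors already used in the test above; the sketch class name and the expected-output comments are illustrative.

package org.apache.tajo.storage.parquet;

import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.Type;
import parquet.schema.MessageType;

public class SchemaConverterSketch {
  public static void main(String[] args) {
    // A two-column Tajo schema, built the same way the test builds its schemas.
    Schema tajoSchema = new Schema(new Column[] {
        new Column("id", Type.INT4),
        new Column("name", Type.TEXT)
    });

    // Tajo -> Parquet: INT4 should map to int32 and TEXT to binary (UTF8),
    // following the mapping spelled out in the test constants above.
    MessageType parquetSchema = new TajoSchemaConverter().convert(tajoSchema);
    System.out.println(parquetSchema);

    // Parquet -> Tajo: converting back should yield an equivalent schema,
    // which is what testAllTypesToTajo() asserts.
    Schema roundTripped = new TajoSchemaConverter().convert(parquetSchema);
    System.out.println(roundTripped);
  }
}
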
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/INode.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/INode.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/INode.java
new file mode 100644
index 0000000..7b09937
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/INode.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.s3.Block;
+import org.apache.hadoop.io.IOUtils;
+
+import java.io.*;
+
+/**
+ * Holds file metadata including type (regular file, or directory),
+ * and the list of blocks that are pointers to the data.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class INode {
+
+  enum FileType {
+    DIRECTORY, FILE
+  }
+
+  public static final FileType[] FILE_TYPES = {
+      FileType.DIRECTORY,
+      FileType.FILE
+  };
+
+  public static final INode DIRECTORY_INODE = new INode(FileType.DIRECTORY, null);
+
+  private FileType fileType;
+  private Block[] blocks;
+
+  public INode(FileType fileType, Block[] blocks) {
+    this.fileType = fileType;
+    if (isDirectory() && blocks != null) {
+      throw new IllegalArgumentException("A directory cannot contain blocks.");
+    }
+    this.blocks = blocks;
+  }
+
+  public Block[] getBlocks() {
+    return blocks;
+  }
+
+  public FileType getFileType() {
+    return fileType;
+  }
+
+  public boolean isDirectory() {
+    return fileType == FileType.DIRECTORY;
+  }
+
+  public boolean isFile() {
+    return fileType == FileType.FILE;
+  }
+
+  public long getSerializedLength() {
+    return 1L + (blocks == null ? 0 : 4 + blocks.length * 16);
+  }
+
+
+  public InputStream serialize() throws IOException {
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(bytes);
+    try {
+      out.writeByte(fileType.ordinal());
+      if (isFile()) {
+        out.writeInt(blocks.length);
+        for (int i = 0; i < blocks.length; i++) {
+          out.writeLong(blocks[i].getId());
+          out.writeLong(blocks[i].getLength());
+        }
+      }
+      out.close();
+      out = null;
+    } finally {
+      IOUtils.closeStream(out);
+    }
+    return new ByteArrayInputStream(bytes.toByteArray());
+  }
+
+  public static INode deserialize(InputStream in) throws IOException {
+    if (in == null) {
+      return null;
+    }
+    DataInputStream dataIn = new DataInputStream(in);
+    FileType fileType = INode.FILE_TYPES[dataIn.readByte()];
+    switch (fileType) {
+      case DIRECTORY:
+        in.close();
+        return INode.DIRECTORY_INODE;
+      case FILE:
+        int numBlocks = dataIn.readInt();
+        Block[] blocks = new Block[numBlocks];
+        for (int i = 0; i < numBlocks; i++) {
+          long id = dataIn.readLong();
+          long length = dataIn.readLong();
+          blocks[i] = new Block(id, length);
+        }
+        in.close();
+        return new INode(fileType, blocks);
+      default:
+        throw new IllegalArgumentException("Cannot deserialize inode.");
+    }
+  }
+
+}

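The serialize()/deserialize() pair above is easiest to see as a tiny round trip. The following sketch (not part of this commit) sits in the same package so it can use the package-private FileType enum; only INode and Block from the diff are assumed, the rest is illustrative.

package org.apache.tajo.storage.s3;

import org.apache.hadoop.fs.s3.Block;

import java.io.InputStream;

public class INodeRoundTripSketch {
  public static void main(String[] args) throws Exception {
    // A file inode holding two blocks, each identified by (id, length).
    Block[] blocks = { new Block(1L, 10L), new Block(2L, 7L) };
    INode file = new INode(INode.FileType.FILE, blocks);

    // serialize() writes 1 byte for the file type, then 4 bytes for the block
    // count and 16 bytes (two longs) per block -- which is exactly what
    // getSerializedLength() adds up to: 1 + 4 + 2 * 16 = 37.
    System.out.println(file.getSerializedLength());

    InputStream in = file.serialize();
    INode copy = INode.deserialize(in);
    System.out.println(copy.isFile());            // true
    System.out.println(copy.getBlocks().length);  // 2
  }
}
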
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java
new file mode 100644
index 0000000..f3ac7b6
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3.Block;
+import org.apache.hadoop.fs.s3.INode;
+import org.apache.hadoop.fs.s3.FileSystemStore;
+import org.apache.hadoop.fs.s3.S3FileSystem;
+import org.apache.tajo.common.exception.NotImplementedException;
+
+import java.io.*;
+import java.net.URI;
+import java.util.*;
+
+/**
+ * A stub implementation of {@link FileSystemStore} for testing
+ * {@link S3FileSystem} without actually connecting to S3.
+ */
+public class InMemoryFileSystemStore implements FileSystemStore {
+
+  private Configuration conf;
+  private SortedMap<Path, INode> inodes = new TreeMap<Path, INode>();
+  private Map<Long, byte[]> blocks = new HashMap<Long, byte[]>();
+
+  @Override
+  public void initialize(URI uri, Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public String getVersion() throws IOException {
+    return "0";
+  }
+
+  @Override
+  public void deleteINode(Path path) throws IOException {
+    inodes.remove(normalize(path));
+  }
+
+  @Override
+  public void deleteBlock(Block block) throws IOException {
+    blocks.remove(block.getId());
+  }
+
+  @Override
+  public boolean inodeExists(Path path) throws IOException {
+    return inodes.containsKey(normalize(path));
+  }
+
+  @Override
+  public boolean blockExists(long blockId) throws IOException {
+    return blocks.containsKey(blockId);
+  }
+
+  @Override
+  public INode retrieveINode(Path path) throws IOException {
+    return inodes.get(normalize(path));
+  }
+
+  @Override
+  public File retrieveBlock(Block block, long byteRangeStart) throws IOException {
+    byte[] data = blocks.get(block.getId());
+    File file = createTempFile();
+    BufferedOutputStream out = null;
+    try {
+      out = new BufferedOutputStream(new FileOutputStream(file));
+      out.write(data, (int) byteRangeStart, data.length - (int) byteRangeStart);
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+    return file;
+  }
+
+  private File createTempFile() throws IOException {
+    File dir = new File(conf.get("fs.s3.buffer.dir"));
+    if (!dir.exists() && !dir.mkdirs()) {
+      throw new IOException("Cannot create S3 buffer directory: " + dir);
+    }
+    File result = File.createTempFile("test-", ".tmp", dir);
+    result.deleteOnExit();
+    return result;
+  }
+
+  @Override
+  public Set<Path> listSubPaths(Path path) throws IOException {
+    Path normalizedPath = normalize(path);
+    // This is inefficient but more than adequate for testing purposes.
+    Set<Path> subPaths = new LinkedHashSet<Path>();
+    for (Path p : inodes.tailMap(normalizedPath).keySet()) {
+      if (normalizedPath.equals(p.getParent())) {
+        subPaths.add(p);
+      }
+    }
+    return subPaths;
+  }
+
+  @Override
+  public Set<Path> listDeepSubPaths(Path path) throws IOException {
+    Path normalizedPath = normalize(path);
+    String pathString = normalizedPath.toUri().getPath();
+    if (!pathString.endsWith("/")) {
+      pathString += "/";
+    }
+    // This is inefficient but more than adequate for testing purposes.
+    Set<Path> subPaths = new LinkedHashSet<Path>();
+    for (Path p : inodes.tailMap(normalizedPath).keySet()) {
+      if (p.toUri().getPath().startsWith(pathString)) {
+        subPaths.add(p);
+      }
+    }
+    return subPaths;
+  }
+
+  @Override
+  public void storeINode(Path path, INode inode) throws IOException {
+    inodes.put(normalize(path), inode);
+  }
+
+  @Override
+  public void storeBlock(Block block, File file) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    byte[] buf = new byte[8192];
+    int numRead;
+    BufferedInputStream in = null;
+    try {
+      in = new BufferedInputStream(new FileInputStream(file));
+      while ((numRead = in.read(buf)) >= 0) {
+        out.write(buf, 0, numRead);
+      }
+    } finally {
+      if (in != null) {
+        in.close();
+      }
+    }
+    blocks.put(block.getId(), out.toByteArray());
+  }
+
+  private Path normalize(Path path) {
+    if (!path.isAbsolute()) {
+      throw new IllegalArgumentException("Path must be absolute: " + path);
+    }
+    return new Path(path.toUri().getPath());
+  }
+
+  @Override
+  public void purge() throws IOException {
+    inodes.clear();
+    blocks.clear();
+  }
+
+  @Override
+  public void dump() throws IOException {
+    throw new NotImplementedException();
+  }
+
+}

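To show what the stub provides, here is a brief sketch (not part of this commit) that drives InMemoryFileSystemStore directly through the FileSystemStore interface. The only assumption beyond the diff is using the JVM temp directory for fs.s3.buffer.dir, which retrieveBlock() needs for its local spill file.

package org.apache.tajo.storage.s3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3.Block;
import org.apache.hadoop.fs.s3.FileSystemStore;
// Note: this is org.apache.hadoop.fs.s3.INode, the type the FileSystemStore
// interface uses, matching the stub's own imports above.
import org.apache.hadoop.fs.s3.INode;

import java.io.File;
import java.io.FileOutputStream;
import java.net.URI;

public class InMemoryStoreSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // retrieveBlock() writes the requested range to a temp file under this directory.
    conf.set("fs.s3.buffer.dir", System.getProperty("java.io.tmpdir"));

    FileSystemStore store = new InMemoryFileSystemStore();
    store.initialize(URI.create("s3://test-bucket"), conf);

    // Inodes are keyed by absolute path; listSubPaths() finds direct children.
    store.storeINode(new Path("/data"), INode.DIRECTORY_INODE);
    store.storeINode(new Path("/data/part-0"), INode.DIRECTORY_INODE);
    System.out.println(store.listSubPaths(new Path("/data"))); // [/data/part-0]

    // Blocks are stored from a local file and handed back as a local file.
    File tmp = File.createTempFile("block-", ".dat");
    try (FileOutputStream out = new FileOutputStream(tmp)) {
      out.write(new byte[]{1, 2, 3});
    }
    Block block = new Block(42L, 3L);
    store.storeBlock(block, tmp);
    System.out.println(store.blockExists(42L));                  // true
    System.out.println(store.retrieveBlock(block, 0).length());  // 3
  }
}
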
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java
new file mode 100644
index 0000000..d4034b9
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3.Block;
+import org.apache.hadoop.fs.s3.FileSystemStore;
+import org.apache.hadoop.fs.s3.INode;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class S3OutputStream extends OutputStream {
+
+  private Configuration conf;
+
+  private int bufferSize;
+
+  private FileSystemStore store;
+
+  private Path path;
+
+  private long blockSize;
+
+  private File backupFile;
+
+  private OutputStream backupStream;
+
+  private Random r = new Random();
+
+  private boolean closed;
+
+  private int pos = 0;
+
+  private long filePos = 0;
+
+  private int bytesWrittenToBlock = 0;
+
+  private byte[] outBuf;
+
+  private List<Block> blocks = new ArrayList<Block>();
+
+  private Block nextBlock;
+
+  private static final Log LOG =
+      LogFactory.getLog(S3OutputStream.class.getName());
+
+
+  public S3OutputStream(Configuration conf, FileSystemStore store,
+                        Path path, long blockSize, Progressable progress,
+                        int buffersize) throws IOException {
+
+    this.conf = conf;
+    this.store = store;
+    this.path = path;
+    this.blockSize = blockSize;
+    this.backupFile = newBackupFile();
+    this.backupStream = new FileOutputStream(backupFile);
+    this.bufferSize = buffersize;
+    this.outBuf = new byte[bufferSize];
+
+  }
+
+  private File newBackupFile() throws IOException {
+    File dir = new File(conf.get("fs.s3.buffer.dir"));
+    if (!dir.exists() && !dir.mkdirs()) {
+      throw new IOException("Cannot create S3 buffer directory: " + dir);
+    }
+    File result = File.createTempFile("output-", ".tmp", dir);
+    result.deleteOnExit();
+    return result;
+  }
+
+  public long getPos() throws IOException {
+    return filePos;
+  }
+
+  @Override
+  public synchronized void write(int b) throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+
+    if ((bytesWrittenToBlock + pos == blockSize) || (pos >= bufferSize)) {
+      flush();
+    }
+    outBuf[pos++] = (byte) b;
+    filePos++;
+  }
+
+  @Override
+  public synchronized void write(byte b[], int off, int len) throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+    while (len > 0) {
+      int remaining = bufferSize - pos;
+      int toWrite = Math.min(remaining, len);
+      System.arraycopy(b, off, outBuf, pos, toWrite);
+      pos += toWrite;
+      off += toWrite;
+      len -= toWrite;
+      filePos += toWrite;
+
+      if ((bytesWrittenToBlock + pos >= blockSize) || (pos == bufferSize)) {
+        flush();
+      }
+    }
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+
+    if (bytesWrittenToBlock + pos >= blockSize) {
+      flushData((int) blockSize - bytesWrittenToBlock);
+    }
+    if (bytesWrittenToBlock == blockSize) {
+      endBlock();
+    }
+    flushData(pos);
+  }
+
+  private synchronized void flushData(int maxPos) throws IOException {
+    int workingPos = Math.min(pos, maxPos);
+
+    if (workingPos > 0) {
+      //
+      // To the local block backup, write just the bytes
+      //
+      backupStream.write(outBuf, 0, workingPos);
+
+      //
+      // Track position
+      //
+      bytesWrittenToBlock += workingPos;
+      System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
+      pos -= workingPos;
+    }
+  }
+
+  private synchronized void endBlock() throws IOException {
+    //
+    // Done with local copy
+    //
+    backupStream.close();
+
+    //
+    // Send it to S3
+    //
+    // TODO: Use passed in Progressable to report progress.
+    nextBlockOutputStream();
+    store.storeBlock(nextBlock, backupFile);
+    Block[] arr = new Block[blocks.size()];
+    arr = blocks.toArray(arr);
+    store.storeINode(path, new INode(INode.FILE_TYPES[1], arr));
+
+    //
+    // Delete local backup, start new one
+    //
+    boolean b = backupFile.delete();
+    if (!b) {
+      LOG.warn("Ignoring failed delete");
+    }
+    backupFile = newBackupFile();
+    backupStream = new FileOutputStream(backupFile);
+    bytesWrittenToBlock = 0;
+  }
+
+  private synchronized void nextBlockOutputStream() throws IOException {
+    long blockId = r.nextLong();
+    while (store.blockExists(blockId)) {
+      blockId = r.nextLong();
+    }
+    nextBlock = new Block(blockId, bytesWrittenToBlock);
+    blocks.add(nextBlock);
+    bytesWrittenToBlock = 0;
+  }
+
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+
+    flush();
+    if (filePos == 0 || bytesWrittenToBlock != 0) {
+      endBlock();
+    }
+
+    backupStream.close();
+    boolean b = backupFile.delete();
+    if (!b) {
+      LOG.warn("Ignoring failed delete");
+    }
+
+    super.close();
+
+    closed = true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java
new file mode 100644
index 0000000..cb97a74
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.s3.Block;
+import org.apache.hadoop.fs.s3.INode;
+import org.apache.hadoop.fs.s3.FileSystemStore;
+import org.apache.hadoop.fs.s3.S3FileSystem;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SmallBlockS3FileSystem extends S3FileSystem {
+
+  private URI uri;
+
+  private FileSystemStore store;
+
+  private Path workingDir;
+
+  static class Holder {
+    private static InMemoryFileSystemStore s;
+
+    public synchronized static FileSystemStore get() {
+      if(s != null) {
+        return s;
+      }
+      s = new InMemoryFileSystemStore();
+      return s;
+    }
+
+    public synchronized static void set(InMemoryFileSystemStore inMemoryFileSystemStore) {
+      s = inMemoryFileSystemStore;
+    }
+  }
+
+  public SmallBlockS3FileSystem() {
+  }
+
+
+  public SmallBlockS3FileSystem(
+      InMemoryFileSystemStore inMemoryFileSystemStore) {
+    Holder.set(inMemoryFileSystemStore);
+    this.store = inMemoryFileSystemStore;
+  }
+
+  @Override
+  public URI getUri() {
+    return uri;
+  }
+  @Override
+  public long getDefaultBlockSize() {
+    return 10;
+  }
+
+  @Override
+  public void initialize(URI uri, Configuration conf) throws IOException {
+    if (store == null) {
+      store = Holder.get();
+    }
+    store.initialize(uri, conf);
+    setConf(conf);
+    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+    this.workingDir =
+        new Path("/user", System.getProperty("user.name")).makeQualified(this);
+  }
+  @Override
+  public boolean isFile(Path path) throws IOException {
+    INode inode = store.retrieveINode(makeAbsolute(path));
+    if (inode == null) {
+      return false;
+    }
+    return inode.isFile();
+  }
+
+  private INode checkFile(Path path) throws IOException {
+    INode inode = store.retrieveINode(makeAbsolute(path));
+    if (inode == null) {
+      throw new IOException("No such file.");
+    }
+    if (inode.isDirectory()) {
+      throw new IOException("Path " + path + " is a directory.");
+    }
+    return inode;
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path f) throws IOException {
+    Path absolutePath = makeAbsolute(f);
+    INode inode = store.retrieveINode(absolutePath);
+    if (inode == null) {
+      throw new FileNotFoundException("File " + f + " does not exist.");
+    }
+    if (inode.isFile()) {
+      return new FileStatus[] {
+          new S3FileStatus(f.makeQualified(this), inode)
+      };
+    }
+    ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
+    for (Path p : store.listSubPaths(absolutePath)) {
+      ret.add(getFileStatus(p.makeQualified(this)));
+    }
+    return ret.toArray(new FileStatus[0]);
+  }
+  @Override
+  public FSDataOutputStream create(Path file, FsPermission permission,
+                                   boolean overwrite, int bufferSize,
+                                   short replication, long blockSize, Progressable progress)
+      throws IOException {
+
+    INode inode = store.retrieveINode(makeAbsolute(file));
+    if (inode != null) {
+      if (overwrite) {
+        delete(file, true);
+      } else {
+        throw new IOException("File already exists: " + file);
+      }
+    } else {
+      Path parent = file.getParent();
+      if (parent != null) {
+        if (!mkdirs(parent)) {
+          throw new IOException("Mkdirs failed to create " + parent.toString());
+        }
+      }
+    }
+    return new FSDataOutputStream
+        (new S3OutputStream(getConf(), store, makeAbsolute(file),
+            blockSize, progress, bufferSize),
+            statistics);
+  }
+  @Override
+  public boolean mkdirs(Path path, FsPermission permission) throws IOException {
+    Path absolutePath = makeAbsolute(path);
+    List<Path> paths = new ArrayList<Path>();
+    do {
+      paths.add(0, absolutePath);
+      absolutePath = absolutePath.getParent();
+    } while (absolutePath != null);
+
+    boolean result = true;
+    for (Path p : paths) {
+      result &= mkdir(p);
+    }
+    return result;
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return workingDir;
+  }
+
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    Path absoluteSrc = makeAbsolute(src);
+    INode srcINode = store.retrieveINode(absoluteSrc);
+    if (srcINode == null) {
+      // src path doesn't exist
+      return false;
+    }
+    Path absoluteDst = makeAbsolute(dst);
+    INode dstINode = store.retrieveINode(absoluteDst);
+    if (dstINode != null && dstINode.isDirectory()) {
+      absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
+      dstINode = store.retrieveINode(absoluteDst);
+    }
+    if (dstINode != null) {
+      // dst path already exists - can't overwrite
+      return false;
+    }
+    Path dstParent = absoluteDst.getParent();
+    if (dstParent != null) {
+      INode dstParentINode = store.retrieveINode(dstParent);
+      if (dstParentINode == null || dstParentINode.isFile()) {
+        // dst parent doesn't exist or is a file
+        return false;
+      }
+    }
+    return renameRecursive(absoluteSrc, absoluteDst);
+  }
+
+  private boolean renameRecursive(Path src, Path dst) throws IOException {
+    INode srcINode = store.retrieveINode(src);
+    store.storeINode(dst, srcINode);
+    store.deleteINode(src);
+    if (srcINode.isDirectory()) {
+      for (Path oldSrc : store.listDeepSubPaths(src)) {
+        INode inode = store.retrieveINode(oldSrc);
+        if (inode == null) {
+          return false;
+        }
+        String oldSrcPath = oldSrc.toUri().getPath();
+        String srcPath = src.toUri().getPath();
+        String dstPath = dst.toUri().getPath();
+        Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
+        store.storeINode(newDst, inode);
+        store.deleteINode(oldSrc);
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public boolean delete(Path path, boolean recursive) throws IOException {
+    Path absolutePath = makeAbsolute(path);
+    INode inode = store.retrieveINode(absolutePath);
+    if (inode == null) {
+      return false;
+    }
+    if (inode.isFile()) {
+      store.deleteINode(absolutePath);
+      for (Block block: inode.getBlocks()) {
+        store.deleteBlock(block);
+      }
+    } else {
+      FileStatus[] contents = null;
+      try {
+        contents = listStatus(absolutePath);
+      } catch(FileNotFoundException fnfe) {
+        return false;
+      }
+
+      if ((contents.length !=0) && (!recursive)) {
+        throw new IOException("Directory " + path.toString()
+            + " is not empty.");
+      }
+      for (FileStatus p:contents) {
+        if (!delete(p.getPath(), recursive)) {
+          return false;
+        }
+      }
+      store.deleteINode(absolutePath);
+    }
+    return true;
+  }
+
+  /**
+   * FileStatus for S3 file systems.
+   */
+  @Override
+  public FileStatus getFileStatus(Path f)  throws IOException {
+    INode inode = store.retrieveINode(makeAbsolute(f));
+    if (inode == null) {
+      throw new FileNotFoundException(f + ": No such file or directory.");
+    }
+    return new S3FileStatus(f.makeQualified(this), inode);
+  }
+  private boolean mkdir(Path path) throws IOException {
+    Path absolutePath = makeAbsolute(path);
+    INode inode = store.retrieveINode(absolutePath);
+    if (inode == null) {
+      store.storeINode(absolutePath, INode.DIRECTORY_INODE);
+    } else if (inode.isFile()) {
+      throw new IOException(String.format(
+          "Can't make directory for path %s since it is a file.",
+          absolutePath));
+    }
+    return true;
+  }
+  private Path makeAbsolute(Path path) {
+    if (path.isAbsolute()) {
+      return path;
+    }
+    return new Path(workingDir, path);
+  }
+
+  private static class S3FileStatus extends FileStatus {
+
+    S3FileStatus(Path f, INode inode) throws IOException {
+      super(findLength(inode), inode.isDirectory(), 1,
+          findBlocksize(inode), 0, f);
+    }
+
+    private static long findLength(INode inode) {
+      if (!inode.isDirectory()) {
+        long length = 0L;
+        for (Block block : inode.getBlocks()) {
+          length += block.getLength();
+        }
+        return length;
+      }
+      return 0;
+    }
+
+    private static long findBlocksize(INode inode) {
+      final Block[] ret = inode.getBlocks();
+      return ret == null ? 0L : ret[0].getLength();
+    }
+  }
+}
\ No newline at end of file

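Putting the pieces together, the following sketch (not part of this commit) shows one way the small-block file system might be exercised end to end: the in-memory store, the 10-byte default block size, and S3OutputStream splitting a 19-byte write into two blocks. The only assumption beyond the diff is the JVM temp directory for fs.s3.buffer.dir.

package org.apache.tajo.storage.s3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class SmallBlockS3Sketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // S3OutputStream and InMemoryFileSystemStore stage data under this local directory.
    conf.set("fs.s3.buffer.dir", System.getProperty("java.io.tmpdir"));

    SmallBlockS3FileSystem fs =
        new SmallBlockS3FileSystem(new InMemoryFileSystemStore());
    fs.initialize(URI.create("s3://test-bucket"), conf);

    // 19 bytes against a 10-byte default block size -> two blocks in the store.
    Path file = new Path("/user/tajo/hello.txt");
    try (FSDataOutputStream out = fs.create(file)) {
      out.writeBytes("hello, small blocks");
    }

    FileStatus status = fs.getFileStatus(file);
    System.out.println(status.getLen());        // 19
    System.out.println(status.getBlockSize());  // 10 (length of the first block)
  }
}
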
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
new file mode 100644
index 0000000..6190d1a
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
@@ -0,0 +1,164 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<configuration>
+  <property>
+    <name>fs.s3.impl</name>
+    <value>org.apache.tajo.storage.s3.SmallBlockS3FileSystem</value>
+  </property>
+
+  <!-- Storage Manager Configuration -->
+  <property>
+    <name>tajo.storage.manager.hdfs.class</name>
+    <value>org.apache.tajo.storage.FileStorageManager</value>
+  </property>
+  <property>
+    <name>tajo.storage.manager.hbase.class</name>
+    <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value>
+  </property>
+
+  <!--- Registered Scanner Handler -->
+  <property>
+    <name>tajo.storage.scanner-handler</name>
+    <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value>
+  </property>
+
+  <!--- Fragment Class Configurations -->
+  <property>
+    <name>tajo.storage.fragment.textfile.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.csv.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.raw.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.rcfile.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.row.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.parquet.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.sequencefile.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.avro.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+
+  <!--- Scanner Handler -->
+  <property>
+    <name>tajo.storage.scanner-handler.textfile.class</name>
+    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.csv.class</name>
+    <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.raw.class</name>
+    <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.rcfile.class</name>
+    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.rowfile.class</name>
+    <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.parquet.class</name>
+    <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.sequencefile.class</name>
+    <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.avro.class</name>
+    <value>org.apache.tajo.storage.avro.AvroScanner</value>
+  </property>
+
+  <!--- Appender Handler -->
+  <property>
+    <name>tajo.storage.appender-handler</name>
+    <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.textfile.class</name>
+    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.csv.class</name>
+    <value>org.apache.tajo.storage.CSVFile$CSVAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.raw.class</name>
+    <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.rcfile.class</name>
+    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.rowfile.class</name>
+    <value>org.apache.tajo.storage.RowFile$RowFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.parquet.class</name>
+    <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.sequencefile.class</name>
+    <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.avro.class</name>
+    <value>org.apache.tajo.storage.avro.AvroAppender</value>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/resources/testVariousTypes.avsc
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/testVariousTypes.avsc b/tajo-storage/tajo-storage-hdfs/src/test/resources/testVariousTypes.avsc
new file mode 100644
index 0000000..611b97f
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/testVariousTypes.avsc
@@ -0,0 +1,21 @@
+{
+  "type": "record",
+  "namespace": "org.apache.tajo",
+  "name": "testVariousTypes",
+  "fields": [
+    { "name": "col1", "type": "boolean" },
+    { "name": "col2", "type": "int" },
+    { "name": "col3", "type": "string" },
+    { "name": "col4", "type": "int" },
+    { "name": "col5", "type": "int" },
+    { "name": "col6", "type": "long" },
+    { "name": "col7", "type": "float" },
+    { "name": "col8", "type": "double" },
+    { "name": "col9", "type": "string" },
+    { "name": "col10", "type": "bytes" },
+    { "name": "col11", "type": "bytes" },
+    { "name": "col12", "type": "null" },
+    { "name": "col13", "type": "bytes" }
+  ]
+}
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-yarn-pullserver/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/pom.xml b/tajo-yarn-pullserver/pom.xml
index a7644a1..3daec5c 100644
--- a/tajo-yarn-pullserver/pom.xml
+++ b/tajo-yarn-pullserver/pom.xml
@@ -57,7 +57,12 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-storage</artifactId>
+      <artifactId>tajo-storage-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-hdfs</artifactId>
       <scope>provided</scope>
     </dependency>
     <dependency>

