tajo-commits mailing list archives

From jihoon...@apache.org
Subject git commit: TAJO-577: Support S3FileSystem split. (Yongjun Park via jihoon)
Date Mon, 03 Feb 2014 16:59:55 GMT
Updated Branches:
  refs/heads/master db2f733a6 -> f01034863


TAJO-577: Support S3FileSystem split. (Yongjun Park via jihoon)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/f0103486
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/f0103486
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/f0103486

Branch: refs/heads/master
Commit: f01034863908f87718a21e9491bd199d002c7080
Parents: db2f733
Author: Jihoon Son <jihoonson@apache.org>
Authored: Tue Feb 4 01:59:40 2014 +0900
Committer: Jihoon Son <jihoonson@apache.org>
Committed: Tue Feb 4 01:59:40 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   4 +-
 .../master/querymaster/QueryMasterTask.java     |   2 +-
 .../tajo/storage/AbstractStorageManager.java    |  49 +--
 .../apache/tajo/storage/TestFileSystems.java    | 109 +++++++
 .../java/org/apache/tajo/storage/s3/INode.java  | 124 ++++++++
 .../storage/s3/InMemoryFileSystemStore.java     | 175 +++++++++++
 .../apache/tajo/storage/s3/S3OutputStream.java  | 234 +++++++++++++++
 .../tajo/storage/s3/SmallBlockS3FileSystem.java | 296 +++++++++++++++++++
 .../src/test/resources/storage-default.xml      |  20 ++
 10 files changed, 996 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f0103486/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e6f34a0..e3c433f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -243,6 +243,8 @@ Release 0.8.0 - unreleased
 
   BUG FIXES
 
+    TAJO-577: Support S3FileSystem split. (Yongjun Park via jihoon)
+
     TAJO-568: Union query with the same alias names cause NPE. (hyunsik)
 
     TAJO-570: InvalidOperationException in outer join with constant values.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f0103486/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index d18bd20..38aa191 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -141,6 +141,8 @@ public class TajoConf extends Configuration {
     // Storage Configuration
     //////////////////////////////////
     RAWFILE_SYNC_INTERVAL("rawfile.sync.interval", null),
+    MINIMUM_SPLIT_SIZE("tajo.min.split.size", (long) 536870912),
+    MAXIMUM_SPLIT_SIZE("tajo.max.split.size", (long) 67108864),
     // for RCFile
     HIVEUSEEXPLICITRCFILEHEADER("tajo.exec.rcfile.use.explicit.header", true),
 
@@ -416,7 +418,7 @@ public class TajoConf extends Configuration {
   public static Path getTajoRootDir(TajoConf conf) {
     String rootPath = conf.getVar(ConfVars.ROOT_DIR);
     Preconditions.checkNotNull(rootPath,
-          ConfVars.ROOT_DIR.varname + " must be set before a Tajo Cluster starts up");
+        ConfVars.ROOT_DIR.varname + " must be set before a Tajo Cluster starts up");
     return new Path(rootPath);
   }
 

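[Editor's note, for illustration only; not part of this commit.] A minimal sketch of overriding the two new split-size keys added in the TajoConf hunk above. The class name is hypothetical; setLong() comes from Hadoop's Configuration, which TajoConf extends, and getLongVar() is used the same way in the AbstractStorageManager change below.

import org.apache.tajo.conf.TajoConf;

public class SplitSizeConfigExample {
  public static void main(String[] args) {
    TajoConf conf = new TajoConf();
    // Override the defaults registered as MINIMUM_SPLIT_SIZE / MAXIMUM_SPLIT_SIZE above.
    conf.setLong("tajo.min.split.size", 64L * 1024 * 1024);    // 64 MB lower bound
    conf.setLong("tajo.max.split.size", 512L * 1024 * 1024);   // 512 MB upper bound
    long min = conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE);
    long max = conf.getLongVar(TajoConf.ConfVars.MAXIMUM_SPLIT_SIZE);
    System.out.println("min split = " + min + " bytes, max split = " + max + " bytes");
  }
}
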
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f0103486/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index e57ea74..e193509 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -355,7 +355,7 @@ public class QueryMasterTask extends CompositeService {
       FileStatus fsStatus = defaultFS.getFileStatus(stagingDir);
       String owner = fsStatus.getOwner();
 
-      if (!(owner.equals(currentUser) || owner.equals(realUser))) {
+      if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) {
         throw new IOException("The ownership on the user's query " +
             "directory " + stagingDir + " is not as expected. " +
             "It is owned by " + owner + ". The directory must " +

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f0103486/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
index 91a535e..122e639 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
@@ -254,7 +254,7 @@ public abstract class AbstractStorageManager {
   }
 
   public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
-                                   Path tablePath, long size)
+                                       Path tablePath, long size)
       throws IOException {
     FileSystem fs = tablePath.getFileSystem(conf);
 
@@ -409,16 +409,14 @@ public abstract class AbstractStorageManager {
     return scanner.isSplittable();
   }
 
-  @Deprecated
+
   protected long computeSplitSize(long blockSize, long minSize,
                                   long maxSize) {
     return Math.max(minSize, Math.min(maxSize, blockSize));
   }
 
-  @Deprecated
   private static final double SPLIT_SLOP = 1.1;   // 10% slop
 
-  @Deprecated
   protected int getBlockIndex(BlockLocation[] blkLocations,
                               long offset) {
     for (int i = 0; i < blkLocations.length; i++) {
@@ -443,14 +441,19 @@ public abstract class AbstractStorageManager {
     return new FileFragment(fragmentId, file, start, length);
   }
 
+  protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
+                                   String[] hosts) {
+    return new FileFragment(fragmentId, file, start, length, hosts);
+  }
+
   protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation,
-                               int[] diskIds) throws IOException {
+                                   int[] diskIds) throws IOException {
     return new FileFragment(fragmentId, file, blockLocation, diskIds);
   }
 
   // for Non Splittable. eg, compressed gzip TextFile
   protected FileFragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
-                                  BlockLocation[] blkLocations) throws IOException {
+                                      BlockLocation[] blkLocations) throws IOException {
 
     Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
     for (BlockLocation blockLocation : blkLocations) {
@@ -486,10 +489,8 @@ public abstract class AbstractStorageManager {
    *
    * @return the maximum number of bytes a split can include
    */
-  @Deprecated
-  public static long getMaxSplitSize() {
-    // TODO - to be configurable
-    return 536870912L;
+  public long getMaxSplitSize() {
+    return conf.getLongVar(TajoConf.ConfVars.MAXIMUM_SPLIT_SIZE);
   }
 
   /**
@@ -497,10 +498,8 @@ public abstract class AbstractStorageManager {
    *
    * @return the minimum number of bytes that can be in a split
    */
-  @Deprecated
-  public static long getMinSplitSize() {
-    // TODO - to be configurable
-    return 67108864L;
+  public long getMinSplitSize() {
+    return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE);
   }
 
   /**
@@ -557,6 +556,9 @@ public abstract class AbstractStorageManager {
   public List<FileFragment> getSplits(String tableName, TableMeta meta, Schema schema, Path inputPath) throws IOException {
     // generate splits'
 
+    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize());
+    long maxSize = getMaxSplitSize();
+
     List<FileFragment> splits = new ArrayList<FileFragment>();
     FileSystem fs = inputPath.getFileSystem(conf);
     List<FileStatus> files;
@@ -594,8 +596,21 @@ public abstract class AbstractStorageManager {
 
         } else {
           if (splittable) {
-            for (BlockLocation blockLocation : blkLocations) {
-              splits.add(makeSplit(tableName, meta, path, blockLocation, null));
+            // for s3
+            long blockSize = file.getBlockSize();
+            long splitSize = computeSplitSize(blockSize, minSize, maxSize);
+
+            long bytesRemaining = length;
+            while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
+              int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
+              splits.add(makeSplit(tableName, meta, path, length-bytesRemaining, splitSize,
+                  blkLocations[blkIndex].getHosts()));
+              bytesRemaining -= splitSize;
+            }
+            if (bytesRemaining != 0) {
+              int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
+              splits.add(makeSplit(tableName, meta, path, length-bytesRemaining, bytesRemaining,
+                  blkLocations[blkIndex].getHosts()));
             }
           } else { // Non splittable
             splits.add(makeNonSplit(tableName, meta, path, 0, length, blkLocations));
@@ -619,7 +634,7 @@ public abstract class AbstractStorageManager {
 
     @Override
     public String getMessage(){
-       StringBuffer sb = new StringBuffer();
+      StringBuffer sb = new StringBuffer();
       int messageLimit = Math.min(errors.size(), 10);
       for (int i = 0; i < messageLimit ; i ++) {
         sb.append(errors.get(i).getMessage()).append("\n");

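[Editor's note, for illustration only; not part of this commit.] A self-contained sketch of the split arithmetic introduced above: computeSplitSize() clamps the block size between the configured minimum and maximum, and the loop cuts the file into splits until less than 110% of a split size remains (SPLIT_SLOP). The class name and the numbers are hypothetical.

public class SplitMathExample {
  // Same 10% slop as the SPLIT_SLOP constant kept in AbstractStorageManager.
  static final double SPLIT_SLOP = 1.1;

  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long length = 250;                                       // hypothetical file length in bytes
    long splitSize = computeSplitSize(10, 10, 67108864L);    // 10-byte blocks, as in the new tests
    long bytesRemaining = length;
    int splits = 0;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      splits++;                                              // full split of splitSize bytes
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      splits++;                                              // trailing split with the remainder
    }
    System.out.println(splits + " splits for " + length + " bytes");   // prints "25 splits for 250 bytes"
  }
}
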
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f0103486/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
new file mode 100644
index 0000000..67776eb
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@ -0,0 +1,109 @@
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.s3.InMemoryFileSystemStore;
+import org.apache.tajo.storage.s3.SmallBlockS3FileSystem;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class TestFileSystems {
+
+  protected byte[] data = null;
+
+  private static String TEST_PATH = "target/test-data/TestFileSystem";
+  private TajoConf conf = null;
+  private AbstractStorageManager sm = null;
+  private FileSystem fs = null;
+  Path testDir;
+
+  public TestFileSystems(FileSystem fs) throws IOException {
+    conf = new TajoConf();
+    sm = StorageManagerFactory.getStorageManager(conf);
+
+    this.fs = fs;
+    fs.initialize(URI.create(fs.getScheme() + ":///"), conf);
+    testDir = getTestDir(this.fs, TEST_PATH);
+  }
+
+  public Path getTestDir(FileSystem fs, String dir) throws IOException {
+    Path path = new Path(dir);
+    if(fs.exists(path))
+      fs.delete(path, true);
+
+    fs.mkdirs(path);
+
+    return fs.makeQualified(path);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][] {
+        {new SmallBlockS3FileSystem(new InMemoryFileSystemStore())},
+    });
+  }
+
+  @Test
+  public void testBlockSplit() throws IOException {
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+    Tuple[] tuples = new Tuple[4];
+    for (int i = 0; i < tuples.length; i++) {
+      tuples[i] = new VTuple(3);
+      tuples[i]
+          .put(new Datum[] { DatumFactory.createInt4(i),
+              DatumFactory.createInt4(i + 32),
+              DatumFactory.createText("name" + i) });
+    }
+
+    Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender",
+        "table.csv");
+    fs.mkdirs(path.getParent());
+
+    Appender appender = sm.getAppender(meta, schema, path);
+    appender.init();
+    for (Tuple t : tuples) {
+      appender.addTuple(t);
+    }
+    appender.close();
+    FileStatus fileStatus = fs.getFileStatus(path);
+
+    List<FileFragment> splits = sm.getSplits("table", meta, schema, path);
+    int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize());
+    assertEquals(splits.size(), splitSize);
+
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f0103486/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java
new file mode 100644
index 0000000..7b09937
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.s3.Block;
+import org.apache.hadoop.io.IOUtils;
+
+import java.io.*;
+
+/**
+ * Holds file metadata including type (regular file, or directory),
+ * and the list of blocks that are pointers to the data.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class INode {
+
+  enum FileType {
+    DIRECTORY, FILE
+  }
+
+  public static final FileType[] FILE_TYPES = {
+      FileType.DIRECTORY,
+      FileType.FILE
+  };
+
+  public static final INode DIRECTORY_INODE = new INode(FileType.DIRECTORY, null);
+
+  private FileType fileType;
+  private Block[] blocks;
+
+  public INode(FileType fileType, Block[] blocks) {
+    this.fileType = fileType;
+    if (isDirectory() && blocks != null) {
+      throw new IllegalArgumentException("A directory cannot contain blocks.");
+    }
+    this.blocks = blocks;
+  }
+
+  public Block[] getBlocks() {
+    return blocks;
+  }
+
+  public FileType getFileType() {
+    return fileType;
+  }
+
+  public boolean isDirectory() {
+    return fileType == FileType.DIRECTORY;
+  }
+
+  public boolean isFile() {
+    return fileType == FileType.FILE;
+  }
+
+  public long getSerializedLength() {
+    return 1L + (blocks == null ? 0 : 4 + blocks.length * 16);
+  }
+
+
+  public InputStream serialize() throws IOException {
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(bytes);
+    try {
+      out.writeByte(fileType.ordinal());
+      if (isFile()) {
+        out.writeInt(blocks.length);
+        for (int i = 0; i < blocks.length; i++) {
+          out.writeLong(blocks[i].getId());
+          out.writeLong(blocks[i].getLength());
+        }
+      }
+      out.close();
+      out = null;
+    } finally {
+      IOUtils.closeStream(out);
+    }
+    return new ByteArrayInputStream(bytes.toByteArray());
+  }
+
+  public static INode deserialize(InputStream in) throws IOException {
+    if (in == null) {
+      return null;
+    }
+    DataInputStream dataIn = new DataInputStream(in);
+    FileType fileType = INode.FILE_TYPES[dataIn.readByte()];
+    switch (fileType) {
+      case DIRECTORY:
+        in.close();
+        return INode.DIRECTORY_INODE;
+      case FILE:
+        int numBlocks = dataIn.readInt();
+        Block[] blocks = new Block[numBlocks];
+        for (int i = 0; i < numBlocks; i++) {
+          long id = dataIn.readLong();
+          long length = dataIn.readLong();
+          blocks[i] = new Block(id, length);
+        }
+        in.close();
+        return new INode(fileType, blocks);
+      default:
+        throw new IllegalArgumentException("Cannot deserialize inode.");
+    }
+  }
+
+}

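[Editor's note, for illustration only; not part of this commit.] A round-trip sketch for the test-only INode above: one byte for the file type, four bytes for the block count, and sixteen bytes (two longs) per block. It is placed in the same package because FileType has default access; the class name is hypothetical.

package org.apache.tajo.storage.s3;

import org.apache.hadoop.fs.s3.Block;

public class INodeRoundTripExample {
  public static void main(String[] args) throws Exception {
    Block[] blocks = { new Block(1L, 10L), new Block(2L, 7L) };
    INode original = new INode(INode.FileType.FILE, blocks);
    // 1 byte file type + 4 bytes block count + 16 bytes per block = 37 bytes here.
    System.out.println(original.getSerializedLength());
    INode copy = INode.deserialize(original.serialize());
    System.out.println(copy.getBlocks().length);   // prints 2
  }
}
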
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f0103486/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java
new file mode 100644
index 0000000..40decc2
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3.Block;
+import org.apache.hadoop.fs.s3.FileSystemStore;
+import org.apache.hadoop.fs.s3.INode;
+import org.apache.tajo.common.exception.NotImplementedException;
+
+import java.io.*;
+import java.net.URI;
+import java.util.*;
+
+/**
+ * A stub implementation of {@link FileSystemStore} for testing
+ * {@link S3FileSystem} without actually connecting to S3.
+ */
+public class InMemoryFileSystemStore implements FileSystemStore {
+
+  private Configuration conf;
+  private SortedMap<Path, INode> inodes = new TreeMap<Path, INode>();
+  private Map<Long, byte[]> blocks = new HashMap<Long, byte[]>();
+
+  @Override
+  public void initialize(URI uri, Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public String getVersion() throws IOException {
+    return "0";
+  }
+
+  @Override
+  public void deleteINode(Path path) throws IOException {
+    inodes.remove(normalize(path));
+  }
+
+  @Override
+  public void deleteBlock(Block block) throws IOException {
+    blocks.remove(block.getId());
+  }
+
+  @Override
+  public boolean inodeExists(Path path) throws IOException {
+    return inodes.containsKey(normalize(path));
+  }
+
+  @Override
+  public boolean blockExists(long blockId) throws IOException {
+    return blocks.containsKey(blockId);
+  }
+
+  @Override
+  public INode retrieveINode(Path path) throws IOException {
+    return inodes.get(normalize(path));
+  }
+
+  @Override
+  public File retrieveBlock(Block block, long byteRangeStart) throws IOException {
+    byte[] data = blocks.get(block.getId());
+    File file = createTempFile();
+    BufferedOutputStream out = null;
+    try {
+      out = new BufferedOutputStream(new FileOutputStream(file));
+      out.write(data, (int) byteRangeStart, data.length - (int) byteRangeStart);
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+    return file;
+  }
+
+  private File createTempFile() throws IOException {
+    File dir = new File(conf.get("fs.s3.buffer.dir"));
+    if (!dir.exists() && !dir.mkdirs()) {
+      throw new IOException("Cannot create S3 buffer directory: " + dir);
+    }
+    File result = File.createTempFile("test-", ".tmp", dir);
+    result.deleteOnExit();
+    return result;
+  }
+
+  @Override
+  public Set<Path> listSubPaths(Path path) throws IOException {
+    Path normalizedPath = normalize(path);
+    // This is inefficient but more than adequate for testing purposes.
+    Set<Path> subPaths = new LinkedHashSet<Path>();
+    for (Path p : inodes.tailMap(normalizedPath).keySet()) {
+      if (normalizedPath.equals(p.getParent())) {
+        subPaths.add(p);
+      }
+    }
+    return subPaths;
+  }
+
+  @Override
+  public Set<Path> listDeepSubPaths(Path path) throws IOException {
+    Path normalizedPath = normalize(path);
+    String pathString = normalizedPath.toUri().getPath();
+    if (!pathString.endsWith("/")) {
+      pathString += "/";
+    }
+    // This is inefficient but more than adequate for testing purposes.
+    Set<Path> subPaths = new LinkedHashSet<Path>();
+    for (Path p : inodes.tailMap(normalizedPath).keySet()) {
+      if (p.toUri().getPath().startsWith(pathString)) {
+        subPaths.add(p);
+      }
+    }
+    return subPaths;
+  }
+
+  @Override
+  public void storeINode(Path path, INode inode) throws IOException {
+    inodes.put(normalize(path), inode);
+  }
+
+  @Override
+  public void storeBlock(Block block, File file) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    byte[] buf = new byte[8192];
+    int numRead;
+    BufferedInputStream in = null;
+    try {
+      in = new BufferedInputStream(new FileInputStream(file));
+      while ((numRead = in.read(buf)) >= 0) {
+        out.write(buf, 0, numRead);
+      }
+    } finally {
+      if (in != null) {
+        in.close();
+      }
+    }
+    blocks.put(block.getId(), out.toByteArray());
+  }
+
+  private Path normalize(Path path) {
+    if (!path.isAbsolute()) {
+      throw new IllegalArgumentException("Path must be absolute: " + path);
+    }
+    return new Path(path.toUri().getPath());
+  }
+
+  @Override
+  public void purge() throws IOException {
+    inodes.clear();
+    blocks.clear();
+  }
+
+  @Override
+  public void dump() throws IOException {
+    throw new NotImplementedException();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f0103486/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java
new file mode 100644
index 0000000..d4034b9
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3.Block;
+import org.apache.hadoop.fs.s3.FileSystemStore;
+import org.apache.hadoop.fs.s3.INode;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class S3OutputStream extends OutputStream {
+
+  private Configuration conf;
+
+  private int bufferSize;
+
+  private FileSystemStore store;
+
+  private Path path;
+
+  private long blockSize;
+
+  private File backupFile;
+
+  private OutputStream backupStream;
+
+  private Random r = new Random();
+
+  private boolean closed;
+
+  private int pos = 0;
+
+  private long filePos = 0;
+
+  private int bytesWrittenToBlock = 0;
+
+  private byte[] outBuf;
+
+  private List<Block> blocks = new ArrayList<Block>();
+
+  private Block nextBlock;
+
+  private static final Log LOG =
+      LogFactory.getLog(S3OutputStream.class.getName());
+
+
+  public S3OutputStream(Configuration conf, FileSystemStore store,
+                        Path path, long blockSize, Progressable progress,
+                        int buffersize) throws IOException {
+
+    this.conf = conf;
+    this.store = store;
+    this.path = path;
+    this.blockSize = blockSize;
+    this.backupFile = newBackupFile();
+    this.backupStream = new FileOutputStream(backupFile);
+    this.bufferSize = buffersize;
+    this.outBuf = new byte[bufferSize];
+
+  }
+
+  private File newBackupFile() throws IOException {
+    File dir = new File(conf.get("fs.s3.buffer.dir"));
+    if (!dir.exists() && !dir.mkdirs()) {
+      throw new IOException("Cannot create S3 buffer directory: " + dir);
+    }
+    File result = File.createTempFile("output-", ".tmp", dir);
+    result.deleteOnExit();
+    return result;
+  }
+
+  public long getPos() throws IOException {
+    return filePos;
+  }
+
+  @Override
+  public synchronized void write(int b) throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+
+    if ((bytesWrittenToBlock + pos == blockSize) || (pos >= bufferSize)) {
+      flush();
+    }
+    outBuf[pos++] = (byte) b;
+    filePos++;
+  }
+
+  @Override
+  public synchronized void write(byte b[], int off, int len) throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+    while (len > 0) {
+      int remaining = bufferSize - pos;
+      int toWrite = Math.min(remaining, len);
+      System.arraycopy(b, off, outBuf, pos, toWrite);
+      pos += toWrite;
+      off += toWrite;
+      len -= toWrite;
+      filePos += toWrite;
+
+      if ((bytesWrittenToBlock + pos >= blockSize) || (pos == bufferSize)) {
+        flush();
+      }
+    }
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+
+    if (bytesWrittenToBlock + pos >= blockSize) {
+      flushData((int) blockSize - bytesWrittenToBlock);
+    }
+    if (bytesWrittenToBlock == blockSize) {
+      endBlock();
+    }
+    flushData(pos);
+  }
+
+  private synchronized void flushData(int maxPos) throws IOException {
+    int workingPos = Math.min(pos, maxPos);
+
+    if (workingPos > 0) {
+      //
+      // To the local block backup, write just the bytes
+      //
+      backupStream.write(outBuf, 0, workingPos);
+
+      //
+      // Track position
+      //
+      bytesWrittenToBlock += workingPos;
+      System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
+      pos -= workingPos;
+    }
+  }
+
+  private synchronized void endBlock() throws IOException {
+    //
+    // Done with local copy
+    //
+    backupStream.close();
+
+    //
+    // Send it to S3
+    //
+    // TODO: Use passed in Progressable to report progress.
+    nextBlockOutputStream();
+    store.storeBlock(nextBlock, backupFile);
+    Block[] arr = new Block[blocks.size()];
+    arr = blocks.toArray(arr);
+    store.storeINode(path, new INode(INode.FILE_TYPES[1], arr));
+
+    //
+    // Delete local backup, start new one
+    //
+    boolean b = backupFile.delete();
+    if (!b) {
+      LOG.warn("Ignoring failed delete");
+    }
+    backupFile = newBackupFile();
+    backupStream = new FileOutputStream(backupFile);
+    bytesWrittenToBlock = 0;
+  }
+
+  private synchronized void nextBlockOutputStream() throws IOException {
+    long blockId = r.nextLong();
+    while (store.blockExists(blockId)) {
+      blockId = r.nextLong();
+    }
+    nextBlock = new Block(blockId, bytesWrittenToBlock);
+    blocks.add(nextBlock);
+    bytesWrittenToBlock = 0;
+  }
+
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+
+    flush();
+    if (filePos == 0 || bytesWrittenToBlock != 0) {
+      endBlock();
+    }
+
+    backupStream.close();
+    boolean b = backupFile.delete();
+    if (!b) {
+      LOG.warn("Ignoring failed delete");
+    }
+
+    super.close();
+
+    closed = true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f0103486/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java
new file mode 100644
index 0000000..1c86489
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java
@@ -0,0 +1,296 @@
+package org.apache.tajo.storage.s3;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.s3.Block;
+import org.apache.hadoop.fs.s3.FileSystemStore;
+import org.apache.hadoop.fs.s3.INode;
+import org.apache.hadoop.fs.s3.S3FileSystem;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SmallBlockS3FileSystem extends S3FileSystem {
+
+  private URI uri;
+
+  private FileSystemStore store;
+
+  private Path workingDir;
+
+  static class Holder {
+    private static InMemoryFileSystemStore s;
+
+    public synchronized static FileSystemStore get() {
+      if(s != null) {
+        return s;
+      }
+      s = new InMemoryFileSystemStore();
+      return s;
+    }
+
+    public synchronized static void set(InMemoryFileSystemStore inMemoryFileSystemStore) {
+      s = inMemoryFileSystemStore;
+    }
+  }
+
+  public SmallBlockS3FileSystem() {
+  }
+
+
+  public SmallBlockS3FileSystem(
+      InMemoryFileSystemStore inMemoryFileSystemStore) {
+    Holder.set(inMemoryFileSystemStore);
+    this.store = inMemoryFileSystemStore;
+  }
+
+  @Override
+  public URI getUri() {
+    return uri;
+  }
+  @Override
+  public long getDefaultBlockSize() {
+    return 10;
+  }
+
+  @Override
+  public void initialize(URI uri, Configuration conf) throws IOException {
+    if (store == null) {
+      store = Holder.get();
+    }
+    store.initialize(uri, conf);
+    setConf(conf);
+    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+    this.workingDir =
+        new Path("/user", System.getProperty("user.name")).makeQualified(this);
+  }
+  @Override
+  public boolean isFile(Path path) throws IOException {
+    INode inode = store.retrieveINode(makeAbsolute(path));
+    if (inode == null) {
+      return false;
+    }
+    return inode.isFile();
+  }
+
+  private INode checkFile(Path path) throws IOException {
+    INode inode = store.retrieveINode(makeAbsolute(path));
+    if (inode == null) {
+      throw new IOException("No such file.");
+    }
+    if (inode.isDirectory()) {
+      throw new IOException("Path " + path + " is a directory.");
+    }
+    return inode;
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path f) throws IOException {
+    Path absolutePath = makeAbsolute(f);
+    INode inode = store.retrieveINode(absolutePath);
+    if (inode == null) {
+      throw new FileNotFoundException("File " + f + " does not exist.");
+    }
+    if (inode.isFile()) {
+      return new FileStatus[] {
+          new S3FileStatus(f.makeQualified(this), inode)
+      };
+    }
+    ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
+    for (Path p : store.listSubPaths(absolutePath)) {
+      ret.add(getFileStatus(p.makeQualified(this)));
+    }
+    return ret.toArray(new FileStatus[0]);
+  }
+  @Override
+  public FSDataOutputStream create(Path file, FsPermission permission,
+                                   boolean overwrite, int bufferSize,
+                                   short replication, long blockSize, Progressable progress)
+      throws IOException {
+
+    INode inode = store.retrieveINode(makeAbsolute(file));
+    if (inode != null) {
+      if (overwrite) {
+        delete(file, true);
+      } else {
+        throw new IOException("File already exists: " + file);
+      }
+    } else {
+      Path parent = file.getParent();
+      if (parent != null) {
+        if (!mkdirs(parent)) {
+          throw new IOException("Mkdirs failed to create " + parent.toString());
+        }
+      }
+    }
+    return new FSDataOutputStream
+        (new S3OutputStream(getConf(), store, makeAbsolute(file),
+            blockSize, progress, bufferSize),
+            statistics);
+  }
+  @Override
+  public boolean mkdirs(Path path, FsPermission permission) throws IOException {
+    Path absolutePath = makeAbsolute(path);
+    List<Path> paths = new ArrayList<Path>();
+    do {
+      paths.add(0, absolutePath);
+      absolutePath = absolutePath.getParent();
+    } while (absolutePath != null);
+
+    boolean result = true;
+    for (Path p : paths) {
+      result &= mkdir(p);
+    }
+    return result;
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return workingDir;
+  }
+
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    Path absoluteSrc = makeAbsolute(src);
+    INode srcINode = store.retrieveINode(absoluteSrc);
+    if (srcINode == null) {
+      // src path doesn't exist
+      return false;
+    }
+    Path absoluteDst = makeAbsolute(dst);
+    INode dstINode = store.retrieveINode(absoluteDst);
+    if (dstINode != null && dstINode.isDirectory()) {
+      absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
+      dstINode = store.retrieveINode(absoluteDst);
+    }
+    if (dstINode != null) {
+      // dst path already exists - can't overwrite
+      return false;
+    }
+    Path dstParent = absoluteDst.getParent();
+    if (dstParent != null) {
+      INode dstParentINode = store.retrieveINode(dstParent);
+      if (dstParentINode == null || dstParentINode.isFile()) {
+        // dst parent doesn't exist or is a file
+        return false;
+      }
+    }
+    return renameRecursive(absoluteSrc, absoluteDst);
+  }
+
+  private boolean renameRecursive(Path src, Path dst) throws IOException {
+    INode srcINode = store.retrieveINode(src);
+    store.storeINode(dst, srcINode);
+    store.deleteINode(src);
+    if (srcINode.isDirectory()) {
+      for (Path oldSrc : store.listDeepSubPaths(src)) {
+        INode inode = store.retrieveINode(oldSrc);
+        if (inode == null) {
+          return false;
+        }
+        String oldSrcPath = oldSrc.toUri().getPath();
+        String srcPath = src.toUri().getPath();
+        String dstPath = dst.toUri().getPath();
+        Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
+        store.storeINode(newDst, inode);
+        store.deleteINode(oldSrc);
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public boolean delete(Path path, boolean recursive) throws IOException {
+    Path absolutePath = makeAbsolute(path);
+    INode inode = store.retrieveINode(absolutePath);
+    if (inode == null) {
+      return false;
+    }
+    if (inode.isFile()) {
+      store.deleteINode(absolutePath);
+      for (Block block: inode.getBlocks()) {
+        store.deleteBlock(block);
+      }
+    } else {
+      FileStatus[] contents = null;
+      try {
+        contents = listStatus(absolutePath);
+      } catch(FileNotFoundException fnfe) {
+        return false;
+      }
+
+      if ((contents.length !=0) && (!recursive)) {
+        throw new IOException("Directory " + path.toString()
+            + " is not empty.");
+      }
+      for (FileStatus p:contents) {
+        if (!delete(p.getPath(), recursive)) {
+          return false;
+        }
+      }
+      store.deleteINode(absolutePath);
+    }
+    return true;
+  }
+
+  /**
+   * FileStatus for S3 file systems.
+   */
+  @Override
+  public FileStatus getFileStatus(Path f)  throws IOException {
+    INode inode = store.retrieveINode(makeAbsolute(f));
+    if (inode == null) {
+      throw new FileNotFoundException(f + ": No such file or directory.");
+    }
+    return new S3FileStatus(f.makeQualified(this), inode);
+  }
+  private boolean mkdir(Path path) throws IOException {
+    Path absolutePath = makeAbsolute(path);
+    INode inode = store.retrieveINode(absolutePath);
+    if (inode == null) {
+      store.storeINode(absolutePath, INode.DIRECTORY_INODE);
+    } else if (inode.isFile()) {
+      throw new IOException(String.format(
+          "Can't make directory for path %s since it is a file.",
+          absolutePath));
+    }
+    return true;
+  }
+  private Path makeAbsolute(Path path) {
+    if (path.isAbsolute()) {
+      return path;
+    }
+    return new Path(workingDir, path);
+  }
+
+  private static class S3FileStatus extends FileStatus {
+
+    S3FileStatus(Path f, INode inode) throws IOException {
+      super(findLength(inode), inode.isDirectory(), 1,
+          findBlocksize(inode), 0, f);
+    }
+
+    private static long findLength(INode inode) {
+      if (!inode.isDirectory()) {
+        long length = 0L;
+        for (Block block : inode.getBlocks()) {
+          length += block.getLength();
+        }
+        return length;
+      }
+      return 0;
+    }
+
+    private static long findBlocksize(INode inode) {
+      final Block[] ret = inode.getBlocks();
+      return ret == null ? 0L : ret[0].getLength();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f0103486/tajo-storage/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/storage-default.xml b/tajo-storage/src/test/resources/storage-default.xml
index 5bf4453..d825c4b 100644
--- a/tajo-storage/src/test/resources/storage-default.xml
+++ b/tajo-storage/src/test/resources/storage-default.xml
@@ -37,6 +37,26 @@
     <description></description>
   </property>
 
+  <property>
+    <name>fs.block.size</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <name>fs.local.block.size</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <name>tajo.min.split.size</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <name>fs.s3.impl</name>
+    <value>org.apache.tajo.storage.s3.SmallBlockS3FileSystem</value>
+  </property>
+  
   <!--- Registered Scanner Handler -->
   <property>
     <name>tajo.storage.scanner-handler</name>


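[Editor's note, for illustration only; not part of this commit.] With fs.s3.impl pointing at SmallBlockS3FileSystem as in the test resource above, a plain Hadoop FileSystem lookup for the s3 scheme resolves to the in-memory stub with its 10-byte block size. The class name is hypothetical, and addResource() assumes storage-default.xml is on the test classpath.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class S3SchemeLookupExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.addResource("storage-default.xml");        // picks up fs.s3.impl and the block/split sizes
    FileSystem fs = FileSystem.get(URI.create("s3:///"), conf);
    System.out.println(fs.getClass().getName());    // expected: org.apache.tajo.storage.s3.SmallBlockS3FileSystem
    System.out.println(fs.getDefaultBlockSize());   // 10, per the override in the new test class
  }
}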