carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [1/2] incubator-carbondata git commit: CARBONDATA-367 Add support alluxio(tachyon) file system
Date Mon, 07 Nov 2016 02:24:58 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master d2041356d -> 6d4ffef69


CARBONDATA-367 Add support alluxio(tachyon) file system

CARBONDATA-367 Add support alluxio(tachyon) file system

Rename ALLUXIOCarbonFile.java to AlluxioCarbonFile.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/6ee44644
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/6ee44644
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/6ee44644

Branch: refs/heads/master
Commit: 6ee44644071272ab07969f06d92b67661959afe4
Parents: d204135
Author: chenliang613 <chenliang613@apache.org>
Authored: Thu Nov 3 15:50:35 2016 +0800
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Mon Nov 7 07:53:22 2016 +0530

----------------------------------------------------------------------
 .../store/filesystem/AlluxioCarbonFile.java     | 130 +++++++++++++++++++
 .../datastorage/store/impl/FileFactory.java     |  39 +++++-
 .../apache/carbondata/core/util/CarbonUtil.java |   1 +
 .../carbondata/examples/AlluxioExample.scala    |  65 ++++++++++
 4 files changed, 230 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6ee44644/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/AlluxioCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/AlluxioCarbonFile.java
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/AlluxioCarbonFile.java
new file mode 100644
index 0000000..9ff0f59
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/AlluxioCarbonFile.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.store.filesystem;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+
+
+public class AlluxioCarbonFile extends AbstractDFSCarbonFile {
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(AlluxioCarbonFile.class.getName());
+
+  public AlluxioCarbonFile(String filePath) {
+    super(filePath);
+  }
+
+  public AlluxioCarbonFile(Path path) {
+    super(path);
+  }
+
+  public AlluxioCarbonFile(FileStatus fileStatus) {
+    super(fileStatus);
+  }
+
+  /**
+   * @param listStatus
+   * @return
+   */
+  private CarbonFile[] getFiles(FileStatus[] listStatus) {
+    if (listStatus == null) {
+      return new CarbonFile[0];
+    }
+    CarbonFile[] files = new CarbonFile[listStatus.length];
+    for (int i = 0; i < files.length; i++) {
+      files[i] = new AlluxioCarbonFile(listStatus[i]);
+    }
+    return files;
+  }
+
+  @Override
+  public CarbonFile[] listFiles() {
+    FileStatus[] listStatus = null;
+    try {
+      if (null != fileStatus && fileStatus.isDirectory()) {
+        Path path = fileStatus.getPath();
+        listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
+      } else {
+        return null;
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception occured: " + e.getMessage());
+      return new CarbonFile[0];
+    }
+    return getFiles(listStatus);
+  }
+
+  @Override
+  public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
+    CarbonFile[] files = listFiles();
+    if (files != null && files.length >= 1) {
+      List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length);
+      for (int i = 0; i < files.length; i++) {
+        if (fileFilter.accept(files[i])) {
+          fileList.add(files[i]);
+        }
+      }
+      if (fileList.size() >= 1) {
+        return fileList.toArray(new CarbonFile[fileList.size()]);
+      } else {
+        return new CarbonFile[0];
+      }
+    }
+    return files;
+  }
+
+  @Override
+  public CarbonFile getParentFile() {
+    Path parent = fileStatus.getPath().getParent();
+    return null == parent ? null : new AlluxioCarbonFile(parent);
+  }
+
+  @Override
+  public boolean renameForce(String changetoName) {
+    FileSystem fs;
+    try {
+      fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+      if (fs instanceof DistributedFileSystem) {
+        ((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName),
+            org.apache.hadoop.fs.Options.Rename.OVERWRITE);
+        return true;
+      } else {
+        return false;
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception occured: " + e.getMessage());
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6ee44644/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
index 684fe21..a94d3f1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
@@ -31,6 +31,7 @@ import java.io.InputStream;
 import java.util.zip.GZIPInputStream;
 
 import org.apache.carbondata.core.datastorage.store.FileHolder;
+import org.apache.carbondata.core.datastorage.store.filesystem.AlluxioCarbonFile;
 import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastorage.store.filesystem.HDFSCarbonFile;
 import org.apache.carbondata.core.datastorage.store.filesystem.LocalCarbonFile;
@@ -57,6 +58,8 @@ public final class FileFactory {
     if (property != null) {
       if (property.startsWith(CarbonUtil.HDFS_PREFIX)) {
         storeDefaultFileType = FileType.HDFS;
+      } else if (property.startsWith(CarbonUtil.ALLUXIO_PREFIX)) {
+        storeDefaultFileType = FileType.ALLUXIO;
       } else if (property.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
         storeDefaultFileType = FileType.VIEWFS;
       }
@@ -79,6 +82,7 @@ public final class FileFactory {
       case LOCAL:
         return new FileHolderImpl();
       case HDFS:
+      case ALLUXIO:
       case VIEWFS:
         return new DFSFileHolderImpl();
       default:
@@ -91,7 +95,11 @@ public final class FileFactory {
     if (property != null) {
       if (property.startsWith(CarbonUtil.HDFS_PREFIX)) {
         storeDefaultFileType = FileType.HDFS;
-      } else if (property.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
+      }
+      else if (property.startsWith(CarbonUtil.ALLUXIO_PREFIX)) {
+        storeDefaultFileType = FileType.ALLUXIO;
+      }
+      else if (property.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
         storeDefaultFileType = FileType.VIEWFS;
       }
     }
@@ -101,7 +109,11 @@ public final class FileFactory {
   public static FileType getFileType(String path) {
     if (path.startsWith(CarbonUtil.HDFS_PREFIX)) {
       return FileType.HDFS;
-    } else if (path.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
+    }
+    else if (path.startsWith(CarbonUtil.ALLUXIO_PREFIX)) {
+      return FileType.ALLUXIO;
+    }
+    else if (path.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
       return FileType.VIEWFS;
     }
     return FileType.LOCAL;
@@ -113,6 +125,8 @@ public final class FileFactory {
         return new LocalCarbonFile(path);
       case HDFS:
         return new HDFSCarbonFile(path);
+      case ALLUXIO:
+        return new AlluxioCarbonFile(path);
       case VIEWFS:
         return new ViewFSCarbonFile(path);
       default:
@@ -142,9 +156,10 @@ public final class FileFactory {
         }
         break;
       case HDFS:
+      case ALLUXIO:
       case VIEWFS:
         Path pt = new Path(path);
-        FileSystem fs = FileSystem.get(configuration);
+        FileSystem fs = pt.getFileSystem(configuration);
         if (bufferSize == -1) {
           stream = fs.open(pt);
         } else {
@@ -179,9 +194,10 @@ public final class FileFactory {
     path = path.replace("\\", "/");
     switch (fileType) {
       case HDFS:
+      case ALLUXIO:
       case VIEWFS:
         Path pt = new Path(path);
-        FileSystem fs = FileSystem.get(configuration);
+        FileSystem fs = pt.getFileSystem(configuration);
         FSDataInputStream stream = fs.open(pt, bufferSize);
         stream.seek(offset);
         return new DataInputStream(new BufferedInputStream(stream));
@@ -204,6 +220,7 @@ public final class FileFactory {
       case LOCAL:
         return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
       case HDFS:
+      case ALLUXIO:
       case VIEWFS:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
@@ -221,6 +238,7 @@ public final class FileFactory {
       case LOCAL:
         return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
       case HDFS:
+      case ALLUXIO:
       case VIEWFS:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
@@ -239,6 +257,7 @@ public final class FileFactory {
         return new DataOutputStream(
             new BufferedOutputStream(new FileOutputStream(path), bufferSize));
       case HDFS:
+      case ALLUXIO:
       case VIEWFS:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
@@ -258,6 +277,7 @@ public final class FileFactory {
         return new DataOutputStream(
             new BufferedOutputStream(new FileOutputStream(path, append), bufferSize));
       case HDFS:
+      case ALLUXIO:
       case VIEWFS:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
@@ -288,6 +308,7 @@ public final class FileFactory {
         return new DataOutputStream(
             new BufferedOutputStream(new FileOutputStream(path), bufferSize));
       case HDFS:
+      case ALLUXIO:
       case VIEWFS:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
@@ -313,6 +334,7 @@ public final class FileFactory {
     filePath = filePath.replace("\\", "/");
     switch (fileType) {
       case HDFS:
+      case ALLUXIO:
       case VIEWFS:
         Path path = new Path(filePath);
         FileSystem fs = path.getFileSystem(configuration);
@@ -345,6 +367,7 @@ public final class FileFactory {
     filePath = filePath.replace("\\", "/");
     switch (fileType) {
       case HDFS:
+      case ALLUXIO:
       case VIEWFS:
         Path path = new Path(filePath);
         FileSystem fs = path.getFileSystem(configuration);
@@ -361,6 +384,7 @@ public final class FileFactory {
     filePath = filePath.replace("\\", "/");
     switch (fileType) {
       case HDFS:
+      case ALLUXIO:
       case VIEWFS:
         Path path = new Path(filePath);
         FileSystem fs = path.getFileSystem(configuration);
@@ -377,6 +401,7 @@ public final class FileFactory {
     filePath = filePath.replace("\\", "/");
     switch (fileType) {
       case HDFS:
+      case ALLUXIO:
       case VIEWFS:
         Path path = new Path(filePath);
         FileSystem fs = path.getFileSystem(configuration);
@@ -403,6 +428,7 @@ public final class FileFactory {
       case LOCAL:
         return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path, true)));
       case HDFS:
+      case ALLUXIO:
       case VIEWFS:
         Path pt = new Path(path);
         FileSystem fs = pt.getFileSystem(configuration);
@@ -426,6 +452,7 @@ public final class FileFactory {
     filePath = filePath.replace("\\", "/");
     switch (fileType) {
       case HDFS:
+      case ALLUXIO:
       case VIEWFS:
         Path path = new Path(filePath);
         FileSystem fs = path.getFileSystem(configuration);
@@ -442,7 +469,7 @@ public final class FileFactory {
   }
 
   public enum FileType {
-    LOCAL, HDFS, VIEWFS
+    LOCAL, HDFS, ALLUXIO, VIEWFS
   }
 
   /**
@@ -457,6 +484,7 @@ public final class FileFactory {
     FileType fileType = getFileType(filePath);
     switch (fileType) {
       case HDFS:
+      case ALLUXIO:
       case VIEWFS:
         return filePath;
       case LOCAL:
@@ -478,6 +506,7 @@ public final class FileFactory {
     FileType fileType = getFileType(filePath);
     switch (fileType) {
       case HDFS:
+      case ALLUXIO:
       case VIEWFS:
         Path path = new Path(filePath);
         FileSystem fs = path.getFileSystem(configuration);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6ee44644/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 8aceecd..e1a993a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -79,6 +79,7 @@ public final class CarbonUtil {
 
   public static final String HDFS_PREFIX = "hdfs://";
   public static final String VIEWFS_PREFIX = "viewfs://";
+  public static final String ALLUXIO_PREFIX = "alluxio://";
   private static final String FS_DEFAULT_FS = "fs.defaultFS";
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6ee44644/examples/src/main/scala/org/apache/carbondata/examples/AlluxioExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/AlluxioExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/AlluxioExample.scala
new file mode 100644
index 0000000..17fe960
--- /dev/null
+++ b/examples/src/main/scala/org/apache/carbondata/examples/AlluxioExample.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.examples.util.ExampleUtils
+
+/**
+ * Example that loads a CSV file stored in Alluxio into a CarbonData table and queries it.
+ *
+ * To configure Alluxio before running:
+ * 1. Start Alluxio.
+ * 2. Upload the jar "/alluxio_path/core/client/target/
+ *    alluxio-core-client-YOUR-VERSION-jar-with-dependencies.jar".
+ * 3. More details: http://www.alluxio.org/docs/master/en/Running-Spark-on-Alluxio.html
+ *
+ * The input is read from alluxio://localhost:19998/data.csv.
+ */
+
+object AlluxioExample {
+  def main(args: Array[String]): Unit = {
+    // Hadoop setting that maps the alluxio:// scheme to Alluxio's FileSystem class.
+    val alluxioImplKey = "fs.alluxio.impl"
+    val alluxioImplClass = "alluxio.hadoop.FileSystem"
+
+    val createTable = """
+           CREATE TABLE IF NOT EXISTS t3
+           (ID Int, date Timestamp, country String,
+           name String, phonetype String, serialname String, salary Int)
+           STORED BY 'carbondata'
+           """
+
+    val loadData = """
+           LOAD DATA LOCAL INPATH 'alluxio://localhost:19998/data.csv' into table t3
+           """
+
+    val salaryCountByCountry = """
+           SELECT country, count(salary) AS amount
+           FROM t3
+           WHERE country IN ('china','france')
+           GROUP BY country
+           """
+
+    val dropTable = "DROP TABLE IF EXISTS t3"
+
+    val cc = ExampleUtils.createCarbonContext("AlluxioExample")
+    // Set the mapping on both the Spark Hadoop configuration and the one FileFactory uses.
+    cc.sparkContext.hadoopConfiguration.set(alluxioImplKey, alluxioImplClass)
+    FileFactory.getConfiguration.set(alluxioImplKey, alluxioImplClass)
+
+    // Specify timestamp format based on raw data
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    // Start from a clean state, run the load and the query, then clean up again.
+    cc.sql(dropTable)
+    cc.sql(createTable)
+    cc.sql(loadData)
+    cc.sql(salaryCountByCountry).show()
+    cc.sql(dropTable)
+  }
+}


Mime
View raw message