carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [02/50] [abbrv] carbondata git commit: [CARBONDATA-1827] S3 Carbon Implementation
Date Sun, 04 Mar 2018 12:33:01 GMT
[CARBONDATA-1827] S3 Carbon Implementation

1.Provide support for s3 in carbondata.
2.Added S3Example to create carbon table on s3.
3.Added S3CSVExample to load carbon table using csv from s3.

This closes #1805


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9086a1b9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9086a1b9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9086a1b9

Branch: refs/heads/carbonstore-rebase5
Commit: 9086a1b9f2cd6cf1d4d42290a4e3678b01472714
Parents: 0c75ab7
Author: SangeetaGulia <sangeeta.gulia@knoldus.in>
Authored: Thu Sep 21 14:56:26 2017 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Sun Mar 4 20:30:31 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  21 +++
 .../filesystem/AbstractDFSCarbonFile.java       |  20 ++-
 .../datastore/filesystem/HDFSCarbonFile.java    |   5 +-
 .../core/datastore/impl/FileFactory.java        |  11 +-
 .../core/locks/CarbonLockFactory.java           |  28 ++--
 .../carbondata/core/locks/S3FileLock.java       | 111 +++++++++++++
 .../carbondata/core/util/CarbonProperties.java  |   3 +-
 .../filesystem/HDFSCarbonFileTest.java          |   8 +-
 examples/spark2/pom.xml                         |   5 +
 examples/spark2/src/main/resources/data1.csv    |  11 ++
 .../carbondata/examples/S3CsvExample.scala      |  99 +++++++++++
 .../apache/carbondata/examples/S3Example.scala  | 164 +++++++++++++++++++
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  42 ++++-
 integration/spark2/pom.xml                      |  43 +++++
 .../spark/rdd/CarbonDataRDDFactory.scala        |   3 +-
 .../org/apache/spark/sql/CarbonSession.scala    |   3 +
 16 files changed, 554 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9086a1b9/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index b2a3375..af3ed99 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -167,6 +167,22 @@ public final class CarbonCommonConstants {
   public static final String S3N_PREFIX = "s3n://";
 
   public static final String S3A_PREFIX = "s3a://";
+  /**
+   * Access Key for s3n
+   */
+  public static final String S3N_ACCESS_KEY = "fs.s3n.awsAccessKeyId";
+  /**
+   * Secret Key for s3n
+   */
+  public static final String S3N_SECRET_KEY = "fs.s3n.awsSecretAccessKey";
+  /**
+   * Access Key for s3
+   */
+  public static final String S3_ACCESS_KEY = "fs.s3.awsAccessKeyId";
+  /**
+   * Secret Key for s3
+   */
+  public static final String S3_SECRET_KEY = "fs.s3.awsSecretAccessKey";
 
   /**
    * FS_DEFAULT_FS
@@ -937,6 +953,11 @@ public final class CarbonCommonConstants {
   public static final String CARBON_LOCK_TYPE_HDFS = "HDFSLOCK";
 
   /**
+   * S3LOCK TYPE
+   */
+  public static final String CARBON_LOCK_TYPE_S3 = "S3LOCK";
+
+  /**
    * Invalid filter member log string
    */
   public static final String FILTER_INVALID_MEMBER =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9086a1b9/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 68eaa21..fd5dc40 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.io.compress.Lz4Codec;
 import org.apache.hadoop.io.compress.SnappyCodec;
 
-public abstract  class AbstractDFSCarbonFile implements CarbonFile {
+public abstract class AbstractDFSCarbonFile implements CarbonFile {
   /**
    * LOGGER
    */
@@ -262,18 +262,28 @@ public abstract  class AbstractDFSCarbonFile implements CarbonFile {
   @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType
fileType,
       int bufferSize, boolean append) throws IOException {
     Path pt = new Path(path);
-    FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
+    FileSystem fileSystem = pt.getFileSystem(FileFactory.getConfiguration());
     FSDataOutputStream stream = null;
     if (append) {
       // append to a file only if file already exists else file not found
       // exception will be thrown by hdfs
       if (CarbonUtil.isFileExists(path)) {
-        stream = fs.append(pt, bufferSize);
+        if (FileFactory.FileType.S3 == fileType) {
+          DataInputStream dataInputStream = fileSystem.open(pt);
+          int count = dataInputStream.available();
+          // create buffer
+          byte[] byteStreamBuffer = new byte[count];
+          dataInputStream.read(byteStreamBuffer);
+          stream = fileSystem.create(pt, true, bufferSize);
+          stream.write(byteStreamBuffer);
+        } else {
+          stream = fileSystem.append(pt, bufferSize);
+        }
       } else {
-        stream = fs.create(pt, true, bufferSize);
+        stream = fileSystem.create(pt, true, bufferSize);
       }
     } else {
-      stream = fs.create(pt, true, bufferSize);
+      stream = fileSystem.create(pt, true, bufferSize);
     }
     return stream;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9086a1b9/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
index d470b47..892a556 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
@@ -107,8 +107,11 @@ public class HDFSCarbonFile extends AbstractDFSCarbonFile {
         ((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName),
             org.apache.hadoop.fs.Options.Rename.OVERWRITE);
         return true;
+      } else if (fileStatus.getPath().toString().startsWith("s3n")) {
+        fs.delete(new Path(changetoName), true);
+        return fs.rename(fileStatus.getPath(), new Path(changetoName));
       } else {
-        return false;
+        return fs.rename(fileStatus.getPath(), new Path(changetoName));
       }
     } catch (IOException e) {
       LOGGER.error("Exception occured: " + e.getMessage());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9086a1b9/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 38ed2b7..f141991 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -246,7 +246,15 @@ public final class FileFactory {
    */
   public static DataOutputStream getDataOutputStreamUsingAppend(String path, FileType fileType)
       throws IOException {
-    return getCarbonFile(path).getDataOutputStreamUsingAppend(path, fileType);
+    if (FileType.S3 == fileType) {
+      CarbonFile carbonFile = getCarbonFile(path);
+      if (carbonFile.exists()) {
+        carbonFile.delete();
+      }
+      return carbonFile.getDataOutputStream(path,fileType);
+    } else {
+      return getCarbonFile(path).getDataOutputStreamUsingAppend(path, fileType);
+    }
   }
 
   /**
@@ -423,6 +431,7 @@ public final class FileFactory {
       throws IOException {
     FileFactory.FileType fileType = FileFactory.getFileType(directoryPath);
     switch (fileType) {
+      case S3:
       case HDFS:
       case VIEWFS:
         try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9086a1b9/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
index e70e655..3226a63 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
@@ -52,18 +52,21 @@ public class CarbonLockFactory {
    */
   public static ICarbonLock getCarbonLockObj(AbsoluteTableIdentifier absoluteTableIdentifier,
       String lockFile) {
-    switch (lockTypeConfigured) {
-      case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL:
-        return new LocalFileLock(absoluteTableIdentifier, lockFile);
 
-      case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER:
-        return new ZooKeeperLocking(absoluteTableIdentifier, lockFile);
-
-      case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS:
-        return new HdfsFileLock(absoluteTableIdentifier, lockFile);
-
-      default:
-        throw new UnsupportedOperationException("Not supported the lock type");
+    String tablePath = absoluteTableIdentifier.getTablePath();
+    if (lockTypeConfigured.equals(CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER)) {
+      return new ZooKeeperLocking(absoluteTableIdentifier, lockFile);
+    } else if (tablePath.startsWith(CarbonCommonConstants.S3A_PREFIX) ||
+        tablePath.startsWith(CarbonCommonConstants.S3N_PREFIX) ||
+            tablePath.startsWith(CarbonCommonConstants.S3_PREFIX)) {
+      lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_S3;
+      return new S3FileLock(absoluteTableIdentifier, lockFile);
+    } else if (tablePath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
+      lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS;
+      return new HdfsFileLock(absoluteTableIdentifier, lockFile);
+    } else {
+      lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL;
+      return new LocalFileLock(absoluteTableIdentifier, lockFile);
     }
   }
 
@@ -84,6 +87,9 @@ public class CarbonLockFactory {
       case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS:
         return new HdfsFileLock(locFileLocation, lockFile);
 
+      case CarbonCommonConstants.CARBON_LOCK_TYPE_S3:
+        return new S3FileLock(locFileLocation, lockFile);
+
       default:
         throw new UnsupportedOperationException("Not supported the lock type");
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9086a1b9/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java
new file mode 100644
index 0000000..8836960
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.locks;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * This class is used to handle the S3 File locking.
+ * This is achieved by acquiring the data output stream using the append option.
+ */
+public class S3FileLock extends AbstractCarbonLock {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(S3FileLock.class.getName());
+  /**
+   * location s3 file location
+   */
+  private String location;
+
+  private DataOutputStream dataOutputStream;
+
+  /**
+   * @param tableIdentifier
+   * @param lockFile
+   */
+  public S3FileLock(AbsoluteTableIdentifier tableIdentifier, String lockFile) {
+    this(tableIdentifier.getTablePath(), lockFile);
+  }
+
+  /**
+   * @param lockFileLocation
+   * @param lockFile
+   */
+  public S3FileLock(String lockFileLocation, String lockFile) {
+    this.location = lockFileLocation + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
+    LOGGER.info("S3 lock path:" + this.location);
+    initRetry();
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.carbondata.core.locks.ICarbonLock#unlock()
+   */
+  @Override public boolean unlock() {
+    boolean status = false;
+    if (null != dataOutputStream) {
+      try {
+        dataOutputStream.close();
+        status = true;
+      } catch (IOException e) {
+        status = false;
+      } finally {
+        CarbonFile carbonFile =
+            FileFactory.getCarbonFile(location, FileFactory.getFileType(location));
+        if (carbonFile.exists()) {
+          if (carbonFile.delete()) {
+            LOGGER.info("Deleted the lock file " + location);
+          } else {
+            LOGGER.error("Not able to delete the lock file " + location);
+            status = false;
+          }
+        } else {
+          LOGGER.error(
+              "Not able to delete the lock file because it is not existed in location " +
location);
+          status = false;
+        }
+      }
+    }
+    return status;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.carbondata.core.locks.ICarbonLock#lock()
+   */
+  @Override public boolean lock() {
+    try {
+      if (!FileFactory.isFileExist(location, FileFactory.getFileType(location))) {
+        FileFactory.createNewLockFile(location, FileFactory.getFileType(location));
+      }
+      dataOutputStream =
+          FileFactory.getDataOutputStreamUsingAppend(location, FileFactory.getFileType(location));
+      return true;
+    } catch (IOException e) {
+      LOGGER.error(e, e.getMessage());
+      return false;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9086a1b9/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 667c45c..8b81c1e 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -369,7 +369,8 @@ public final class CarbonProperties {
     String defaultFs = configuration.get("fs.defaultFS");
     if (null != defaultFs && (defaultFs.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)
         || defaultFs.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) || defaultFs
-        .startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX))
+        .startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX) || defaultFs
+        .startsWith(CarbonCommonConstants.S3A_PREFIX))
         && !CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS.equalsIgnoreCase(lockTypeConfigured))
{
       LOGGER.warn("The value \"" + lockTypeConfigured + "\" configured for key "
           + LOCK_TYPE + " is invalid for current file system. "

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9086a1b9/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java
b/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java
index 7726693..4018123 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java
@@ -369,7 +369,13 @@ public class HDFSCarbonFileTest {
             }
 
         };
-        assertEquals(hdfsCarbonFile.renameForce(fileName), false);
+        new MockUp<WebHdfsFileSystem>(){
+            @Mock
+            public boolean rename(final Path src, final Path dst) throws IOException {
+                return true;
+            }
+        };
+        assertEquals(hdfsCarbonFile.renameForce(fileName), true);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9086a1b9/examples/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/examples/spark2/pom.xml b/examples/spark2/pom.xml
index c17f0ee..f64dc9f 100644
--- a/examples/spark2/pom.xml
+++ b/examples/spark2/pom.xml
@@ -62,6 +62,11 @@
       <version>${spark.version}</version>
       <scope>${spark.deps.scope}</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>4.2</version>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9086a1b9/examples/spark2/src/main/resources/data1.csv
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/resources/data1.csv b/examples/spark2/src/main/resources/data1.csv
new file mode 100644
index 0000000..cf732eb
--- /dev/null
+++ b/examples/spark2/src/main/resources/data1.csv
@@ -0,0 +1,11 @@
+shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField
+1,10,1100,48.4,spark,2015-4-23 12:01:01,1.23,2015-4-23,aaa,2.5
+5,17,1140,43.4,spark,2015-7-27 12:01:02,3.45,2015-7-27,bbb,2.5
+1,11,1100,44.4,flink,2015-5-23 12:01:03,23.23,2015-5-23,ccc,2.5
+1,10,1150,43.4,spark,2015-7-24 12:01:04,254.12,2015-7-24,ddd,2.5
+1,10,1100,47.4,spark,2015-7-23 12:01:05,876.14,2015-7-23,eeee,3.5
+3,14,1160,43.4,hive,2015-7-26 12:01:06,3454.32,2015-7-26,ff,2.5
+2,10,1100,43.4,impala,2015-7-23 12:01:07,456.98,2015-7-23,ggg,2.5
+1,10,1100,43.4,spark,2015-5-23 12:01:08,32.53,2015-5-23,hhh,2.5
+4,16,1130,42.4,impala,2015-7-23 12:01:09,67.23,2015-7-23,iii,2.5
+1,10,1100,43.4,spark,2015-7-23 12:01:10,832.23,2015-7-23,jjj,2.5

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9086a1b9/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3CsvExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3CsvExample.scala
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3CsvExample.scala
new file mode 100644
index 0000000..b37fba8
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3CsvExample.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, SECRET_KEY}
+import org.apache.spark.sql.SparkSession
+import org.slf4j.{Logger, LoggerFactory}
+
+object S3CsvExample {
+
+  /**
+   * This example demonstrates how to create a local store and load data from CSV files on S3
+   *
+   * @param args requires four parameters: "Access-key" "Secret-key"
+   *             "s3 path to csv" "spark-master"
+   */
+  def main(args: Array[String]) {
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val logger: Logger = LoggerFactory.getLogger(this.getClass)
+
+    import org.apache.spark.sql.CarbonSession._
+    if (args.length != 4) {
+      logger.error("Usage: java CarbonS3Example <access-key> <secret-key>" +
+                   "<s3.csv.location> <spark-master>")
+      System.exit(0)
+    }
+
+    val spark = SparkSession
+      .builder()
+      .master(args(3))
+      .appName("S3CsvExample")
+      .config("spark.driver.host", "localhost")
+      .config("spark.hadoop." + ACCESS_KEY, args(0))
+      .config("spark.hadoop." + SECRET_KEY, args(1))
+      .getOrCreateCarbonSession()
+
+    spark.sparkContext.setLogLevel("INFO")
+
+    spark.sql(
+      s"""
+         | CREATE TABLE if not exists carbon_table1(
+         | shortField SHORT,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT
+         | )
+         | STORED BY 'carbondata'
+         | LOCATION '$rootPath/examples/spark2/target/store'
+         | TBLPROPERTIES('SORT_COLUMNS'='', 'DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${ args(2) }'
+         | INTO TABLE carbon_table1
+         | OPTIONS('HEADER'='true')
+       """.stripMargin)
+
+    spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${ args(2) }'
+         | INTO TABLE carbon_table1
+         | OPTIONS('HEADER'='true')
+       """.stripMargin)
+
+    spark.sql(
+      s"""
+         | SELECT *
+         | FROM carbon_table1
+      """.stripMargin).show()
+
+    spark.sql("Drop table if exists carbon_table1")
+
+    spark.stop()
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9086a1b9/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala
new file mode 100644
index 0000000..d3d0a37
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+object S3Example {
+
+  /**
+   * This example demonstrates the usage of
+   * 1. create carbon table with storage location on object based storage
+   * like AWS S3, Huawei OBS, etc
+   * 2. load data into carbon table, the generated file will be stored on object based storage
+   * 3. query the table.
+   *
+   * @param args requires "Access-key" "Secret-key" "table-path on s3",
+   *             optionally followed by "s3-endpoint" and "spark-master"
+   */
+  def main(args: Array[String]) {
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val path = s"$rootPath/examples/spark2/src/main/resources/data1.csv"
+    val logger: Logger = LoggerFactory.getLogger(this.getClass)
+
+    import org.apache.spark.sql.CarbonSession._
+    if (args.length < 3 || args.length > 5) {
+      logger.error("Usage: java CarbonS3Example <access-key> <secret-key>" +
+                   "<table-path-on-s3> [s3-endpoint] [spark-master]")
+      System.exit(0)
+    }
+
+    val (accessKey, secretKey, endpoint) = getKeyOnPrefix(args(2))
+    val spark = SparkSession
+      .builder()
+      .master(getSparkMaster(args))
+      .appName("S3Example")
+      .config("spark.driver.host", "localhost")
+      .config(accessKey, args(0))
+      .config(secretKey, args(1))
+      .config(endpoint, getS3EndPoint(args))
+      .getOrCreateCarbonSession()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    spark.sql("Drop table if exists carbon_table")
+
+    spark.sql(
+      s"""
+         | CREATE TABLE if not exists carbon_table(
+         | shortField SHORT,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT
+         | )
+         | STORED BY 'carbondata'
+         | LOCATION '${ args(2) }'
+         | TBLPROPERTIES('SORT_COLUMNS'='', 'DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path'
+         | INTO TABLE carbon_table
+         | OPTIONS('HEADER'='true')
+       """.stripMargin)
+
+    spark.sql(
+      s"""
+         | SELECT *
+         | FROM carbon_table
+      """.stripMargin).show()
+
+    spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path'
+         | INTO TABLE carbon_table
+         | OPTIONS('HEADER'='true')
+       """.stripMargin)
+
+    spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path'
+         | INTO TABLE carbon_table
+         | OPTIONS('HEADER'='true')
+       """.stripMargin)
+
+    val countSegment: Array[Row] =
+      spark.sql(
+        s"""
+           | SHOW SEGMENTS FOR TABLE carbon_table
+       """.stripMargin).collect()
+
+    while (countSegment.length != 3) {
+      this.wait(2000)
+    }
+
+    // Use compaction command to merge segments or small files in object based storage,
+    // this can be done periodically.
+    spark.sql("ALTER table carbon_table compact 'MAJOR'")
+    spark.sql("show segments for table carbon_table").show()
+
+    spark.sql(
+      s"""
+         | SELECT *
+         | FROM carbon_table
+      """.stripMargin).show()
+
+    spark.sql("Drop table if exists carbon_table")
+
+    spark.stop()
+  }
+
+  def getKeyOnPrefix(path: String): (String, String, String) = {
+    val endPoint = "spark.hadoop." + ENDPOINT
+    if (path.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
+      ("spark.hadoop." + ACCESS_KEY, "spark.hadoop." + SECRET_KEY, endPoint)
+    } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX)) {
+      ("spark.hadoop." + CarbonCommonConstants.S3N_ACCESS_KEY,
+        "spark.hadoop." + CarbonCommonConstants.S3N_SECRET_KEY, endPoint)
+    } else if (path.startsWith(CarbonCommonConstants.S3_PREFIX)) {
+      ("spark.hadoop." + CarbonCommonConstants.S3_ACCESS_KEY,
+        "spark.hadoop." + CarbonCommonConstants.S3_SECRET_KEY, endPoint)
+    } else {
+      throw new Exception("Incorrect Store Path")
+    }
+  }
+
+  def getS3EndPoint(args: Array[String]): String = {
+    if (args.length >= 4 && args(3).contains(".com")) args(3)
+    else ""
+  }
+
+  def getSparkMaster(args: Array[String]): String = {
+      if (args.length == 5) args(4)
+      else if (args(3).contains("spark:") || args(3).contains("mesos:")) args(3)
+      else "local"
+    }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9086a1b9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 1fa1689..917fc88 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -41,6 +41,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.common.logging.impl.StandardLogService
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -346,11 +347,31 @@ class NewDataFrameLoaderRDD[K, V](
     sc: SparkContext,
     result: DataLoadResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    prev: DataLoadCoalescedRDD[Row]) extends CarbonRDD[(K, V)](prev) {
+    prev: DataLoadCoalescedRDD[Row],
+    @transient hadoopConf: Configuration) extends CarbonRDD[(K, V)](prev) {
 
-  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+  // hadoopConf is a @transient constructor parameter, so its contents are captured here as a
+  // byte array: written with Configuration.write and then compressed with the compressor
+  // obtained from CompressorFactory. getConf performs the inverse.
+  // NOTE(review): presumably this is what lets the settings travel with the RDD - confirm.
+  private val confBytes = {
+    val buffer = new ByteArrayOutputStream()
+    val objectOut = new ObjectOutputStream(buffer)
+    hadoopConf.write(objectOut)
+    // Close before reading the buffer so everything written is flushed into it.
+    objectOut.close()
+    val compressor = CompressorFactory.getInstance().getCompressor
+    compressor.compressByte(buffer.toByteArray)
+  }
 
+  /**
+   * Rebuilds a Hadoop Configuration from confBytes.
+   *
+   * The bytes are uncompressed with the compressor obtained from CompressorFactory and then
+   * read into a Configuration created with loadDefaults = false.
+   *
+   * @return a new Configuration populated from the stored bytes
+   */
+  private def getConf = {
+    val rawBytes = CompressorFactory.getInstance().getCompressor.unCompressByte(confBytes)
+    val objectIn = new ObjectInputStream(new ByteArrayInputStream(rawBytes))
+    val restored = new Configuration(false)
+    restored.readFields(objectIn)
+    objectIn.close()
+    restored
+  }
+
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val hadoopConf = getConf
+    setS3Configurations(hadoopConf)
     val iter = new Iterator[(K, V)] {
       val loadMetadataDetails = new LoadMetadataDetails()
       val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
@@ -420,6 +441,23 @@ class NewDataFrameLoaderRDD[K, V](
     iter
   }
   override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
+
+  /**
+   * Copies the S3 credential and endpoint settings from the given Hadoop configuration into
+   * the configuration returned by FileFactory.getConfiguration.
+   *
+   * A key that is absent from hadoopConf is written as the empty string.
+   *
+   * @param hadoopConf configuration to read the S3 settings from
+   */
+  private def setS3Configurations(hadoopConf: Configuration): Unit = {
+    val s3Keys = Seq(
+      "fs.s3a.access.key",
+      "fs.s3a.secret.key",
+      "fs.s3a.endpoint",
+      CarbonCommonConstants.S3_ACCESS_KEY,
+      CarbonCommonConstants.S3_SECRET_KEY,
+      CarbonCommonConstants.S3N_ACCESS_KEY,
+      CarbonCommonConstants.S3N_SECRET_KEY)
+    // FileFactory.getConfiguration is looked up once per key, as in the unrolled form.
+    s3Keys.foreach { key =>
+      FileFactory.getConfiguration.set(key, hadoopConf.get(key, ""))
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9086a1b9/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 7d1d5bb..73fc3eb 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -63,6 +63,49 @@
       <version>2.2.1</version>
       <scope>test</scope>
     </dependency>
+    <!-- Hadoop AWS module at the build's ${hadoop.version}. The three Jackson artifacts
+         (core, annotations, databind) are excluded from its transitive dependencies.
+         NOTE(review): presumably to avoid clashing with the Jackson version already on
+         the classpath; confirm. -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <!-- AWS SDK with the same Jackson exclusions as hadoop-aws above.
+         NOTE(review): the version is hard-coded rather than driven by a property; confirm
+         that 1.7.4 is the SDK version hadoop-aws ${hadoop.version} expects. -->
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk</artifactId>
+      <version>1.7.4</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <!-- JetS3t at a fixed version, with no exclusions.
+         NOTE(review): presumably needed for the s3/s3n file systems; confirm. -->
+    <dependency>
+      <groupId>net.java.dev.jets3t</groupId>
+      <artifactId>jets3t</artifactId>
+      <version>0.9.0</version>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9086a1b9/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 328ce32..add038d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -979,7 +979,8 @@ object CarbonDataRDDFactory {
         sqlContext.sparkContext,
         new DataLoadResultImpl(),
         carbonLoadModel,
-        newRdd
+        newRdd,
+        sqlContext.sparkContext.hadoopConfiguration
       ).collect()
     } catch {
       case ex: Exception =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9086a1b9/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 0116d9e..935b0a6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -21,6 +21,7 @@ import java.io.File
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SparkSession.Builder
@@ -30,6 +31,7 @@ import org.apache.spark.sql.internal.{SessionState, SharedState}
 import org.apache.spark.util.{CarbonReflectionUtils, Utils}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo}
 
 /**
@@ -152,6 +154,7 @@ object CarbonSession {
             sparkConf.setAppName(randomAppName)
           }
           val sc = SparkContext.getOrCreate(sparkConf)
+          setS3Configurations(sc)
           // maybe this is an existing SparkContext, update its SparkConf which maybe used
           // by SparkSession
           options.foreach { case (k, v) => sc.conf.set(k, v) }


Mime
View raw message