carbondata-commits mailing list archives

From kunalkap...@apache.org
Subject carbondata git commit: [HOTFIX] support "carbon.load.directWriteHdfs.enabled" for S3
Date Thu, 27 Sep 2018 08:34:16 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 2a4f53001 -> 5d17ff40b


[HOTFIX] support "carbon.load.directWriteHdfs.enabled" for S3

Problem: Currently, for S3, when the above carbon property is set, the index file is
not written to the S3 store path because of a bug in the folder path.

Solution: The file separator used when building the index file path is wrong
(File.separator); use CarbonCommonConstants.FILE_SEPARATOR instead.
Also rename the carbon property
"carbon.load.directWriteHdfs.enabled" to
"carbon.load.directWriteToStorePath.enabled"

This closes #2697
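The fix swaps java.io.File.separator for CarbonCommonConstants.FILE_SEPARATOR when the index file path is built. As a minimal sketch, assuming that constant is "/" and that store paths are URI-style (hdfs://, s3a://), the helper below joins a directory and a file name with a fixed separator. StorePathJoin and joinStorePath are hypothetical names, not CarbonData APIs, and the file names are made up. The commit does not say exactly how File.separator broke the S3 path; the sketch only illustrates that File.separator is platform dependent while object-store paths always use "/".

```java
import java.io.File;

/**
 * Hypothetical helper (not a CarbonData API) that joins a store directory and
 * a file name with a fixed "/" separator, the role assumed here for
 * CarbonCommonConstants.FILE_SEPARATOR.
 */
public class StorePathJoin {
  // Assumption: object-store and HDFS URIs always use '/' between segments.
  static final String STORE_SEPARATOR = "/";

  static String joinStorePath(String directory, String fileName) {
    // Avoid a doubled separator when the directory already ends with '/'.
    if (directory.endsWith(STORE_SEPARATOR)) {
      return directory + fileName;
    }
    return directory + STORE_SEPARATOR + fileName;
  }

  public static void main(String[] args) {
    String dir = "s3a://sdk/WriterOutput/Fact/Part0/Segment_0"; // made-up path
    String indexFile = "example.carbonindex";                    // made-up name
    System.out.println(joinStorePath(dir, indexFile));
    // With File.separator the result depends on the JVM's platform ("\\" on Windows).
    System.out.println(dir + File.separator + indexFile);
  }
}
```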


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5d17ff40
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5d17ff40
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5d17ff40

Branch: refs/heads/master
Commit: 5d17ff40bdeeba64a8885fa2df427fbdec6a38ea
Parents: 2a4f530
Author: ajantha-bhat <ajanthabhat@gmail.com>
Authored: Thu Sep 6 16:47:22 2018 +0530
Committer: kunal642 <kunalkapoor642@gmail.com>
Committed: Thu Sep 27 14:03:37 2018 +0530

----------------------------------------------------------------------
 .../constants/CarbonLoadOptionConstants.java    |  6 ++--
 docs/configuration-parameters.md                |  2 +-
 .../carbondata/examples/sdk/SDKS3Example.java   | 12 +++++++
 .../dataload/TestLoadDataGeneral.scala          |  8 ++---
 .../store/writer/AbstractFactDataWriter.java    | 38 ++++++++++----------
 5 files changed, 39 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d17ff40/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index 3eab69d..82485ca 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -136,9 +136,9 @@ public final class CarbonLoadOptionConstants {
   public static final String SORT_COLUMN_BOUNDS_ROW_DELIMITER = ";";
 
   @CarbonProperty
-  public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS
-      = "carbon.load.directWriteHdfs.enabled";
-  public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT = "false";
+  public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH
+      = "carbon.load.directWriteToStorePath.enabled";
+  public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT = "false";
 
   /**
    * If the sort memory is insufficient, spill inmemory pages to disk.

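Because the key itself changed, a carbon.properties file that still sets carbon.load.directWriteHdfs.enabled no longer has any effect, as far as this diff shows (nothing reads the old key any more). The sketch below mirrors how AbstractFactDataWriter interprets the flag: a case-insensitive "true" enables direct writing, and the default is "false". It assumes java.util.Properties as a stand-in for CarbonProperties; DirectWriteFlag and isDirectWriteEnabled are hypothetical names.

```java
import java.util.Properties;

/**
 * Hypothetical sketch of reading the renamed flag. java.util.Properties is
 * used in place of CarbonProperties so the example is self-contained.
 */
public class DirectWriteFlag {
  static final String KEY = "carbon.load.directWriteToStorePath.enabled";
  static final String DEFAULT = "false";

  static boolean isDirectWriteEnabled(Properties props) {
    String value = props.getProperty(KEY, DEFAULT);
    // Same check as the writer: only a case-insensitive "true" enables it.
    return "TRUE".equalsIgnoreCase(value);
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    System.out.println(isDirectWriteEnabled(props));  // false (default)
    props.setProperty(KEY, "True");
    System.out.println(isDirectWriteEnabled(props));  // true

    // Only the old key is set: the renamed flag stays at its default.
    Properties legacy = new Properties();
    legacy.setProperty("carbon.load.directWriteHdfs.enabled", "true");
    System.out.println(isDirectWriteEnabled(legacy)); // false
  }
}
```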
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d17ff40/docs/configuration-parameters.md
----------------------------------------------------------------------
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 662525b..9dd8164 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -90,7 +90,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.prefetch.buffersize | 1000 | When the configuration ***carbon.merge.sort.prefetch*** is configured to true, we need to set the number of records that can be prefetched.This configuration is used specify the number of records to be prefetched.**NOTE: **Configuring more number of records to be prefetched increases memory footprint as more records will have to be kept in memory. |
 | load_min_size_inmb | 256 | This configuration is used along with ***carbon.load.min.size.enabled***.This determines the minimum size of input files to be considered for distribution among executors while data loading.**NOTE:** Refer to ***carbon.load.min.size.enabled*** for understanding when this configuration needs to be used and its advantages and disadvantages. |
 | carbon.load.sortmemory.spill.percentage | 0 | During data loading, some data pages are kept in memory upto memory configured in ***carbon.sort.storage.inmemory.size.inmb*** beyond which they are spilled to disk as intermediate temporary sort files.This configuration determines after what percentage data needs to be spilled to disk.**NOTE:** Without this configuration, when the data pages occupy upto configured memory, new data pages would be dumped to disk and old pages are still maintained in disk. |
-| carbon.load.directWriteHdfs.enabled | false | During data load all the carbondata files are written to local disk and finally copied to the target location in HDFS.Enabling this parameter will make carrbondata files to be written directly onto target HDFS location bypassing the local disk.**NOTE:** Writing directly to HDFS saves local disk IO(once for writing the files and again for copying to HDFS) there by improving the performance.But the drawback is when data loading fails or the application crashes, unwanted carbondata files will remain in the target HDFS location until it is cleared during next data load or by running *CLEAN FILES* DDL command |
+| carbon.load.directWriteToStorePath.enabled | false | During data load, all the carbondata files are written to local disk and finally copied to the target store location in HDFS/S3.Enabling this parameter will make carbondata files to be written directly onto target HDFS/S3 location bypassing the local disk.**NOTE:** Writing directly to HDFS/S3 saves local disk IO(once for writing the files and again for copying to HDFS/S3) there by improving the performance.But the drawback is when data loading fails or the application crashes, unwanted carbondata files will remain in the target HDFS/S3 location until it is cleared during next data load or by running *CLEAN FILES* DDL command |
 | carbon.options.serialization.null.format | \N | Based on the business scenarios, some columns might need to be loaded with null values.As null value cannot be written in csv files, some special characters might be adopted to specify null values.This configuration can be used to specify the null values format in the data being loaded. |
 | carbon.sort.storage.inmemory.size.inmb | 512 | CarbonData writes every ***carbon.sort.size*** number of records to intermediate temp files during data loading to ensure memory footprint is within limits.When ***enable.unsafe.sort*** configuration is enabled, instead of using ***carbon.sort.size*** which is based on rows count, size occupied in memory is used to determine when to flush data pages to intermediate temp files.This configuration determines the memory to be used for storing data pages in memory.**NOTE:** Configuring a higher values ensures more data is maintained in memory and hence increases data loading performance due to reduced or no IO.Based on the memory availability in the nodes of the cluster, configure the values accordingly. |
 | carbon.column.compressor | snappy | CarbonData will compress the column values using the compressor specified by this configuration. Currently CarbonData supports 'snappy' and 'zstd' compressors. |
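The trade-off documented for carbon.load.directWriteToStorePath.enabled can be sketched with two local directories standing in for the local disk and the HDFS/S3 store (an assumption; real loads go through CarbonData's own file layer). In the default mode each file is written locally and then copied, so it touches local disk twice; in direct mode it is written once to the store path, which is also why a failed load can leave files in the store until the next load or CLEAN FILES. WriteStrategies and writeBlock are hypothetical names.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/**
 * Hypothetical sketch of the two write strategies. A local directory stands
 * in for the HDFS/S3 store; this is not CarbonData code.
 */
public class WriteStrategies {
  static Path writeBlock(byte[] data, Path localTempDir, Path storeDir,
                         String fileName, boolean directWrite) throws IOException {
    Path target = storeDir.resolve(fileName);
    if (directWrite) {
      // Direct mode: a single write straight to the store path. If the load
      // fails later, this file stays there until it is cleaned up.
      Files.write(target, data);
    } else {
      // Default mode: write to local disk first, then copy to the store
      // (an extra local write and read). This sketch also removes the temp copy.
      Path temp = localTempDir.resolve(fileName);
      Files.write(temp, data);
      Files.copy(temp, target, StandardCopyOption.REPLACE_EXISTING);
      Files.delete(temp);
    }
    return target;
  }

  public static void main(String[] args) throws IOException {
    Path local = Files.createTempDirectory("carbon-local");
    Path store = Files.createTempDirectory("carbon-store");
    byte[] data = "block".getBytes(StandardCharsets.UTF_8);
    Path viaLocal = writeBlock(data, local, store, "a.carbondata", false);
    Path direct = writeBlock(data, local, store, "b.carbondata", true);
    System.out.println(Files.exists(viaLocal) + " " + Files.exists(direct)); // prints "true true"
  }
}
```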

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d17ff40/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
index d4f49f5..bc0e280 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
@@ -19,10 +19,12 @@ package org.apache.carbondata.examples.sdk;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.sdk.file.*;
 
 import org.apache.hadoop.conf.Configuration;
@@ -40,6 +42,12 @@ public class SDKS3Example {
             System.exit(0);
         }
 
+        String backupProperty = CarbonProperties.getInstance()
+            .getProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
+                CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT);
+        CarbonProperties.getInstance()
+            .addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, "true");
+
         String path = "s3a://sdk/WriterOutput";
         if (args.length > 3) {
             path=args[3];
@@ -87,5 +95,9 @@ public class SDKS3Example {
         }
         System.out.println("\nFinished");
         reader.close();
+
+        CarbonProperties.getInstance()
+            .addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
+                backupProperty);
     }
 }
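The SDK example (and TestLoadDataGeneral) saves the current value of the property, overrides it, and restores it at the end. A minimal sketch of the same pattern using try/finally, so the value is restored even when the body throws, is shown below; it assumes java.util.Properties in place of CarbonProperties, and PropertyOverride and withProperty are hypothetical names.

```java
import java.util.Properties;
import java.util.function.Supplier;

/**
 * Hypothetical save/override/restore helper. java.util.Properties stands in
 * for CarbonProperties so the example is self-contained.
 */
public class PropertyOverride {
  static <T> T withProperty(Properties props, String key, String value,
                            String defaultValue, Supplier<T> body) {
    // Back up the current value (or the default if the key is unset).
    String backup = props.getProperty(key, defaultValue);
    props.setProperty(key, value);
    try {
      return body.get();
    } finally {
      // Runs on normal return and on exceptions alike.
      props.setProperty(key, backup);
    }
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    String key = "carbon.load.directWriteToStorePath.enabled";
    String seen = withProperty(props, key, "true", "false",
        () -> props.getProperty(key));
    System.out.println(seen + " -> " + props.getProperty(key)); // prints "true -> false"
  }
}
```

Restoring the value that was read with a default, rather than removing the key, matches what the SDK example does.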

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d17ff40/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 8b51090..02abb8d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -243,10 +243,10 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
 
   test("test data loading with directly writing fact data to hdfs") {
     val originStatus = CarbonProperties.getInstance().getProperty(
-      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
-      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT)
+      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
+      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT)
     CarbonProperties.getInstance().addProperty(
-      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS, "true")
+      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, "true")
 
     val testData = s"$resourcesPath/sample.csv"
     sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
@@ -256,7 +256,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     )
 
     CarbonProperties.getInstance().addProperty(
-      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
+      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
       originStatus)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d17ff40/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 4afb3ef..37d33c2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -66,9 +66,9 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
   protected WritableByteChannel fileChannel;
   protected long currentOffsetInFile;
   /**
-   * The path of CarbonData file to write in hdfs
+   * The path of CarbonData file to write in hdfs/s3
    */
-  private String carbonDataFileHdfsPath;
+  private String carbonDataFileStorePath;
   /**
    * The temp path of carbonData file used on executor
    */
@@ -145,9 +145,9 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
    */
   protected DataMapWriterListener listener;
   /**
-   * Whether directly write fact data to hdfs
+   * Whether directly write fact data to store path
    */
-  private boolean enableDirectlyWriteData2Hdfs = false;
+  private boolean enableDirectlyWriteDataToStorePath = false;
 
   protected ExecutorService fallbackExecutorService;
 
@@ -172,11 +172,11 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
 
     // whether to directly write fact data to HDFS
     String directlyWriteData2Hdfs = propInstance
-        .getProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
-            CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT);
-    this.enableDirectlyWriteData2Hdfs = "TRUE".equalsIgnoreCase(directlyWriteData2Hdfs);
+        .getProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
+            CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT);
+    this.enableDirectlyWriteDataToStorePath = "TRUE".equalsIgnoreCase(directlyWriteData2Hdfs);
 
-    if (enableDirectlyWriteData2Hdfs) {
+    if (enableDirectlyWriteDataToStorePath) {
       LOGGER.info("Carbondata will directly write fact data to HDFS.");
     } else {
       LOGGER.info("Carbondata will write temporary fact data to local disk.");
@@ -225,7 +225,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     if ((currentFileSize + blockletSizeToBeAdded) >= blockSizeThreshold && currentFileSize != 0) {
       // set the current file size to zero
       String activeFile =
-          enableDirectlyWriteData2Hdfs ? carbonDataFileHdfsPath : carbonDataFileTempPath;
+          enableDirectlyWriteDataToStorePath ? carbonDataFileStorePath : carbonDataFileTempPath;
       LOGGER.info("Writing data to file as max file size reached for file: "
           + activeFile + ". Data block size: " + currentFileSize);
       // write meta data to end of the existing file
@@ -269,7 +269,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
   protected void commitCurrentFile(boolean copyInCurrentThread) {
     notifyDataMapBlockEnd();
     CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
-    if (!enableDirectlyWriteData2Hdfs) {
+    if (!enableDirectlyWriteDataToStorePath) {
       try {
         if (copyInCurrentThread) {
           CarbonUtil.copyCarbonDataFileToCarbonStorePath(carbonDataFileTempPath,
@@ -296,14 +296,14 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
         .getCarbonDataFileName(fileCount, model.getCarbonDataFileAttributes().getTaskId(),
             model.getBucketId(), model.getTaskExtension(),
             "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
-    this.carbonDataFileHdfsPath = model.getCarbonDataDirectoryPath() + File.separator
+    this.carbonDataFileStorePath = model.getCarbonDataDirectoryPath() + File.separator
         + carbonDataFileName;
     try {
-      if (enableDirectlyWriteData2Hdfs) {
+      if (enableDirectlyWriteDataToStorePath) {
         // the block size will be twice the block_size specified by user to make sure that
         // one carbondata file only consists exactly one HDFS block.
         fileOutputStream = FileFactory
-            .getDataOutputStream(carbonDataFileHdfsPath, FileFactory.FileType.HDFS,
+            .getDataOutputStream(carbonDataFileStorePath, FileFactory.FileType.HDFS,
                 CarbonCommonConstants.BYTEBUFFER_SIZE, fileSizeInBytes * 2);
       } else {
         //each time we initialize writer, we choose a local temp location randomly
@@ -380,11 +380,11 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     // get the block index info thrift
     List<BlockIndex> blockIndexThrift = CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList);
     String indexFileName;
-    if (enableDirectlyWriteData2Hdfs) {
-      String rawFileName = model.getCarbonDataDirectoryPath() + File.separator + CarbonTablePath
-          .getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
-              model.getBucketId(), model.getTaskExtension(),
-              "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
+    if (enableDirectlyWriteDataToStorePath) {
+      String rawFileName = model.getCarbonDataDirectoryPath() + CarbonCommonConstants.FILE_SEPARATOR
+          + CarbonTablePath.getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
+          model.getBucketId(), model.getTaskExtension(),
+          "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
       indexFileName = FileFactory.getUpdatedFilePath(rawFileName, FileFactory.FileType.HDFS);
     } else {
       // randomly choose a temp location for index file
@@ -407,7 +407,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
       writer.writeThrift(blockIndex);
     }
     writer.close();
-    if (!enableDirectlyWriteData2Hdfs) {
+    if (!enableDirectlyWriteDataToStorePath) {
       CarbonUtil
           .copyCarbonDataFileToCarbonStorePath(indexFileName, model.getCarbonDataDirectoryPath(),
               fileSizeInBytes);
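Two expressions in the AbstractFactDataWriter diff carry most of the writer logic touched here: the check that starts a new file once adding the next blocklet would reach the block size threshold, and the block size passed to FileFactory.getDataOutputStream in direct-write mode (twice fileSizeInBytes, so that one carbondata file occupies exactly one HDFS block, per the code comment). The sketch below lifts just those expressions into a standalone class; FactFileRollover and its methods are hypothetical, and the 1 GB threshold is an assumed value.

```java
/**
 * Hypothetical standalone copy of two expressions from AbstractFactDataWriter:
 * the file rollover check and the doubled block size used for direct writes.
 */
public class FactFileRollover {
  static boolean shouldStartNewFile(long currentFileSize, long blockletSizeToBeAdded,
                                    long blockSizeThreshold) {
    // An empty file is never rolled over, even if one blocklet exceeds the threshold.
    return (currentFileSize + blockletSizeToBeAdded) >= blockSizeThreshold
        && currentFileSize != 0;
  }

  static long directWriteBlockSize(long fileSizeInBytes) {
    // Twice the configured size, so one carbondata file fits in one HDFS block.
    return fileSizeInBytes * 2;
  }

  public static void main(String[] args) {
    long threshold = 1024L * 1024 * 1024; // assumed 1 GB block size
    System.out.println(shouldStartNewFile(0, 2 * threshold, threshold));   // false
    System.out.println(shouldStartNewFile(threshold - 10, 64, threshold)); // true
    System.out.println(shouldStartNewFile(100, 64, threshold));            // false
    System.out.println(directWriteBlockSize(threshold));                   // 2147483648
  }
}
```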

