carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenliang...@apache.org
Subject [1/2] carbondata git commit: resolved int, short type bug for hive, modified the hive example, resolved UnsafeIntermediateFileMerger datatype issue
Date Thu, 01 Jun 2017 15:28:54 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master ddb80f729 -> 50627c047


resolved int, short type bug for hive, modified the hive example, resolved UnsafeIntermediateFileMerger
datatype issue


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/eb0405d6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/eb0405d6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/eb0405d6

Branch: refs/heads/master
Commit: eb0405d65fa9df3f7b1a994079b0ab3103a57484
Parents: ddb80f7
Author: anubhav100 <anubhav.tarar@knoldus.in>
Authored: Wed May 31 12:40:38 2017 +0530
Committer: chenliang613 <chenliang613@apache.org>
Committed: Thu Jun 1 23:27:55 2017 +0800

----------------------------------------------------------------------
 .../carbondata/hive/CarbonHiveRecordReader.java |  53 ++++----
 .../carbondata/hiveexample/HiveExample.scala    | 122 ++++++-------------
 .../merger/UnsafeIntermediateFileMerger.java    |   6 +
 3 files changed, 70 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb0405d6/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
index e7e342c..add4baf 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
@@ -16,7 +16,6 @@
  */
 package org.apache.carbondata.hive;
 
-
 import java.io.IOException;
 import java.sql.Date;
 import java.sql.Timestamp;
@@ -41,7 +40,11 @@ import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -61,7 +64,7 @@ public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
   private CarbonObjectInspector objInspector;
 
   public CarbonHiveRecordReader(QueryModel queryModel, CarbonReadSupport<ArrayWritable>
readSupport,
-                                InputSplit inputSplit, JobConf jobConf) throws IOException
{
+      InputSplit inputSplit, JobConf jobConf) throws IOException {
     super(queryModel, readSupport);
     initialize(inputSplit, jobConf);
   }
@@ -78,16 +81,16 @@ public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
     }
     List<TableBlockInfo> tableBlockInfoList = CarbonHiveInputSplit.createBlocks(splitList);
     queryModel.setTableBlockInfos(tableBlockInfoList);
-    readSupport.initialize(queryModel.getProjectionColumns(),
-                           queryModel.getAbsoluteTableIdentifier());
+    readSupport
+        .initialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
     try {
       carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel));
     } catch (QueryExecutionException e) {
       throw new IOException(e.getMessage(), e.getCause());
     }
     if (valueObj == null) {
-      valueObj = new ArrayWritable(Writable.class,
-                                   new Writable[queryModel.getProjectionColumns().length]);
+      valueObj =
+          new ArrayWritable(Writable.class, new Writable[queryModel.getProjectionColumns().length]);
     }
 
     final TypeInfo rowTypeInfo;
@@ -120,8 +123,7 @@ public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
     this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
   }
 
-  @Override
-  public boolean next(Void aVoid, ArrayWritable value) throws IOException {
+  @Override public boolean next(Void aVoid, ArrayWritable value) throws IOException {
     if (carbonIterator.hasNext()) {
       Object obj = readSupport.readRow(carbonIterator.next());
       ArrayWritable tmpValue = null;
@@ -138,11 +140,12 @@ public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
           System.arraycopy(arrCurrent, 0, arrValue, 0, arrCurrent.length);
         } else {
           if (arrValue.length != arrCurrent.length) {
-            throw new IOException("CarbonHiveInput : size of object differs. Value" +
-              " size :  " + arrValue.length + ", Current Object size : " + arrCurrent.length);
+            throw new IOException(
+                "CarbonHiveInput : size of object differs. Value" + " size :  " + arrValue.length
+                    + ", Current Object size : " + arrCurrent.length);
           } else {
-            throw new IOException("CarbonHiveInput can not support RecordReaders that" +
-              " don't return same key & value & value is null");
+            throw new IOException("CarbonHiveInput can not support RecordReaders that"
+                + " don't return same key & value & value is null");
           }
         }
       }
@@ -156,23 +159,19 @@ public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
     return createStruct(obj, objInspector);
   }
 
-  @Override
-  public Void createKey() {
+  @Override public Void createKey() {
     return null;
   }
 
-  @Override
-  public ArrayWritable createValue() {
+  @Override public ArrayWritable createValue() {
     return valueObj;
   }
 
-  @Override
-  public long getPos() throws IOException {
+  @Override public long getPos() throws IOException {
     return 0;
   }
 
-  @Override
-  public float getProgress() throws IOException {
+  @Override public float getProgress() throws IOException {
     return 0;
   }
 
@@ -190,7 +189,7 @@ public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
   }
 
   private ArrayWritable createArray(Object obj, ListObjectInspector inspector)
-    throws SerDeException {
+      throws SerDeException {
     List sourceArray = inspector.getList(obj);
     ObjectInspector subInspector = inspector.getListElementObjectInspector();
     List array = new ArrayList();
@@ -208,7 +207,7 @@ public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
       ArrayWritable subArray = new ArrayWritable(((Writable) array.get(0)).getClass(),
           (Writable[]) array.toArray(new Writable[array.size()]));
 
-      return new ArrayWritable(Writable.class, new Writable[]{subArray});
+      return new ArrayWritable(Writable.class, new Writable[] { subArray });
     }
     return null;
   }
@@ -224,11 +223,11 @@ public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
       case DOUBLE:
         return new DoubleWritable((double) obj);
       case INT:
-        return new IntWritable(((Long) obj).intValue());
+        return new IntWritable((int) obj);
       case LONG:
         return new LongWritable((long) obj);
       case SHORT:
-        return new ShortWritable(((Long) obj).shortValue());
+        return new ShortWritable((Short) obj);
       case DATE:
         return new DateWritable(new Date(Long.parseLong(String.valueOf(obj.toString()))));
       case TIMESTAMP:
@@ -236,8 +235,8 @@ public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
       case STRING:
         return new Text(obj.toString());
       case DECIMAL:
-        return new HiveDecimalWritable(HiveDecimal.create(
-          ((org.apache.spark.sql.types.Decimal) obj).toJavaBigDecimal()));
+        return new HiveDecimalWritable(
+            HiveDecimal.create(((org.apache.spark.sql.types.Decimal) obj).toJavaBigDecimal()));
     }
     throw new SerDeException("Unknown primitive : " + inspector.getPrimitiveCategory());
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb0405d6/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala
b/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala
index 6d19049..9c1d51e 100644
--- a/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala
+++ b/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala
@@ -17,7 +17,7 @@
 package org.apache.carbondata.hiveexample
 
 import java.io.File
-import java.sql.{DriverManager, ResultSet, SQLException, Statement}
+import java.sql.{DriverManager, ResultSet, Statement}
 
 import org.apache.spark.sql.SparkSession
 
@@ -29,11 +29,6 @@ object HiveExample {
 
   private val driverName: String = "org.apache.hive.jdbc.HiveDriver"
 
-  /**
-   * @param args
-   * @throws SQLException
-   */
-  @throws[SQLException]
   def main(args: Array[String]) {
     val rootPath = new File(this.getClass.getResource("/").getPath
                             + "../../../..").getCanonicalPath
@@ -41,46 +36,38 @@ object HiveExample {
     val warehouse = s"$rootPath/integration/hive/target/warehouse"
     val metaStore_Db = s"$rootPath/integration/hive/target/carbon_metaStore_db"
     val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    var resultId = ""
+    var resultName = ""
+    var resultSalary = ""
 
-    import org.apache.spark.sql.CarbonSession._
 
-    System.setProperty("hadoop.home.dir", "/")
+    import org.apache.spark.sql.CarbonSession._
 
-    val carbon = SparkSession
+    val carbonSession = SparkSession
       .builder()
       .master("local")
       .appName("HiveExample")
-      .config("carbon.sql.warehouse.dir", warehouse).enableHiveSupport()
+      .config("carbonSession.sql.warehouse.dir", warehouse).enableHiveSupport()
       .getOrCreateCarbonSession(
         store, metaStore_Db)
 
-    val carbonHadoopJarPath = s"$rootPath/assembly/target/scala-2.11/carbondata_2.11-1.1"
+
-                              ".0-incubating-SNAPSHOT-shade-hadoop2.7.2.jar"
-
-    val carbon_DefaultHadoopVersion_JarPath =
-      s"$rootPath/assembly/target/scala-2.11/carbondata_2.11-1.1" +
-      ".0-incubating-SNAPSHOT-shade-hadoop2.2.0.jar"
-
-    val hiveJarPath = s"$rootPath/integration/hive/target/carbondata-hive-1.1" +
-                      ".0-incubating-SNAPSHOT.jar"
-
-    carbon.sql("""DROP TABLE IF EXISTS HIVE_CARBON_EXAMPLE""".stripMargin)
+    carbonSession.sql("""DROP TABLE IF EXISTS HIVE_CARBON_EXAMPLE""".stripMargin)
 
-    carbon
+    carbonSession
       .sql(
         """CREATE TABLE HIVE_CARBON_EXAMPLE (ID int,NAME string,SALARY double) STORED BY
           |'CARBONDATA' """
           .stripMargin)
 
-    carbon.sql(
+    carbonSession.sql(
       s"""
            LOAD DATA LOCAL INPATH '$rootPath/integration/hive/src/main/resources/data.csv'
INTO
            TABLE
          HIVE_CARBON_EXAMPLE
            """)
-    carbon.sql("SELECT * FROM HIVE_CARBON_EXAMPLE").show()
+    carbonSession.sql("SELECT * FROM HIVE_CARBON_EXAMPLE").show()
 
-    carbon.stop()
+    carbonSession.stop()
 
     try {
       Class.forName(driverName)
@@ -93,84 +80,49 @@ object HiveExample {
     val hiveEmbeddedServer2 = new HiveEmbeddedServer2()
     hiveEmbeddedServer2.start()
     val port = hiveEmbeddedServer2.getFreePort
-    val con = DriverManager.getConnection(s"jdbc:hive2://localhost:$port/default", "", "")
-    val stmt: Statement = con.createStatement
+    val connection = DriverManager.getConnection(s"jdbc:hive2://localhost:$port/default",
"", "")
+    val statement: Statement = connection.createStatement
 
     logger.info(s"============HIVE CLI IS STARTED ON PORT $port ==============")
 
-    try {
-      stmt
-        .execute(s"ADD JAR $carbonHadoopJarPath")
-    }
-    catch {
-      case exception: Exception =>
-        logger.warn(s"Jar Not Found $carbonHadoopJarPath" + "Looking For hadoop 2.2.0 version
jar")
-        try {
-          stmt
-            .execute(s"ADD JAR $carbon_DefaultHadoopVersion_JarPath")
-        }
-        catch {
-          case exception: Exception => logger
-            .error("Exception Occurs:Neither One of Jar is Found" +
-                   s"$carbon_DefaultHadoopVersion_JarPath,$carbonHadoopJarPath" +
-                   "Atleast One Should Be Build")
-            hiveEmbeddedServer2.stop()
-            System.exit(0)
-        }
-    }
-    try {
-      stmt
-        .execute(s"ADD JAR $hiveJarPath")
-    }
-    catch {
-      case exception: Exception => logger.error(s"Exception Occurs:Jar Not Found $hiveJarPath")
-        hiveEmbeddedServer2.stop()
-        System.exit(0)
-
-    }
-    stmt.execute("set hive.mapred.supports.subdirectories=true")
-    stmt.execute("set mapreduce.input.fileinputformat.input.dir.recursive=true")
-
-
-    stmt.execute("CREATE TABLE IF NOT EXISTS " + "HIVE_CARBON_EXAMPLE " +
-                 " (ID int, NAME string,SALARY double)")
-    stmt
+    statement.execute("CREATE TABLE IF NOT EXISTS " + "HIVE_CARBON_EXAMPLE " +
+                      " (ID int, NAME string,SALARY double)")
+    statement
       .execute(
         "ALTER TABLE HIVE_CARBON_EXAMPLE SET FILEFORMAT INPUTFORMAT \"org.apache.carbondata."
+
         "hive.MapredCarbonInputFormat\"OUTPUTFORMAT \"org.apache.carbondata.hive." +
         "MapredCarbonOutputFormat\"SERDE \"org.apache.carbondata.hive." +
         "CarbonHiveSerDe\" ")
 
-    stmt
+    statement
       .execute(
         "ALTER TABLE HIVE_CARBON_EXAMPLE SET LOCATION " +
         s"'file:///$store/default/hive_carbon_example' ")
 
-
     val sql = "SELECT * FROM HIVE_CARBON_EXAMPLE"
 
-    val res: ResultSet = stmt.executeQuery(sql)
+    val resultSet: ResultSet = statement.executeQuery(sql)
 
     var rowsFetched = 0
 
-    while (res.next) {
+    while (resultSet.next) {
       if (rowsFetched == 0) {
         println("+---+" + "+-------+" + "+--------------+")
         println("| ID|" + "| NAME |" + "| SALARY        |")
 
         println("+---+" + "+-------+" + "+--------------+")
 
-        val resultId = res.getString("id")
-        val resultName = res.getString("name")
-        val resultSalary = res.getString("salary")
+        resultId = resultSet.getString("id")
+        resultName = resultSet.getString("name")
+        resultSalary = resultSet.getString("salary")
 
         println(s"| $resultId |" + s"| $resultName |" + s"| $resultSalary  |")
         println("+---+" + "+-------+" + "+--------------+")
       }
       else {
-        val resultId = res.getString("ID")
-        val resultName = res.getString("NAME")
-        val resultSalary = res.getString("SALARY")
+        resultId = resultSet.getString("ID")
+        resultName = resultSet.getString("NAME")
+        resultSalary = resultSet.getString("SALARY")
 
         println(s"| $resultId |" + s"| $resultName |" + s"| $resultSalary   |")
         println("+---+" + "+-------+" + "+--------------+")
@@ -184,7 +136,7 @@ object HiveExample {
     // fetching the separate columns
     var individualColRowsFetched = 0
 
-    val resultIndividualCol = stmt.executeQuery("SELECT NAME FROM HIVE_CARBON_EXAMPLE")
+    val resultIndividualCol = statement.executeQuery("SELECT NAME FROM HIVE_CARBON_EXAMPLE")
 
     while (resultIndividualCol.next) {
       if (individualColRowsFetched == 0) {
@@ -193,13 +145,13 @@ object HiveExample {
 
         println("+---++---------+")
 
-        val resultName = resultIndividualCol.getString("name")
+        resultName = resultIndividualCol.getString("name")
 
         println(s"| $resultName    |")
         println("+---+" + "+---------+")
       }
       else {
-        val resultName = resultIndividualCol.getString("NAME")
+        resultName = resultIndividualCol.getString("NAME")
 
         println(s"| $resultName      |")
         println("+---+" + "+---------+")
@@ -211,8 +163,10 @@ object HiveExample {
 
     logger.info("Fetching the Out Of Order Columns ")
 
-    val resultOutOfOrderCol = stmt.executeQuery("SELECT SALARY,ID,NAME FROM HIVE_CARBON_EXAMPLE")
+    val resultOutOfOrderCol = statement
+      .executeQuery("SELECT SALARY,ID,NAME FROM HIVE_CARBON_EXAMPLE")
     var outOfOrderColFetched = 0
+
     while (resultOutOfOrderCol.next()) {
       if (outOfOrderColFetched == 0) {
         println("+---+" + "+-------+" + "+--------------+")
@@ -220,17 +174,17 @@ object HiveExample {
 
         println("+---+" + "+-------+" + "+--------------+")
 
-        val resultId = resultOutOfOrderCol.getString("id")
-        val resultName = resultOutOfOrderCol.getString("name")
-        val resultSalary = resultOutOfOrderCol.getString("salary")
+        resultId = resultOutOfOrderCol.getString("id")
+        resultName = resultOutOfOrderCol.getString("name")
+        resultSalary = resultOutOfOrderCol.getString("salary")
 
         println(s"| $resultSalary |" + s"| $resultId |" + s"| $resultName  |")
         println("+---+" + "+-------+" + "+--------------+")
       }
       else {
-        val resultId = resultOutOfOrderCol.getString("ID")
-        val resultName = resultOutOfOrderCol.getString("NAME")
-        val resultSalary = resultOutOfOrderCol.getString("SALARY")
+        resultId = resultOutOfOrderCol.getString("ID")
+        resultName = resultOutOfOrderCol.getString("NAME")
+        resultSalary = resultOutOfOrderCol.getString("SALARY")
 
         println(s"| $resultSalary |" + s"| $resultId |" + s"| $resultName   |")
         println("+---+" + "+-------+" + "+--------------+")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb0405d6/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index 90c3b69..c67e093 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -312,7 +312,13 @@ public class UnsafeIntermediateFileMerger implements Callable<Void>
{
       if (null != value) {
         switch (type[mesCount]) {
           case SHORT:
+            rowData.putShort(size, (Short) value);
+            size += 2;
+            break;
           case INT:
+            rowData.putInt(size, (Integer) value);
+            size += 4;
+            break;
           case LONG:
             rowData.putLong(size, (Long) value);
             size += 8;


Mime
View raw message