carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenliang...@apache.org
Subject carbondata git commit: Adds example for update and delete with Spark 2.1
Date Thu, 13 Jul 2017 14:09:32 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 52ab73097 -> c2e2ba08f


Adds example for update and delete with Spark 2.1

This closes #1159


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c2e2ba08
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c2e2ba08
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c2e2ba08

Branch: refs/heads/master
Commit: c2e2ba08f464b5427d23545a61689722c65e560a
Parents: 52ab730
Author: mayun <simafengyun1984@163.com>
Authored: Tue Jul 11 22:56:10 2017 +0800
Committer: chenliang613 <chenliang613@apache.org>
Committed: Thu Jul 13 22:09:02 2017 +0800

----------------------------------------------------------------------
 .../spark/src/main/resources/data_update.csv    |  11 --
 .../examples/DataUpdateDeleteExample.scala      | 120 ++++++++++---
 .../examples/DataUpdateDeleteExample.scala      | 173 +++++++++++++++++++
 3 files changed, 267 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c2e2ba08/examples/spark/src/main/resources/data_update.csv
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/resources/data_update.csv b/examples/spark/src/main/resources/data_update.csv
deleted file mode 100644
index 3f72e00..0000000
--- a/examples/spark/src/main/resources/data_update.csv
+++ /dev/null
@@ -1,11 +0,0 @@
-ID,country,name,phonetype,serialname,salary
-1,france,bbb1,phone197,ASD69643,25000
-2,france,bbb2,phone756,ASD42892,25001
-3,france,bbb3,phone1904,ASD37014,25002
-4,france,bbb4,phone2435,ASD66902,25003
-5,france,bbb5,phone2441,ASD90633,25004
-6,germany,bbb6,phone294,ASD59961,25005
-7,germany,bbb7,phone610,ASD14875,25006
-8,germany,bbb8,phone1848,ASD57308,25007
-9,germany,bbb9,phone706,ASD86717,25008
-10,germany,bbb10,phone685,ASD30505,25009
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c2e2ba08/examples/spark/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala
index 7be392d..830a819 100644
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala
+++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala
@@ -17,6 +17,13 @@
 
 package org.apache.carbondata.examples
 
+import java.io.File
+import java.text.SimpleDateFormat
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.{CarbonContext, DataFrame, Row, SaveMode, SQLContext}
+import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.examples.util.ExampleUtils
@@ -25,20 +32,25 @@ object DataUpdateDeleteExample {
 
   def main(args: Array[String]) {
     val cc = ExampleUtils.createCarbonContext("DataUpdateDeleteExample")
-    val testData = ExampleUtils.currentPath + "/src/main/resources/data.csv"
-    val testData1 = ExampleUtils.currentPath + "/src/main/resources/data_update.csv"
+
+    // for local files
+    var rootPath = ExampleUtils.currentPath
+    // for hdfs files
+    // var rootPath = "hdfs://hdfs-host/carbon"
+
+    val testData = rootPath + "/src/main/resources/data.csv"
 
     // Specify date format based on raw data
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
 
     cc.sql("DROP TABLE IF EXISTS t3")
-    cc.sql("DROP TABLE IF EXISTS update_table")
+    cc.sql("DROP TABLE IF EXISTS t5")
 
     // Create table, 6 dimensions, 1 measure
     cc.sql("""
            CREATE TABLE IF NOT EXISTS t3
-           (ID Int, date Date, country String,
+           (id Int, date Date, country String,
            name String, phonetype String, serialname char(10), salary Int)
            STORED BY 'carbondata'
            """)
@@ -47,71 +59,127 @@ object DataUpdateDeleteExample {
            LOAD DATA LOCAL INPATH '$testData' INTO TABLE t3
            """)
 
+    // Specify date format based on raw data
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd")
+
+    // Simulate data and write to table t5
+    var fields = Seq[StructField]()
+    fields = fields :+ DataTypes.createStructField("t5_id", DataTypes.IntegerType, false)
+    fields = fields :+ DataTypes.createStructField("t5_date", DataTypes.DateType, false)
+    fields = fields :+ DataTypes.createStructField("t5_country", DataTypes.StringType, false)
+    fields = fields :+ DataTypes.createStructField("t5_name", DataTypes.StringType, false)
+    fields = fields :+ DataTypes.createStructField("t5_phonetype", DataTypes.StringType, false)
+    fields = fields :+ DataTypes.createStructField("t5_serialname", DataTypes.StringType, false)
+    fields = fields :+ DataTypes.createStructField("t5_salary", DataTypes.IntegerType, false)
+    var schema = StructType(fields)
+    var sdf = new SimpleDateFormat("yyyy-MM-dd")
+    var data = cc.sparkContext.parallelize(1 to 10).map { x =>
+      val day = x % 20 + 1
+      var dateStr = ""
+      if (day >= 10) {
+        dateStr = "2017-07-" + day
+      } else {
+        dateStr = "2017-07-0" + day
+      }
+      val dt = new java.sql.Date(sdf.parse(dateStr).getTime);
+      var row = Seq[Any]()
+      row = row :+ x
+      row = row :+ dt
+      row = row :+ "china"
+      row = row :+ "bbb" + x
+      row = row :+ "phone" + 100 * x
+      row = row :+ "ASD" + (1000 * x - x)
+      row = row :+ (25000 + x)
+      Row.fromSeq(row)
+    }
+    var df = cc.createDataFrame(data, schema)
+    df.write
+      .format("carbondata")
+      .option("tableName", "t5")
+      .option("tempCSV", "true")
+      .option("compress", "true")
+      .mode(SaveMode.Overwrite)
+      .save()
+    cc.sql("""
+           SELECT * FROM t5 ORDER BY t5_id
+           """).show()
+
     // 1.Update data with simple SET
     cc.sql("""
-           SELECT * FROM t3 ORDER BY ID
+           SELECT * FROM t3 ORDER BY t3.id
            """).show()
 
     // Update data where salary < 15003
+    val dateStr = "2018-08-08"
+    cc.sql(s"""
+           UPDATE t3 SET (t3.date, t3.country) = ('$dateStr', 'india') WHERE t3.salary < 15003
+           """).show()
+    // Query data again after the above update
     cc.sql("""
-           UPDATE t3 SET (t3.country) = ('india') WHERE t3.salary < 15003
+           SELECT * FROM t3 ORDER BY t3.id
            """).show()
+
     cc.sql("""
            UPDATE t3 SET (t3.salary) = (t3.salary + 9) WHERE t3.name = 'aaa1'
            """).show()
-
     // Query data again after the above update
     cc.sql("""
-           SELECT * FROM t3 ORDER BY ID
+           SELECT * FROM t3 ORDER BY t3.id
            """).show()
 
     // 2.Update data with subquery result SET
     cc.sql("""
-           CREATE TABLE IF NOT EXISTS update_table
-           (ID Int, country String,
-           name String, phonetype String, serialname char(10), salary Int)
-           STORED BY 'carbondata'
-           """)
-
-    cc.sql(s"""
-           LOAD DATA LOCAL INPATH '$testData1' INTO TABLE update_table
-           """)
-
+         UPDATE t3
+         SET (t3.country, t3.name) = (SELECT t5_country, t5_name FROM t5 WHERE t5_id = 5)
+         WHERE t3.id < 5""").show()
     cc.sql("""
          UPDATE t3
-         SET (t3.country, t3.name) = (SELECT u.country, u.name FROM update_table u WHERE u.id = 5)
+         SET (t3.date, t3.serialname, t3.salary) =
+         (SELECT '2099-09-09', t5_serialname, '9999' FROM t5  WHERE t5_id = 5)
          WHERE t3.id < 5""").show()
 
     // Query data again after the above update
     cc.sql("""
-           SELECT * FROM t3 ORDER BY ID
+           SELECT * FROM t3 ORDER BY t3.id
            """).show()
 
     // 3.Update data with join query result SET
     cc.sql("""
          UPDATE t3
          SET (t3.country, t3.salary) =
-         (SELECT u.country, f.salary FROM update_table u FULL JOIN update_table f
-         WHERE u.id = 8 and f.id=6) WHERE t3.id >6""").show()
+         (SELECT t5_country, t5_salary FROM t5 FULL JOIN t3 u
+         WHERE u.id = t5_id and t5_id=6) WHERE t3.id >6""").show()
 
     // Query data again after the above update
     cc.sql("""
-           SELECT * FROM t3 ORDER BY ID
+           SELECT * FROM t3 ORDER BY t3.id
            """).show()
 
     // 4.Delete data where salary > 15005
     cc.sql("""
-           DELETE FROM t3 WHERE salary > 15005
+           DELETE FROM t3 WHERE t3.salary > 15005
+           """).show()
+
+    // Query data again after delete data
+    cc.sql("""
+           SELECT * FROM t3 ORDER BY t3.id
+           """).show()
+
+    // 5.Delete data WHERE id in (1, 2, $key)
+    var key = 3
+    cc.sql(s"""
+           DELETE FROM t3 WHERE t3.id in (1, 2, $key)
            """).show()
 
     // Query data again after delete data
     cc.sql("""
-           SELECT * FROM t3 ORDER BY ID
+           SELECT * FROM t3 ORDER BY t3.id
            """).show()
 
     // Drop table
     cc.sql("DROP TABLE IF EXISTS t3")
-    cc.sql("DROP TABLE IF EXISTS update_table")
+    cc.sql("DROP TABLE IF EXISTS t5")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c2e2ba08/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala
new file mode 100644
index 0000000..60b2664
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+import java.text.SimpleDateFormat
+
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+object DataUpdateDeleteExample {
+
+  def main(args: Array[String]) {
+
+    // for local files
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    // for hdfs files
+    // var rootPath = "hdfs://hdfs-host/carbon"
+
+    var storeLocation = s"$rootPath/examples/spark2/target/store"
+    var warehouse = s"$rootPath/examples/spark2/target/warehouse"
+    var metastoredb = s"$rootPath/examples/spark2/target"
+
+    import org.apache.spark.sql.CarbonSession._
+    val spark = SparkSession
+      .builder()
+      .master("local")
+      .appName("DataUpdateDeleteExample")
+      .config("spark.sql.warehouse.dir", warehouse)
+      .config("spark.driver.host", "localhost")
+      .config("spark.sql.crossJoin.enabled", "true")
+      .getOrCreateCarbonSession(storeLocation, metastoredb)
+    spark.sparkContext.setLogLevel("WARN")
+
+    // Specify date format based on raw data
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd")
+
+    import spark.implicits._
+    // Drop table
+    spark.sql("DROP TABLE IF EXISTS t3")
+    spark.sql("DROP TABLE IF EXISTS t5")
+
+     // Simulate data and write to table t3
+    var sdf = new SimpleDateFormat("yyyy-MM-dd")
+    var df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => (x, new java.sql.Date(sdf.parse("2015-07-" + (x % 10 + 10)).getTime),
+        "china", "aaa" + x, "phone" + 555 * x, "ASD" + (60000 + x), 14999 + x))
+      .toDF("t3_id", "t3_date", "t3_country", "t3_name",
+          "t3_phonetype", "t3_serialname", "t3_salary")
+    df.write
+      .format("carbondata")
+      .option("tableName", "t3")
+      .option("tempCSV", "true")
+      .option("compress", "true")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    // Simulate data and write to table t5
+    df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => (x, new java.sql.Date(sdf.parse("2017-07-" + (x % 20 + 1)).getTime),
+        "usa", "bbb" + x, "phone" + 100 * x, "ASD" + (1000 * x - x), 25000 + x))
+      .toDF("t5_id", "t5_date", "t5_country", "t5_name",
+          "t5_phonetype", "t5_serialname", "t5_salary")
+    df.write
+      .format("carbondata")
+      .option("tableName", "t5")
+      .option("tempCSV", "true")
+      .option("compress", "true")
+      .mode(SaveMode.Overwrite)
+      .save()
+    spark.sql("""
+           SELECT * FROM t3 ORDER BY t3_id
+           """).show()
+    spark.sql("""
+           SELECT * FROM t5 ORDER BY t5_id
+           """).show()
+
+    // 1.Update data with simple SET
+    // Update data where salary < 15003
+    val dateStr = "2018-08-08"
+    spark.sql(s"""
+           UPDATE t3 SET (t3_date, t3_country) = ('$dateStr', 'india') WHERE t3_salary < 15003
+           """).show()
+    // Query data again after the above update
+    spark.sql("""
+           SELECT * FROM t3 ORDER BY t3_id
+           """).show()
+
+    spark.sql("""
+           UPDATE t3 SET (t3_salary) = (t3_salary + 9) WHERE t3_name = 'aaa1'
+           """).show()
+    // Query data again after the above update
+    spark.sql("""
+           SELECT * FROM t3 ORDER BY t3_id
+           """).show()
+
+    // 2.Update data with subquery result SET
+    spark.sql("""
+         UPDATE t3
+         SET (t3_country, t3_name) = (SELECT t5_country, t5_name FROM t5 WHERE t5_id = 5)
+         WHERE t3_id < 5""").show()
+    spark.sql("""
+         UPDATE t3
+         SET (t3_date, t3_serialname, t3_salary) =
+         (SELECT '2099-09-09', t5_serialname, '9999' FROM t5 WHERE t5_id = 5)
+         WHERE t3_id < 5""").show()
+
+    // Query data again after the above update
+    spark.sql("""
+           SELECT * FROM t3 ORDER BY t3_id
+           """).show()
+
+    // 3.Update data with join query result SET
+    spark.sql("""
+         UPDATE t3
+         SET (t3_country, t3_salary) =
+         (SELECT t5_country, t5_salary FROM t5 FULL JOIN t3 u
+         WHERE u.t3_id = t5_id and t5_id=6) WHERE t3_id >6""").show()
+
+    // Query data again after the above update
+    spark.sql("""
+           SELECT * FROM t3 ORDER BY t3_id
+           """).show()
+
+    // 4.Delete data where salary > 15005
+    spark.sql("""
+           DELETE FROM t3 WHERE t3_salary > 15005
+           """).show()
+
+    // Query data again after delete data
+    spark.sql("""
+           SELECT * FROM t3 ORDER BY t3_id
+           """).show()
+
+    // 5.Delete data WHERE id in (1, 2, $key)
+    var key = 3
+    spark.sql(s"""
+           DELETE FROM t3 WHERE t3_id in (1, 2, $key)
+           """).show()
+
+    // Query data again after delete data
+    spark.sql("""
+           SELECT * FROM t3 ORDER BY t3_id
+           """).show()
+
+    // Drop table
+    spark.sql("DROP TABLE IF EXISTS t3")
+    spark.sql("DROP TABLE IF EXISTS t5")
+
+    spark.stop()
+  }
+
+}


Mime
View raw message