spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Marco Gaido (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-24089) DataFrame.write().mode(SaveMode.Append).insertInto(TABLE)
Date Wed, 25 Apr 2018 19:36:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-24089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Marco Gaido updated SPARK-24089:
--------------------------------
    Priority: Critical  (was: Blocker)

> DataFrame.write().mode(SaveMode.Append).insertInto(TABLE) 
> ----------------------------------------------------------
>
>                 Key: SPARK-24089
>                 URL: https://issues.apache.org/jira/browse/SPARK-24089
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API, Spark Core, SQL
>    Affects Versions: 2.3.0
>            Reporter: kumar
>            Priority: Critical
>              Labels: bug
>
> I am completely stuck on this issue and unable to progress further. For more information,
> please refer to this post: [https://stackoverflow.com/questions/49994085/spark-sql-2-3-dataframe-savemode-append-issue]
> I want to load multiple files one at a time rather than all at once. To
> achieve this I used SaveMode.Append, so that the second file's data is added to the first
> file's data in the database, but it throws an exception.
> {code:java}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'InsertIntoTable LogicalRDD [a1#4, b1#5, c1#6, d1#7], false, false, false;;
> 'InsertIntoTable LogicalRDD [a1#4, b1#5, c1#6, d1#7], false, false, false
> +- LogicalRDD [a1#22, b1#23, c1#24, d1#25], false
> {code}
> Code:
> {code:java}
> package com.log;
> import com.log.common.RegexMatch;
> import com.log.spark.SparkProcessor;
> import org.apache.spark.SparkContext;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.sql.*;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.StructField;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.storage.StorageLevel;
> import java.util.ArrayList;
> import java.util.List;
> public class TestApp {
>     private SparkSession spark;
>     private SparkContext sparkContext;
>     private SQLContext sqlContext;
>     public TestApp() {
>         SparkSession spark = SparkSession.builder().appName("Simple Application")
>                 .config("spark.master", "local").getOrCreate();
>         SparkContext sc = spark.sparkContext();
>         this.spark = spark;
>         this.sparkContext = sc;
>     }
>     public static void main(String[] args) {
>         TestApp app = new TestApp();
>         String[] afiles = {"C:\\Users\\test\\Desktop\\logs\\log1.txt",
>                 "C:\\Users\\test\\Desktop\\logs\\log2.txt"};
>         for (String file : afiles) {
>             app.writeFileToSchema(file);
>         }
>     }
>     public void writeFileToSchema(String filePath) {
>         StructType schema = getSchema();
>         JavaRDD<Row> rowRDD = getRowRDD(filePath);
>         if (spark.catalog().tableExists("mylogs")) {
>             logDataFrame = spark.createDataFrame(rowRDD, schema);
>             logDataFrame.createOrReplaceTempView("temptable");
>             logDataFrame.write().mode(SaveMode.Append).insertInto("mylogs");//exception
>         } else {
>             logDataFrame = spark.createDataFrame(rowRDD, schema);
>             logDataFrame.createOrReplaceTempView("mylogs");
>         }
>         Dataset<Row> results = spark.sql("SELECT count(b1) FROM mylogs");
>         List<Row> allrows = results.collectAsList();
>         System.out.println("Count:"+allrows);
>         sqlContext = logDataFrame.sqlContext();
>     }
>     Dataset<Row> logDataFrame;
>     public List<Row> getTagList() {
>         Dataset<Row> results = sqlContext.sql("SELECT distinct(b1) FROM mylogs");
>         List<Row> allrows = results.collectAsList();
>         return allrows;
>     }
>     public StructType getSchema() {
>         String schemaString = "a1 b1 c1 d1";
>         List<StructField> fields = new ArrayList<>();
>         for (String fieldName : schemaString.split(" ")) {
>             StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
>             fields.add(field);
>         }
>         StructType schema = DataTypes.createStructType(fields);
>         return schema;
>     }
>     public JavaRDD<Row> getRowRDD(String filePath) {
>         JavaRDD<String> logRDD = sparkContext.textFile(filePath, 1).toJavaRDD();
>         RegexMatch reg = new RegexMatch();
>         JavaRDD<Row> rowRDD = logRDD
>                 .map((Function<String, Row>) line -> {
>                     String[] st = line.split(" ");
>                     return RowFactory.create(st[0], st[1], st[2], st[3]);
>                 });
>         rowRDD.persist(StorageLevel.MEMORY_ONLY());
>         return rowRDD;
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message