spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Simon Schiff (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-22014) Sample windows in Spark SQL
Date Thu, 14 Sep 2017 14:42:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-22014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Simon Schiff updated SPARK-22014:
---------------------------------
    Description: 
Hello,
I am using Spark to process measurement data. It is possible to create sample windows in Spark
Streaming, where the duration of the window is smaller than the slide. But when I try to do
the same with Spark SQL (the measurement data has a timestamp column), I get an analysis
exception:

{code}
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'timewindow(timestamp,
60000000, 180000000, 0)' due to data type mismatch: The slide duration (180000000) must be
less than or equal to the windowDuration (60000000)
{code}

Here is an example:

{code:java}
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class App {
	public static Timestamp createTimestamp(String in) throws Exception {
		SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
	    Date parsedDate = dateFormat.parse(in);
	    return new Timestamp(parsedDate.getTime());
	}
	
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().appName("Window Sampling Example").getOrCreate();
		
		List<String> sensorData = new ArrayList<String>();
		sensorData.add("2017-08-04 00:00:00, 22.75");
		sensorData.add("2017-08-04 00:01:00, 23.82");
		sensorData.add("2017-08-04 00:02:00, 24.15");
		sensorData.add("2017-08-04 00:03:00, 23.16");
		sensorData.add("2017-08-04 00:04:00, 22.62");
		sensorData.add("2017-08-04 00:05:00, 22.89");
		sensorData.add("2017-08-04 00:06:00, 23.21");
		sensorData.add("2017-08-04 00:07:00, 24.59");
		sensorData.add("2017-08-04 00:08:00, 24.44");
		
		Dataset<String> in = spark.createDataset(sensorData, Encoders.STRING());
		
		StructType sensorSchema = DataTypes.createStructType(new StructField[] { 
				DataTypes.createStructField("timestamp", DataTypes.TimestampType, false),
				DataTypes.createStructField("value", DataTypes.DoubleType, false),
		});
		
		Dataset<Row> data = spark.createDataFrame(in.toJavaRDD().map(new Function<String,
Row>() {
			public Row call(String line) throws Exception {
				return RowFactory.create(createTimestamp(line.split(",")[0]), Double.parseDouble(line.split(",")[1]));
			}
		}), sensorSchema);
		
		data.groupBy(functions.window(data.col("timestamp"), "1 minutes", "3 minutes")).avg("value").orderBy("window").show(false);
	}
}
{code}

I think there should be no difference (in the allowed duration and slide) between a "Spark
Streaming window" and a "Spark SQL window" function.

  was:
Hello,
i am using spark to process measurement data. It is possible to create sample windows in Spark
Streaming, where the duration of the window is smaller than the slide. But when I try to do
the same with Spark SQL (The measurement data has a time stamp column) then i got a analysis
exception:

{code}
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'timewindow(timestamp,
60000000, 180000000, 0)' due to data type mismatch: The slide duration (180000000) must be
less than or equal to the windowDuration (60000000)
{code}

Here is a example:

{code:java}
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class App {
	public static Timestamp createTimestamp(String in) throws Exception {
		SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
	    Date parsedDate = dateFormat.parse(in);
	    return new Timestamp(parsedDate.getTime());
	}
	
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().appName("Window Sampling Example").getOrCreate();
		
		List<String> sensorData = new ArrayList<String>();
		sensorData.add("2017-08-04 00:00:00, 22.75");
		sensorData.add("2017-08-04 00:01:00, 23.82");
		sensorData.add("2017-08-04 00:02:00, 24.15");
		sensorData.add("2017-08-04 00:03:00, 23.16");
		sensorData.add("2017-08-04 00:04:00, 22.62");
		sensorData.add("2017-08-04 00:05:00, 22.89");
		sensorData.add("2017-08-04 00:06:00, 23.21");
		sensorData.add("2017-08-04 00:07:00, 24.59");
		sensorData.add("2017-08-04 00:08:00, 24.44");
		
		Dataset<String> in = spark.createDataset(sensorData, Encoders.STRING());
		
		StructType sensorSchema = DataTypes.createStructType(new StructField[] { 
				DataTypes.createStructField("timestamp", DataTypes.TimestampType, false),
				DataTypes.createStructField("value", DataTypes.DoubleType, false),
		});
		
		Dataset<Row> data = spark.createDataFrame(in.toJavaRDD().map(new Function<String,
Row>() {
			public Row call(String line) throws Exception {
				return RowFactory.create(createTimestamp(line.split(",")[0]), Double.parseDouble(line.split(",")[1]));
			}
		}), sensorSchema);
		
		data.groupBy(functions.window(data.col("timestamp"), "1 minutes", "3 minutes")).avg("value").orderBy("window").show(false);
	}
}
{code}

I think there should be no difference (duration and slide) in a "Spark Streaming window" and
a "Spark SQL window" function.


> Sample windows in Spark SQL
> ---------------------------
>
>                 Key: SPARK-22014
>                 URL: https://issues.apache.org/jira/browse/SPARK-22014
>             Project: Spark
>          Issue Type: Wish
>          Components: DStreams, SQL
>    Affects Versions: 2.2.0
>            Reporter: Simon Schiff
>            Priority: Minor
>
> Hello,
> I am using Spark to process measurement data. It is possible to create sample windows
in Spark Streaming, where the duration of the window is smaller than the slide. But when I
try to do the same with Spark SQL (the measurement data has a timestamp column), I get
an analysis exception:
> {code}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'timewindow(timestamp,
60000000, 180000000, 0)' due to data type mismatch: The slide duration (180000000) must be
less than or equal to the windowDuration (60000000)
> {code}
> Here is an example:
> {code:java}
> import java.sql.Timestamp;
> import java.text.SimpleDateFormat;
> import java.util.ArrayList;
> import java.util.Date;
> import java.util.List;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Encoders;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.RowFactory;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.functions;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.StructField;
> import org.apache.spark.sql.types.StructType;
> public class App {
> 	public static Timestamp createTimestamp(String in) throws Exception {
> 		SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
> 	    Date parsedDate = dateFormat.parse(in);
> 	    return new Timestamp(parsedDate.getTime());
> 	}
> 	
> 	public static void main(String[] args) {
> 		SparkSession spark = SparkSession.builder().appName("Window Sampling Example").getOrCreate();
> 		
> 		List<String> sensorData = new ArrayList<String>();
> 		sensorData.add("2017-08-04 00:00:00, 22.75");
> 		sensorData.add("2017-08-04 00:01:00, 23.82");
> 		sensorData.add("2017-08-04 00:02:00, 24.15");
> 		sensorData.add("2017-08-04 00:03:00, 23.16");
> 		sensorData.add("2017-08-04 00:04:00, 22.62");
> 		sensorData.add("2017-08-04 00:05:00, 22.89");
> 		sensorData.add("2017-08-04 00:06:00, 23.21");
> 		sensorData.add("2017-08-04 00:07:00, 24.59");
> 		sensorData.add("2017-08-04 00:08:00, 24.44");
> 		
> 		Dataset<String> in = spark.createDataset(sensorData, Encoders.STRING());
> 		
> 		StructType sensorSchema = DataTypes.createStructType(new StructField[] { 
> 				DataTypes.createStructField("timestamp", DataTypes.TimestampType, false),
> 				DataTypes.createStructField("value", DataTypes.DoubleType, false),
> 		});
> 		
> 		Dataset<Row> data = spark.createDataFrame(in.toJavaRDD().map(new Function<String,
Row>() {
> 			public Row call(String line) throws Exception {
> 				return RowFactory.create(createTimestamp(line.split(",")[0]), Double.parseDouble(line.split(",")[1]));
> 			}
> 		}), sensorSchema);
> 		
> 		data.groupBy(functions.window(data.col("timestamp"), "1 minutes", "3 minutes")).avg("value").orderBy("window").show(false);
> 	}
> }
> {code}
> I think there should be no difference (in the allowed duration and slide) between a
"Spark Streaming window" and a "Spark SQL window" function.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message