spark-issues mailing list archives

From "Apeksha Agnihotri (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-23766) Not able to execute multiple queries in spark structured streaming
Date Tue, 27 Mar 2018 06:30:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-23766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apeksha Agnihotri updated SPARK-23766:
--------------------------------------
    Component/s:     (was: Spark Core)
                 Structured Streaming

> Not able to execute multiple queries in spark structured streaming
> ------------------------------------------------------------------
>
>                 Key: SPARK-23766
>                 URL: https://issues.apache.org/jira/browse/SPARK-23766
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Apeksha Agnihotri
>            Priority: Major
>
> I receive output only from the first query (i.e., reader), although the logs show that all
> the queries are running. No data is stored in HDFS either.
>  
> {code:java}
> public class A extends D implements Serializable {
>     public Dataset<Row> getDataSet(SparkSession session) {
>         Dataset<Row> dfs = session.readStream().format("socket")
>                 .option("host", hostname).option("port", port).load();
>         publish(dfs.toDF(), "reader");
>         return dfs;
>     }
> }
> public class B extends D implements Serializable {
>     public Dataset<Row> execute(Dataset<Row> ds) {
>         Dataset<Row> d = ds.select(functions.explode(functions.split(ds.col("value"), "\\s+")));
>         publish(d.toDF(), "component");
>         return d;
>     }
> }
> public class C extends D implements Serializable {
>     public Dataset<Row> execute(Dataset<Row> ds) {
>         publish(inputDataSet.toDF(), "console");
>         ds.writeStream().format("csv").option("path", "hdfs://hostname:9000/user/abc/data1/")
>                 .option("checkpointLocation", "hdfs://hostname:9000/user/abc/cp").outputMode("append").start();
>         return ds;
>     }
> }
> public class D {
>     public void publish(Dataset<Row> dataset, String name) {
>         dataset.writeStream().format("csv").queryName(name)
>                 .option("path", "hdfs://hostname:9000/user/abc/" + name)
>                 .option("checkpointLocation", "hdfs://hostname:9000/user/abc/checkpoint/" + directory)
>                 .outputMode("append")
>                 .start();
>     }
> }
> public static void main(String[] args) {
>     SparkSession session = createSession();
>     try {
>         A a = new A();
>         Dataset<Row> records = a.getDataSet(session);
>         B b = new B();
>         Dataset<Row> ds = b.execute(records);
>         C c = new C();
>         c.execute(ds);
>         session.streams().awaitAnyTermination();
>     } catch (StreamingQueryException e) {
>         e.printStackTrace();
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

