spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Apeksha Agnihotri (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-23766) Not able to execute multiple queries in spark structured streaming
Date Thu, 22 Mar 2018 09:13:00 GMT
Apeksha Agnihotri created SPARK-23766:
-----------------------------------------

             Summary: Not able to execute multiple queries in spark structured streaming
                 Key: SPARK-23766
                 URL: https://issues.apache.org/jira/browse/SPARK-23766
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.2.0
            Reporter: Apeksha Agnihotri


I am able to receive output from the first query (i.e. "reader") only, although the logs show that all the
queries are running. No data is stored in HDFS either.

 
{code:java}
public class A extends D implements Serializable {

    /**
     * Creates a streaming Dataset from the "socket" source and starts a "reader"
     * query over it via {@link D#publish}. It then returns the same Dataset so that
     * further stages can be built on top of it.
     *
     * <p>NOTE(review): every query started from this Dataset (here and downstream)
     * likely opens its own socket connection. If the server serves only one client
     * at a time, only the first query may receive data. Confirm against the socket
     * source implementation.
     *
     * @param session the Spark session used to create the stream
     * @return the socket-backed streaming Dataset
     */
    public Dataset<Row> getDataSet(SparkSession session) {
        // NOTE(review): hostname and port are not declared in this snippet;
        // presumably they are fields set elsewhere. Confirm.
        Dataset<Row> socketLines = session.readStream()
                .format("socket")
                .option("host", hostname)
                .option("port", port)
                .load();
        publish(socketLines.toDF(), "reader");
        return socketLines;
    }
}

public class B extends D implements Serializable {

    /**
     * Splits the "value" column on runs of whitespace and explodes the pieces into
     * one row per token. It then starts a "component" query over the result via
     * {@link D#publish}.
     *
     * @param ds streaming Dataset that has a "value" column
     * @return the exploded Dataset
     */
    public Dataset<Row> execute(Dataset<Row> ds) {
        final String whitespaceRun = "\\s+";
        Dataset<Row> tokens = ds.select(
                functions.explode(functions.split(ds.col("value"), whitespaceRun)));
        publish(tokens.toDF(), "component");
        return tokens;
    }
}

public class C extends D implements Serializable {

    /**
     * Starts two streaming queries over {@code ds}:
     * <ul>
     *   <li>a query named "console", started via {@link D#publish};</li>
     *   <li>an unnamed query that appends CSV files to
     *       {@code hdfs://hostname:9000/user/abc/data1/}, with its checkpoint in
     *       {@code hdfs://hostname:9000/user/abc/cp}.</li>
     * </ul>
     *
     * @param ds the streaming Dataset to write
     * @return the same {@code ds}, unchanged
     */
    public Dataset<Row> execute(Dataset<Row> ds) {
        // Publish this method's input. No other Dataset is declared in scope here.
        publish(ds.toDF(), "console");
        ds.writeStream()
                .format("csv")
                .option("path", "hdfs://hostname:9000/user/abc/data1/")
                .option("checkpointLocation", "hdfs://hostname:9000/user/abc/cp")
                .outputMode("append")
                .start();
        return ds;
    }
}

public class D {

    /**
     * Starts a streaming query named {@code name} that appends {@code dataset} as
     * CSV files under {@code hdfs://hostname:9000/user/abc/<name>}.
     *
     * <p>The checkpoint location is also derived from {@code name}, under
     * {@code hdfs://hostname:9000/user/abc/checkpoint/<name>}. Each query started
     * through this method therefore gets its own checkpoint directory. Concurrently
     * running queries should not share a checkpoint location.
     *
     * <p>The handle returned by {@code start()} is discarded.
     *
     * @param dataset the streaming Dataset to write
     * @param name    the query name, also used as the output and checkpoint sub-directory
     */
    public void publish(Dataset<Row> dataset, String name) {
        dataset.writeStream()
                .format("csv")
                .queryName(name)
                .option("path", "hdfs://hostname:9000/user/abc/" + name)
                // Per-query checkpoint directory, keyed by name like the output path.
                .option("checkpointLocation", "hdfs://hostname:9000/user/abc/checkpoint/" + name)
                .outputMode("append")
                .start();
    }
}

public static void main(String[] args) {
    // NOTE(review): createSession() is not shown in this snippet.
    SparkSession session = createSession();
    try {
        // Build the pipeline: socket reader -> whitespace tokenizer -> CSV sinks.
        // Each stage starts its own streaming queries as a side effect.
        Dataset<Row> records = new A().getDataSet(session);
        Dataset<Row> tokens = new B().execute(records);
        new C().execute(tokens);

        // Block until any query started on this session terminates.
        session.streams().awaitAnyTermination();
    } catch (StreamingQueryException failure) {
        failure.printStackTrace();
    }
}
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message