spark-user mailing list archives

From Lian Jiang <jiangok2...@gmail.com>
Subject Re: use rocksdb for spark structured streaming (SSS)
Date Sun, 10 Mar 2019 21:46:40 GMT
Thanks guys!

I am using SSS to backfill the past 3 months of data. I thought I could use SSS
for both historical data and new data. I just realized that SSS is not
appropriate for backfilling, since the watermark relies on receivedAt, which
could be 3 months old. I will use a batch job for the backfill and SSS (with
watermark and spark-states) for the real-time processing.
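
A minimal sketch of that split, for anyone finding this thread later. It is an illustration under assumptions, not the code actually deployed: the "1 hour" lateness threshold, the object and method names, and the Parquet input format for the historical data are all hypothetical, and receivedAt is assumed to be a timestamp column. The partitioning and UDF from the original query are left out for brevity.

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object DedupSketch {

  // Streaming path: the watermark on receivedAt lets Spark drop deduplication
  // state for keys older than the threshold, so the state stops growing forever.
  // The event-time column must be part of the dropDuplicates key for this to work.
  def startStreaming(df: DataFrame): StreamingQuery =
    df
      .withWatermark("receivedAt", "1 hour") // assumed threshold, tune to real lateness
      .dropDuplicates("Id", "receivedAt")
      .writeStream
      .format("parquet")
      .option("path", "/data")
      .option("checkpointLocation", "/data_checkpoint")
      .trigger(Trigger.ProcessingTime(5, TimeUnit.MINUTES))
      .start()

  // Batch path for the backfill: no streaming state is kept at all.
  // historyPath and the Parquet input format are hypothetical.
  def backfill(spark: SparkSession, historyPath: String): Unit =
    spark.read
      .parquet(historyPath)
      .dropDuplicates("Id", "receivedAt")
      .write
      .mode("append")
      .parquet("/data")
}
```

With a watermark in place, records arriving later than the threshold are dropped rather than deduplicated, which is why 3-month-old backfill data goes through the batch path instead.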

On Sun, Mar 10, 2019 at 2:40 PM Jungtaek Lim <kabhwan@gmail.com> wrote:

> The query makes the state grow indefinitely. Could you consider applying a
> watermark to "receivedAt" so that the unnecessary part of the state is
> cleared out? Other than a watermark, you could implement TTL-based eviction
> via flatMapGroupsWithState, though you'll need to implement your own custom
> "dropDuplicates".
>
> On Mon, Mar 11, 2019 at 5:59 AM, Georg Heiler <georg.kf.heiler@gmail.com> wrote:
>
>> Use https://github.com/chermenin/spark-states instead
>>
>> On Sun, Mar 10, 2019 at 8:51 PM, Arun Mahadevan <arunm@apache.org> wrote:
>>
>>>
>>> Read the link carefully,
>>>
>>> This solution is available (*only*) in Databricks Runtime.
>>>
>>> You can enable RocksDB-based state management by setting the following
>>> configuration in the SparkSession before starting the streaming query.
>>>
>>> spark.conf.set(
>>>   "spark.sql.streaming.stateStore.providerClass",
>>>   "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
>>>
>>>
>>> On Sun, 10 Mar 2019 at 11:54, Lian Jiang <jiangok2006@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a very simple SSS pipeline which does:
>>>>
>>>> val query = df
>>>>   .dropDuplicates(Array("Id", "receivedAt"))
>>>>   .withColumn(timePartitionCol, timestamp_udfnc(col("receivedAt")))
>>>>   .writeStream
>>>>   .format("parquet")
>>>>   .partitionBy("availabilityDomain", timePartitionCol)
>>>>   .trigger(Trigger.ProcessingTime(5, TimeUnit.MINUTES))
>>>>   .option("path", "/data")
>>>>   .option("checkpointLocation", "/data_checkpoint")
>>>>   .start()
>>>>
>>>> After ingesting 2T records, the state under the checkpoint folder on HDFS
>>>> (replication factor 2) grows to 2T bytes. My cluster has only 2T bytes,
>>>> which means the cluster can barely handle further data growth.
>>>>
>>>> The online Spark documentation (https://docs.databricks.com/spark/latest/structured-streaming/production.html)
>>>> says that using RocksDB helps an SSS job reduce JVM memory overhead, but I
>>>> cannot find any documentation on how to set up RocksDB for SSS. The Spark
>>>> class CheckpointReader seems to handle only HDFS.
>>>>
>>>> Any suggestions? Thanks!
