spark-user mailing list archives

From Jungtaek Lim <kabh...@gmail.com>
Subject Re: use rocksdb for spark structured streaming (SSS)
Date Sun, 10 Mar 2019 21:39:57 GMT
The query makes the state grow without bound. Could you consider applying a
watermark to "receivedAt" so that the part of the state that is no longer
needed can be cleared out? Other than a watermark, you could implement
TTL-based eviction via flatMapGroupsWithState, though you'll need to
implement your own custom "dropDuplicates".
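Here is a minimal sketch of the watermark suggestion, applied to the query quoted below. It assumes `receivedAt` is already a `TimestampType` column and that `spark-sql` is on the classpath. The object name `WatermarkDedup` and the 1-hour lateness bound are illustrative, not from the thread. Because `receivedAt` is part of the deduplication key, Spark can evict a key's state once the watermark has passed it, instead of keeping every key forever.

```scala
import org.apache.spark.sql.DataFrame

object WatermarkDedup {
  // Deduplicate on (Id, receivedAt) with a watermark on receivedAt.
  // Including the event-time column in the key lets Spark evict state for
  // keys older than the watermark instead of retaining every key.
  // The default lateness bound is an illustrative assumption; tune it.
  def dedup(df: DataFrame, lateness: String = "1 hour"): DataFrame =
    df.withWatermark("receivedAt", lateness)
      .dropDuplicates("Id", "receivedAt")
}
```

In the quoted query, this would replace the `.dropDuplicates(Array("Id", "receivedAt"))` step. Records that arrive after the watermark has passed their `receivedAt` are treated as too late and dropped. The lateness bound is therefore a trade-off between state size and how late a duplicate can still be caught.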

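The TTL alternative can be sketched as a hypothetical custom replacement for `dropDuplicates` built on `flatMapGroupsWithState`. The sketch assumes processing-time timeouts and an `Event` case class holding the two key columns. The names `Event` and `TtlDedup`, and the 1-hour TTL, are illustrative. Each `(Id, receivedAt)` key keeps only a marker, and that marker is removed once no record for the key has been seen for the TTL.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical record type holding the two dedup columns from the query.
case class Event(Id: String, receivedAt: Timestamp)

object TtlDedup {
  // State per (Id, receivedAt) key is only a marker that the key was emitted.
  def dedup(ttl: String)(
      key: (String, Timestamp),
      events: Iterator[Event],
      state: GroupState[Boolean]): Iterator[Event] = {
    if (state.hasTimedOut) {
      // No record for this key arrived within the TTL: evict it.
      state.remove()
      Iterator.empty
    } else {
      // Emit one record only the first time the key is seen.
      val out = if (state.exists) Nil else events.take(1).toList
      state.update(true)
      // Processing-time timeouts are reset on every call, so re-arm the TTL.
      state.setTimeoutDuration(ttl)
      out.iterator
    }
  }

  def apply(events: Dataset[Event], ttl: String = "1 hour"): Dataset[Event] = {
    import events.sparkSession.implicits._
    events
      .groupByKey(e => (e.Id, e.receivedAt))
      .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.ProcessingTimeTimeout)(dedup(ttl) _)
  }
}
```

With `OutputMode.Append`, the output can feed the same parquet sink. Unlike the watermark approach, eviction here is driven by processing time rather than by `receivedAt`.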
On Mon, 11 Mar 2019 at 5:59 AM, Georg Heiler <georg.kf.heiler@gmail.com> wrote:

> Use https://github.com/chermenin/spark-states instead
>
> On Sun, 10 Mar 2019 at 20:51, Arun Mahadevan <arunm@apache.org> wrote:
>
>>
>> Read the link carefully,
>>
>> This solution is available (*only*) in Databricks Runtime.
>>
>> You can enable RocksDB-based state management by setting the following
>> configuration in the SparkSession before starting the streaming query.
>>
>> spark.conf.set(
>>   "spark.sql.streaming.stateStore.providerClass",
>>   "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
>>
>>
>> On Sun, 10 Mar 2019 at 11:54, Lian Jiang <jiangok2006@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a very simple SSS pipeline which does:
>>>
>>> val query = df
>>>   .dropDuplicates(Array("Id", "receivedAt"))
>>>   .withColumn(timePartitionCol, timestamp_udfnc(col("receivedAt")))
>>>   .writeStream
>>>   .format("parquet")
>>>   .partitionBy("availabilityDomain", timePartitionCol)
>>>   .trigger(Trigger.ProcessingTime(5, TimeUnit.MINUTES))
>>>   .option("path", "/data")
>>>   .option("checkpointLocation", "/data_checkpoint")
>>>   .start()
>>>
>>> After ingesting 2T records, the state under the checkpoint folder on HDFS
>>> (replication factor 2) grows to 2 TB.
>>> My cluster has only 2 TB of storage, which means it can barely handle
>>> further data growth.
>>>
>>> The online Spark documentation (https://docs.databricks.com/spark/latest/structured-streaming/production.html)
>>> says that using RocksDB helps an SSS job reduce JVM memory overhead, but I
>>> cannot find any documentation on how to set up RocksDB for SSS. The Spark
>>> class CheckpointReader seems to handle only HDFS.
>>>
>>> Any suggestions? Thanks!
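On the RocksDB question itself: the Databricks provider class quoted above exists only in Databricks Runtime. Apache Spark 3.2 and later ship their own RocksDB state store provider. It is not available in the 2.x releases that were current when this thread was written.

The following is a minimal sketch of enabling it, assuming Spark 3.2 or later. The object and method names are illustrative. RocksDB changes where state is kept (off the JVM heap), not how much of it is kept. Without a watermark or TTL, the checkpointed state would still grow. As far as I know, the provider setting is recorded in the checkpoint, so switching an existing query generally needs a new checkpoint location.

```scala
import org.apache.spark.sql.SparkSession

object RocksDbStateStore {
  // Open-source provider bundled with Apache Spark 3.2+ (assumed version).
  val ProviderClass =
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"

  // Same config key as the Databricks snippet; set it before starting the query.
  def enable(spark: SparkSession): Unit =
    spark.conf.set("spark.sql.streaming.stateStore.providerClass", ProviderClass)
}
```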
