spark-dev mailing list archives

From Hemant Bhanawat <hemant9...@gmail.com>
Subject Re: Sorting on a streaming dataframe
Date Wed, 02 May 2018 05:45:08 GMT
Opened an issue. https://issues.apache.org/jira/browse/SPARK-24144

Since it is a major issue for us, I have marked it as Major. Feel free to
change that if it is not the case from Spark's perspective.

On Tue, May 1, 2018 at 4:34 AM, Michael Armbrust <michael@databricks.com>
wrote:

> Please open a JIRA then!
>
> On Fri, Apr 27, 2018 at 3:59 AM Hemant Bhanawat <hemant9379@gmail.com>
> wrote:
>
>> I see.
>>
>> monotonically_increasing_id on streaming DataFrames would be really
>> helpful to me and, I believe, to many other users. Adding this functionality
>> to Spark would perform better than implementing it inside each
>> application.
>>
>> Hemant
>>
>> On Thu, Apr 26, 2018 at 11:59 PM, Michael Armbrust <
>> michael@databricks.com> wrote:
>>
>>> The basic tenet of structured streaming is that a query should return
>>> the same answer in streaming or batch mode. We support sorting in complete
>>> mode because we have all the data and can sort it correctly and return the
>>> full answer.  In update or append mode, sorting would only return a correct
>>> answer if we could promise that records that sort lower are never going to
>>> arrive later (and we can't).  Therefore, it is disallowed.
>>>
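To make the argument above concrete, here is a minimal sketch in plain Python (not Spark code). It assumes a hypothetical append-mode sort that sorts each micro-batch and emits it immediately, and compares the result with a batch sort over the same records. The function names are illustrative only.

```python
def batch_sort(records):
    """Batch mode: all records are available, so one global sort is correct."""
    return sorted(records)


def append_mode_sort(micro_batches):
    """Hypothetical append-mode sort: each micro-batch is sorted and emitted
    as it arrives, and rows already emitted can never be reordered."""
    emitted = []
    for batch in micro_batches:
        emitted.extend(sorted(batch))
    return emitted


# Two micro-batches; a record that sorts lower (1) arrives in the later batch.
micro_batches = [[5, 3], [4, 1]]
all_records = [r for batch in micro_batches for r in batch]

streaming_answer = append_mode_sort(micro_batches)  # [3, 5, 1, 4]
batch_answer = batch_sort(all_records)              # [1, 3, 4, 5]
```

The two answers differ because record 1 arrived after 3 and 5 had already been emitted, which is the case the engine cannot rule out.
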
>>> If you are just looking for a unique, stable id and you are already
>>> using kafka as the source, you could just combine the partition id and the
>>> offset. The structured streaming connector to Kafka
>>> <https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html>
>>> exposes both of these in the schema of the streaming DataFrame. (similarly
>>> for kinesis you can use the shard id and sequence number)
>>>
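As a rough illustration of the partition-plus-offset idea, the sketch below uses plain Python rather than the Spark API. The `KafkaRecord` type and the `stable_id` helper are hypothetical names introduced here; in Spark the Kafka source exposes `partition` and `offset` as columns of the streaming DataFrame.

```python
from typing import NamedTuple


class KafkaRecord(NamedTuple):
    """Hypothetical stand-in for one row read from the Kafka source."""
    partition: int
    offset: int
    value: str


def stable_id(record):
    """Combine partition and offset into an id that is unique within a topic
    and does not change when the same record is read again."""
    return f"{record.partition}-{record.offset}"


records = [
    KafkaRecord(0, 0, "a"),
    KafkaRecord(0, 1, "b"),
    KafkaRecord(1, 0, "c"),
]

first_attempt = [stable_id(r) for r in records]
# After a failure the same records are re-read, possibly in another order.
replay = [stable_id(r) for r in reversed(records)]
```

Because the id depends only on the record's position in Kafka, a replay assigns exactly the same ids. If a query subscribes to more than one topic, the topic name would also need to be part of the id.
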
>>> If you need the IDs to be contiguous, then this is a somewhat
>>> fundamentally hard problem.  I think the best we could do is add support
>>> for monotonically_increasing_id() in streaming dataframes.
>>>
>>> On Tue, Apr 24, 2018 at 1:38 PM, Chayapan Khannabha <chayapan@gmail.com>
>>> wrote:
>>>
>>>> Perhaps your use case is a better fit for Apache Kafka.
>>>>
>>>> More info at:
>>>> https://kafka.apache.org/documentation/streams/
>>>>
>>>> Everything really comes down to the architecture design and algorithm
>>>> spec. However, from my experience with Spark, there are many good reasons
>>>> why this requirement is not supported ;)
>>>>
>>>> Best,
>>>>
>>>> Chayapan (A)
>>>>
>>>>
>>>> On Apr 24, 2018, at 2:18 PM, Hemant Bhanawat <hemant9379@gmail.com>
>>>> wrote:
>>>>
>>>> Thanks Chris. There are many ways in which I can solve this problem but
>>>> they are cumbersome. The easiest way would have been to sort the streaming
>>>> dataframe. The reason I asked this question is because I could not find a
>>>> reason why sorting on streaming dataframe is disallowed.
>>>>
>>>> Hemant
>>>>
>>>> On Mon, Apr 16, 2018 at 6:09 PM, Bowden, Chris <
>>>> chris.bowden@microfocus.com> wrote:
>>>>
>>>>> You can happily sort the underlying RDD of InternalRow(s) inside a
>>>>> sink, assuming you are willing to implement and maintain your own sink(s).
>>>>> That is, just grabbing the parquet sink, etc. isn’t going to work out of
>>>>> the box. Alternatively map/flatMapGroupsWithState is probably sufficient
>>>>> and requires less working knowledge to make effective reuse of internals.
>>>>> Just group by foo and then sort accordingly and assign ids. The id counter
>>>>> can be stateful per group. Sometimes this problem may not need to be solved
>>>>> at all. For example, if you are using kafka, a proper partitioning scheme
>>>>> and message offsets may be “good enough”.
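The "group, sort, assign ids with a per-group counter" approach described above can be sketched as follows. This is a plain-Python simulation under stated assumptions, not the map/flatMapGroupsWithState API: the `assign_ids` function, its parameters, and the dictionary used as state are all hypothetical. In Spark the counter would live in the per-group state object instead.

```python
from collections import defaultdict


def assign_ids(micro_batches, key_fn, sort_key_fn):
    """Assign per-group incrementing ids across micro-batches.

    next_id plays the role of the per-group state: it survives from one
    micro-batch to the next, so ids keep incrementing within a group.
    """
    next_id = defaultdict(int)
    out = []
    for batch in micro_batches:
        # Group the records of this micro-batch by key.
        groups = defaultdict(list)
        for record in batch:
            groups[key_fn(record)].append(record)
        # Sort inside each group so the assignment is deterministic.
        for key in sorted(groups):
            for record in sorted(groups[key], key=sort_key_fn):
                out.append((key, next_id[key], record))
                next_id[key] += 1
    return out


# Records are (foo, payload) pairs; foo is the grouping key.
batches = [
    [("a", "y"), ("b", "x"), ("a", "x")],
    [("a", "z"), ("b", "w")],
]
result = assign_ids(batches, key_fn=lambda r: r[0], sort_key_fn=lambda r: r[1])
```

Sorting inside each group means the arrival order within a micro-batch does not affect which id a record receives, provided the micro-batch boundaries are the same on replay.
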
>>>>> ------------------------------
>>>>> *From:* Hemant Bhanawat <hemant9379@gmail.com>
>>>>> *Sent:* Thursday, April 12, 2018 11:42:59 PM
>>>>> *To:* Reynold Xin
>>>>> *Cc:* dev
>>>>> *Subject:* Re: Sorting on a streaming dataframe
>>>>>
>>>>> Well, we want to assign snapshot ids (incrementing counters) to the
>>>>> incoming records. For that, we are zipping the streaming rdds with that
>>>>> counter using a modified version of ZippedWithIndexRDD. We are ok if the
>>>>> records in the streaming dataframe get counters in random order, but the
>>>>> counter should always be incrementing.
>>>>>
>>>>> This is working fine until we have a failure. When we have a failure,
>>>>> we re-assign snapshot ids to the records, and this time the same snapshot id
>>>>> can get assigned to a different record. This is a problem because the
>>>>> primary key in our storage engine is <recordid, snapshotid>. So we want to
>>>>> sort the dataframe so that the records always get the same snapshot id.
>>>>>
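The failure scenario described above can be reproduced with a small plain-Python sketch. It assumes that a replay delivers the same records in a different order; `zip_with_counter` is a hypothetical stand-in for the index-zipping step, not the modified ZippedWithIndexRDD itself.

```python
def zip_with_counter(records, start):
    """Pair each record with an incrementing counter, in iteration order."""
    return {record: start + i for i, record in enumerate(records)}


first_attempt = ["r2", "r1", "r3"]
replay = ["r1", "r3", "r2"]  # same records, different order after a failure

# Without sorting, the id depends on arrival order, so a replay disagrees.
unsorted_first = zip_with_counter(first_attempt, 100)
unsorted_replay = zip_with_counter(replay, 100)

# Sorting first makes the assignment independent of arrival order.
sorted_first = zip_with_counter(sorted(first_attempt), 100)
sorted_replay = zip_with_counter(sorted(replay), 100)
```

Here `unsorted_first` and `unsorted_replay` give `r2` different ids (100 and 102), while the sorted variants agree on every record.
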
>>>>>
>>>>>
>>>>> On Fri, Apr 13, 2018 at 11:43 AM, Reynold Xin <rxin@databricks.com>
>>>>> wrote:
>>>>>
>>>>> Can you describe your use case more?
>>>>>
>>>>> On Thu, Apr 12, 2018 at 11:12 PM Hemant Bhanawat <hemant9379@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Hi Guys,
>>>>>
>>>>> Why is sorting on streaming dataframes not supported (unless it is
>>>>> complete mode)? My downstream needs me to sort the streaming dataframe.
>>>>>
>>>>> Hemant
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
