flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com>
Subject Re: Schema Evolution on Dynamic Schema
Date Tue, 19 Mar 2019 21:27:27 GMT
My bad. it actually did work with
Select a, map['sumB', sum(b) ,'sumC' , sum(c) ) as metric_map
group by a

do you think thats OK as a workaround? main schema should be changed that
way - only keys in the map

On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky <
shahar.kobrinsky@gmail.com> wrote:

> Thanks Fabian,
>
> Im thinking about how to work around that issue and one thing that came to
> my mind is to create a map that holds keys & values that can be edited
> without changing the schema, though im thinking how to implement it in
> Calcite.
> Considering the following original SQL in which "metrics" can be
> added/deleted/renamed
> Select a, sum(b) as metric_sum_c ,sum(c) as metric_sum_c
> Group by a
>
> im looking both at json_objectagg & map to change it but it seems that
> json_objectagg is on a later calcite version and map doesnt work for me.
> Trying something like
> Select a, map(sum(b) as metric_sum_c ,sum(c) as metric_sum_c) as metric_map
> group by a
>
> results with "Non-query expression encountered in illegal context"
> is my train of thought the right one? if so, do i have a mistake in the
> way im trying to implement it?
>
> Thanks!
>
>
>
>
>
>
>
> On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Hi,
>>
>> Restarting a changed query from a savepoint is currently not supported.
>> In general this is a very difficult problem as new queries might result
>> in completely different execution plans.
>> The special case of adding and removing aggregates is easier to solve,
>> but the schema of the stored state changes and we would need to analyze the
>> previous and current query and generate compatible serializers.
>> So far we did not explore this rabbit hole.
>>
>> Also, starting a different query from a savepoint can also lead to weird
>> result semantics.
>> I'd recommend to bootstrap the state of the new query from scatch.
>>
>> Best, Fabian
>>
>>
>>
>> Am Mo., 18. März 2019 um 20:02 Uhr schrieb Shahar Cizer Kobrinsky <
>> shahar.kobrinsky@gmail.com>:
>>
>>> Or is it the SQL state that is incompatible.. ?
>>>
>>> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <
>>> shahar.kobrinsky@gmail.com> wrote:
>>>
>>>> Thanks Guys,
>>>>
>>>> I actually got an error now adding some fields into the select
>>>> statement:
>>>>
>>>> java.lang.RuntimeException: Error while getting state
>>>> at
>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
>>>> at
>>>> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
>>>> at
>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>> at
>>>> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.util.StateMigrationException: For heap
>>>> backends, the new state serializer must not be incompatible.
>>>> at
>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
>>>> at
>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
>>>> at
>>>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>>> at
>>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
>>>> at
>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
>>>> at
>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
>>>> at
>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>>>> at
>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>>> ... 9 more
>>>>
>>>> Does that mean i should move from having a Pojo storing the result of
>>>> the SQL retracted stream to Avro? trying to understand how to mitigate it.
>>>>
>>>> Thanks
>>>>
>>>> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <walterddr@gmail.com> wrote:
>>>>
>>>>> Hi Shahar,
>>>>>
>>>>> From my understanding, if you use "groupby" withAggregateFunctions,
>>>>> they save the accumulators to SQL internal states: which are invariant
from
>>>>> your input schema. Based on what you described I think that's why it
is
>>>>> fine for recovering from existing state.
>>>>> I think one confusion you might have is the "toRetractStream" syntax.
>>>>> This actually passes the "retracting" flag to the Flink planner to indicate
>>>>> how the DataStream operator gets generated based on your SQL.
>>>>>
>>>>> So in my understanding, there's really no "state" associated with the
>>>>> "retracting stream", but rather associated with the generated operators.
>>>>> However, I am not expert in Table/SQL state recovery: I recall there
>>>>> were an open JIRA[1] that might be related to your question regarding
>>>>> SQL/Table generated operator recovery. Maybe @Fabian can provide more
>>>>> insight here?
>>>>>
>>>>> Regarding the rest of the pipeline, both "filter" and "map" operators
>>>>> are stateless; and sink state recovery depends on what you do.
>>>>>
>>>>> --
>>>>> Rong
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-6966
>>>>>
>>>>> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <shahar.kobrinsky@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Rong,
>>>>>>
>>>>>> I have made some quick test changing the SQL select (adding a select
>>>>>> field
>>>>>> in the middle) and reran the job from a savepoint and it worked
>>>>>> without any
>>>>>> errors. I want to make sure i understand how at what point the state
>>>>>> is
>>>>>> stored and how does it work.
>>>>>>
>>>>>> Let's simplify the scenario and forget my specific case of dynamically
>>>>>> generated pojo. let's focus on generic steps of:
>>>>>> Source->register table->SQL select and group by session->retracted
>>>>>> stream
>>>>>> (Row)->transformToPojo (Custom Map function) ->pushToSink
>>>>>>
>>>>>> And let's assume the SQL select is changed (a field is added
>>>>>> somewhere in
>>>>>> the middle of the select field).
>>>>>> So:
>>>>>> We had intermediate results that are in the old format that are
>>>>>> loaded from
>>>>>> state to the new Row object in the retracted stream. is that an
>>>>>> accurate
>>>>>> statement? at what operator/format is the state stored in this case?
>>>>>> is it
>>>>>> the SQL result/Row? is it the Pojo? as this scenario does not fail
>>>>>> for me im
>>>>>> trying to understand how/where it is handled in Flink?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Sent from:
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>>
>>>>>

Mime
View raw message