From: Shahar Cizer Kobrinsky <shahar.kobrinsky@gmail.com>
Date: Tue, 19 Mar 2019 14:27:27 -0700
Subject: Re: Schema Evolution on Dynamic Schema
To: Fabian Hueske <fhueske@gmail.com>
Cc: Rong Rong <walterddr@gmail.com>, user <user@flink.apache.org>
My bad, it actually did work with

Select a, map['sumB', sum(b), 'sumC', sum(c)] as metric_map
group by a

Do you think that's OK as a workaround? The main schema shouldn't change that way - only the keys in the map.

On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky <
shahar.kobrinsky@gmail.com> wrote:

> Thanks Fabian,
>
> I'm thinking about how to work around that issue, and one thing that came to
> my mind is to create a map that holds keys & values that can be edited
> without changing the schema, though I'm thinking about how to implement it in
> Calcite.
> Considering the following original SQL, in which "metrics" can be
> added/deleted/renamed:
>
> Select a, sum(b) as metric_sum_b, sum(c) as metric_sum_c
> Group by a
>
> I'm looking at both json_objectagg & map to change it, but it seems that
> json_objectagg is in a later Calcite version and map doesn't work for me.
> Trying something like
>
> Select a, map(sum(b) as metric_sum_b, sum(c) as metric_sum_c) as metric_map
> group by a
>
> results in "Non-query expression encountered in illegal context".
> Is my train of thought the right one? If so, do I have a mistake in the
> way I'm trying to implement it?
>
> Thanks!
>
> On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Hi,
>>
>> Restarting a changed query from a savepoint is currently not supported.
>> In general this is a very difficult problem, as new queries might result
>> in completely different execution plans.
>> The special case of adding and removing aggregates is easier to solve,
>> but the schema of the stored state changes, and we would need to analyze the
>> previous and current query and generate compatible serializers.
>> So far we did not explore this rabbit hole.
>>
>> Also, starting a different query from a savepoint can lead to weird
>> result semantics.
>> I'd recommend bootstrapping the state of the new query from scratch.
>>
>> Best, Fabian
>>
>>
>> On Mon., 18 March 2019 at 20:02, Shahar Cizer Kobrinsky <
>> shahar.kobrinsky@gmail.com> wrote:
>>
>>> Or is it the SQL state that is incompatible?
>>>
>>> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <
>>> shahar.kobrinsky@gmail.com> wrote:
>>>
>>>> Thanks guys,
>>>>
>>>> I actually got an error now, adding some fields into the select
>>>> statement:
>>>>
>>>> java.lang.RuntimeException: Error while getting state
>>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
>>>>     at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
>>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>>     at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.util.StateMigrationException: For heap
>>>> backends, the new state serializer must not be incompatible.
>>>>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
>>>>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
>>>>     at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>>>     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
>>>>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
>>>>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
>>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>>>     ... 9 more
>>>>
>>>> Does that mean I should move from having a Pojo storing the result of
>>>> the SQL retracted stream to Avro? Trying to understand how to mitigate it.
>>>>
>>>> Thanks
>>>>
>>>> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <walterddr@gmail.com> wrote:
>>>>
>>>>> Hi Shahar,
>>>>>
>>>>> From my understanding, if you use "groupby" with AggregateFunctions,
>>>>> they save the accumulators to SQL internal states, which are invariant
>>>>> from your input schema. Based on what you described, I think that's why it
>>>>> is fine for recovering from existing state.
>>>>> I think one confusion you might have is the "toRetractStream" syntax.
>>>>> This actually passes the "retracting" flag to the Flink planner to indicate
>>>>> how the DataStream operator gets generated based on your SQL.
>>>>>
>>>>> So in my understanding, there's really no "state" associated with the
>>>>> "retracting stream", but rather with the generated operators.
>>>>> However, I am not an expert in Table/SQL state recovery: I recall there
>>>>> was an open JIRA [1] that might be related to your question regarding
>>>>> SQL/Table generated operator recovery. Maybe @Fabian can provide more
>>>>> insight here?
>>>>>
>>>>> Regarding the rest of the pipeline, both "filter" and "map" operators
>>>>> are stateless; sink state recovery depends on what you do.
>>>>>
>>>>> --
>>>>> Rong
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-6966
>>>>>
>>>>> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <shahar.kobrinsky@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Rong,
>>>>>>
>>>>>> I made a quick test changing the SQL select (adding a select field
>>>>>> in the middle) and reran the job from a savepoint, and it worked
>>>>>> without any errors. I want to make sure I understand at what point the
>>>>>> state is stored and how it works.
>>>>>>
>>>>>> Let's simplify the scenario and forget my specific case of dynamically
>>>>>> generated Pojos. Let's focus on the generic steps of:
>>>>>> Source -> register table -> SQL select and group by session -> retracted
>>>>>> stream (Row) -> transformToPojo (custom Map function) -> pushToSink
>>>>>>
>>>>>> And let's assume the SQL select is changed (a field is added somewhere
>>>>>> in the middle of the select fields).
>>>>>> So: we had intermediate results in the old format that are loaded from
>>>>>> state into the new Row object in the retracted stream. Is that an
>>>>>> accurate statement? At what operator/format is the state stored in this
>>>>>> case? Is it the SQL result/Row? Is it the Pojo? As this scenario does
>>>>>> not fail for me, I'm trying to understand how/where it is handled in Flink?
>>>>>>
>>>>>> --
>>>>>> Sent from:
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
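The map-based workaround discussed in this thread can be sketched outside Flink: because every metric lives under a string key inside a single MAP column, adding or removing a metric changes only the map's contents, never the outer row schema (key column plus one map column). A minimal Python sketch of that idea - illustrative only, not Flink code; `aggregate` and the metric names are hypothetical:

```python
# Sketch (not Flink): why packing metrics into a MAP keeps the output
# schema stable. The result shape is always (key -> metric_map);
# adding or removing a metric only changes the map's keys, mimicking
# SELECT a, MAP['sumB', SUM(b), 'sumC', SUM(c)] AS metric_map GROUP BY a.
from collections import defaultdict

def aggregate(rows, metric_fns):
    """Group rows by 'a' and compute each named metric over the group."""
    groups = defaultdict(list)
    for row in rows:
        groups[row["a"]].append(row)
    return {
        key: {name: fn(group) for name, fn in metric_fns.items()}
        for key, group in groups.items()
    }

rows = [
    {"a": "x", "b": 1, "c": 10},
    {"a": "x", "b": 2, "c": 20},
    {"a": "y", "b": 3, "c": 30},
]

# v1 of the query: two metrics.
v1 = aggregate(rows, {
    "sumB": lambda g: sum(r["b"] for r in g),
    "sumC": lambda g: sum(r["c"] for r in g),
})

# v2 adds a metric without touching the outer shape: still (key -> map).
v2 = aggregate(rows, {
    "sumB": lambda g: sum(r["b"] for r in g),
    "sumC": lambda g: sum(r["c"] for r in g),
    "maxB": lambda g: max(r["b"] for r in g),
})

print(v1["x"])  # {'sumB': 3, 'sumC': 30}
print(v2["x"])  # {'sumB': 3, 'sumC': 30, 'maxB': 2}
```

The trade-off, as noted above, is that metric identity moves from the type system into string keys, so consumers must look metrics up by name rather than by column.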
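The StateMigrationException in the thread stems from positional state layout: a serializer written for the old field order cannot read state once a field is inserted mid-SELECT. A loose analogy in Python - this is not Flink's actual serializer machinery, and the field names are hypothetical:

```python
# Sketch (not Flink internals): why inserting a field mid-SELECT can break
# restored state. Row state is positional; a reader expecting the new
# layout cannot interpret bytes written under the old one.
import struct

OLD_FIELDS = ["a", "sum_b", "sum_c"]           # v1 query schema
NEW_FIELDS = ["a", "sum_b", "sum_d", "sum_c"]  # v2 inserts sum_d mid-select

def serialize(row, fields):
    # Encode fields positionally as little-endian 64-bit ints.
    return struct.pack(f"<{len(fields)}q", *(row[f] for f in fields))

def deserialize(blob, fields):
    values = struct.unpack(f"<{len(fields)}q", blob)
    return dict(zip(fields, values))

old_blob = serialize({"a": 1, "sum_b": 3, "sum_c": 30}, OLD_FIELDS)

# Restoring v1 state with the v2 layout fails outright here (wrong byte
# length); with same-width layouts it would instead silently shift values
# into the wrong columns, which is arguably worse.
try:
    deserialize(old_blob, NEW_FIELDS)
except struct.error as e:
    print("incompatible state:", e)
```

This is why Fabian's advice above is to bootstrap the changed query's state from scratch: recovery would require analyzing both schemas and generating a migration between them.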