impala-user mailing list archives

From Thomas Tauber-Marshall <tmarsh...@cloudera.com>
Subject Re: Creating Impala UDA
Date Wed, 21 Jun 2017 18:20:04 GMT
Ravi,

Instead of using the "where ... is not null" filter, have you tried
'last_value(... ignore nulls)'?
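
To see why the WHERE filter yields "Hi" for C while a per-column "ignore
nulls" yields "Zero", here is a small standalone C++ simulation of the two
approaches on the sample rows quoted below. It only illustrates the
semantics and is not Impala code: the Row and Latest structs and all
function names are made up for this sketch, and std::nullopt stands in for
NULL.

```cpp
#include <algorithm>
#include <optional>
#include <string>
#include <vector>

// Hypothetical row layout mirroring the sample table; std::nullopt means NULL.
struct Row {
  int id;
  int date_time;
  std::optional<int> a;
  std::optional<std::string> b;
  std::optional<std::string> c;
};

// Latest non-NULL value seen for each column.
struct Latest {
  std::optional<int> a;
  std::optional<std::string> b;
  std::optional<std::string> c;
};

// The sample rows from the thread (all with id = 1).
std::vector<Row> SampleRows() {
  return {
      {1, 0, 1, std::nullopt, std::nullopt},
      {1, 1, 2, std::string("Hi"), std::nullopt},
      {1, 3, 4, std::string("Hello"), std::string("Hi")},
      {1, 2, 5, std::nullopt, std::nullopt},
      {1, 4, std::nullopt, std::nullopt, std::string("Zero")},
  };
}

// Mimics "WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL":
// a row is dropped entirely if any one of its columns is NULL.
std::vector<Row> DropRowsWithAnyNull(const std::vector<Row>& rows) {
  std::vector<Row> kept;
  for (const Row& r : rows) {
    if (r.a && r.b && r.c) kept.push_back(r);
  }
  return kept;
}

// Mimics a per-column "ignore nulls": visit rows in date_time order and
// overwrite a column's result only when that row has a non-NULL value for it.
Latest LatestNonNull(std::vector<Row> rows) {
  std::sort(rows.begin(), rows.end(),
            [](const Row& x, const Row& y) { return x.date_time < y.date_time; });
  Latest out;
  for (const Row& r : rows) {
    if (r.a) out.a = r.a;
    if (r.b) out.b = r.b;
    if (r.c) out.c = r.c;
  }
  return out;
}
```

On the sample rows, LatestNonNull(DropRowsWithAnyNull(SampleRows())) gives
4, Hello, Hi, because only the Date_Time = 3 row survives the filter, while
LatestNonNull(SampleRows()) gives 4, Hello, Zero.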

On Wed, Jun 21, 2017 at 1:08 PM Ravi Kanth <ravikanth.4b0@gmail.com> wrote:

> Antoni,
>
> The problem with using the last_value() function, as far as I have
> observed, is that if I use it on multiple columns in a single query, it
> does not return the expected results.
>
>  Input:
> ID (Int)  Date_Time (timestamp)  A (Int)  B (String)  C (String)
> 1         0                      1        NULL        NULL
> 1         1                      2        Hi          NULL
> 1         3                      4        Hello       Hi
> 1         2                      5        NULL        NULL
> 1         4                      NULL     NULL        Zero
>
> Expected Output:
>
>
> ID (Int)  A (Int)  B (String)  C (String)
> 1         4        Hello       Zero
>
>
> Query executed:
>
> select id,
>   last_value(a) over(partition by id order by date_time desc) as a,
>   last_value(b) over(partition by id order by date_time desc) as b,
>   last_value(c) over(partition by id order by date_time desc) as c
> from udf_test
> where a is not null and b is not null and c is not null;
>
>
> Output I am getting:
>
> +----+---+-------+----+
> | id | a | b     | c  |
> +----+---+-------+----+
> | 1  | 4 | Hello | Hi |
> +----+---+-------+----+
>
> Hopefully, I am clear with the problem above.
>
> Thanks,
> Ravi
>
> On 20 June 2017 at 22:05, Ravi Kanth <ravikanth.4b0@gmail.com> wrote:
>
>> Antoni,
>>
>> Thanks for the suggestion. Let me have a look at it and hopefully we can
>> use it in our use case.
>>
>> Thanks,
>> Ravi
>>
>> On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov <aivanov@vmware.com> wrote:
>>
>>> Hi Ravi,
>>>
>>> I am curious why you are not using the already existing last_value
>>> function in Impala to get the "latest non null value for the column",
>>>
>>> e.g.
>>> last_value(column_a ignore nulls) over(partition by ID order by
>>> Date_Time)
>>>
>>> Thanks,
>>> Antoni
>>>
>>>
>>>
>>>
>>> On Jun 21, 2017, at 1:17 AM, Tim Armstrong <tarmstrong@cloudera.com>
>>> wrote:
>>>
>>> This was double-posted to
>>> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073
>>> also. I'll continue the discussion here.
>>>
>>> > Can we have the flexibility of declaring the variable globally in UDF?
>>> > Globally, I mean outside the function?
>>>
>>> > And, the reason I am declaring a static variable is to restore the
>>> > value of timestamp for every record so that I can perform a comparison of
>>> > the timestamps. Is there an alternative approach for this?
>>>
>>> Updating a global or static variable in a UDAF is guaranteed not to do
>>> what you expect - the function can be invoked concurrently by multiple
>>> threads.
>>>
>>> It seems like you probably want to store some additional state in the
>>> intermediate value. There are some sample UDAs here (see Avg()) where
>>> additional intermediate state is stored in a StringVal:
>>> https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61
>>>
>>> On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth <ravikanth.4b0@gmail.com>
>>> wrote:
>>>
>>>> Thanks Bharath. Can you check if the logic I am implementing is correct
>>>> or needs any modification as well? I am very new to Impala UDFs and C++
>>>> and am having a hard time figuring out the problems.
>>>>
>>>> On 20 June 2017 at 14:27, Bharath Vissapragada <bharathv@cloudera.com>
>>>> wrote:
>>>>
>>>>> You need to allocate memory for tsTemp, else it can segfault. That
>>>>> could be the issue here.
>>>>>
>>>>>  static TimestampVal* tsTemp;
>>>>>       tsTemp->date = 0;
>>>>>       tsTemp->time_of_day = 0;
>>>>>
>>>>>
>>>>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth <ravikanth.4b0@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>> We are using Impala to do various kinds of processing in our systems.
>>>>>> We recently got a requirement to handle updates to events, i.e., we
>>>>>> have an 'e_update' table which holds the partial updates received for
>>>>>> various events. The fields that are not updated are stored as NULL
>>>>>> values.
>>>>>>
>>>>>>
>>>>>> Ex:
>>>>>>
>>>>>>
>>>>>> ID (Int)  Date_Time (timestamp)  A (Int)  B (String)  C (String)
>>>>>> 1         0                      1        NULL        NULL
>>>>>> 1         1                      2        Hi          NULL
>>>>>> 1         3                      4        Hello       Hi
>>>>>> 1         2                      5        NULL        NULL
>>>>>> 1         4                      NULL     NULL        Zero
>>>>>>
>>>>>>
>>>>>> P.S: Please consider Date_time as valid timestamp type values. For
>>>>>> easy understanding, mentioned them as 0,1,2,3,4,5
>>>>>>
>>>>>>
>>>>>> As seen in the above table, each event has a unique id, and when we
>>>>>> get an update to a particular event, we store the date_time at which
>>>>>> the update happened along with the partially updated values. Apart
>>>>>> from the updated values, the rest are stored as NULL values.
>>>>>>
>>>>>>
>>>>>> We are planning to mimic in-place updates on the table, so that the
>>>>>> query below would retrieve the resulting table as follows. We
>>>>>> don't delete the data.
>>>>>>
>>>>>>
>>>>>> > SELECT id, current_val(A,date_time) as A, current_val(B,date_time) as B,
>>>>>> >   current_val(C,date_time) as C from e_update GROUP BY ID;
>>>>>>
>>>>>>
>>>>>> where current_val is a custom Impala UDA we are planning to
>>>>>> implement, i.e., get the *latest non-null value for the column*.
>>>>>>
>>>>>>
>>>>>> ID (Int)  A (Int)  B (String)  C (String)
>>>>>> 1         4        Hello       Zero
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Implemented current_val UDA:
>>>>>> The below code is only for int type inputs:
>>>>>>
>>>>>>
>>>>>> uda-currentval.h
>>>>>>
>>>>>> //This is a sample for retrieving the current value of e_update table
>>>>>> //
>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val);
>>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input, const TimestampVal& ts, IntVal* val);
>>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src, IntVal* dst);
>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val);
>>>>>>
>>>>>> uda-currentval.cc
>>>>>>
>>>>>> // -----------------------------------------------------------------------------------------------
>>>>>> // This is a sample for retrieving the current value of e_update table
>>>>>> //-----------------------------------------------------------------------------------------------
>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val) {
>>>>>>       val->is_null = false;
>>>>>>       val->val = 0;
>>>>>> }
>>>>>>
>>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input, const TimestampVal& ts, IntVal* val) {
>>>>>>       static TimestampVal* tsTemp;
>>>>>>       tsTemp->date = 0;
>>>>>>       tsTemp->time_of_day = 0;
>>>>>>       if(tsTemp->date==0 && tsTemp->time_of_day==0){
>>>>>>         tsTemp->date = ts.date;
>>>>>>         tsTemp->time_of_day = ts.time_of_day;
>>>>>>         val->val = input.val;
>>>>>>         return;
>>>>>>       }
>>>>>>       if(ts.date > tsTemp->date && ts.time_of_day > tsTemp->time_of_day){
>>>>>>         tsTemp->date = ts.date;
>>>>>>         tsTemp->time_of_day = ts.time_of_day;
>>>>>>         val->val = input.val;
>>>>>>         return;
>>>>>>       }
>>>>>> }
>>>>>>
>>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src, IntVal* dst) {
>>>>>>      dst->val += src.val;
>>>>>> }
>>>>>>
>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val) {
>>>>>>      return val;
>>>>>> }
>>>>>>
>>>>>>
>>>>>> We are able to build and create the aggregate function in Impala, but
>>>>>> when we try to run a select query similar to the one above, it
>>>>>> brings down a couple of Impala daemons, throws the error below, and
>>>>>> gets terminated.
>>>>>>
>>>>>>
>>>>>> WARNINGS: Cancelled due to unreachable impalad(s):
>>>>>> hadoop102.**.**.**.com:22000
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> We have impalad running on 14 instances.
>>>>>>
>>>>>>
>>>>>> Can someone help us resolve this problem and suggest a better way to
>>>>>> achieve a solution for the scenario explained?
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>
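
Tim's suggestion in the thread above is to carry the extra state in the
intermediate value rather than in a static variable. The following is a
minimal sketch of that idea in plain C++, written so that it compiles
without the Impala UDF SDK. Everything in it is an assumption made for
illustration: Timestamp and CurrentValState are hypothetical stand-ins
(only the date and time_of_day field names follow the code in the thread),
and the functions take plain values instead of FunctionContext*, IntVal,
and TimestampVal. A real UDA would also have to store this state in
whatever intermediate type it declares; that part is not shown.

```cpp
#include <cstdint>

// Hypothetical stand-in for a timestamp; NOT the SDK's TimestampVal.
struct Timestamp {
  int32_t date = 0;         // day number
  int64_t time_of_day = 0;  // offset within the day
};

// True if a is strictly later than b: compare the date first and fall back
// to time_of_day only when the dates are equal.
bool IsLater(const Timestamp& a, const Timestamp& b) {
  if (a.date != b.date) return a.date > b.date;
  return a.time_of_day > b.time_of_day;
}

// Hypothetical intermediate state: the latest non-NULL value seen so far
// together with the timestamp it arrived with.
struct CurrentValState {
  bool has_value = false;
  Timestamp ts;
  int32_t value = 0;
};

// Init: start with "nothing seen yet".
void CurrentValInit(CurrentValState* state) { *state = CurrentValState{}; }

// Update: skip NULL inputs; keep the input only if it is newer than the
// value already held in this state.
void CurrentValUpdate(bool input_is_null, int32_t input, const Timestamp& ts,
                      CurrentValState* state) {
  if (input_is_null) return;
  if (!state->has_value || IsLater(ts, state->ts)) {
    state->has_value = true;
    state->ts = ts;
    state->value = input;
  }
}

// Merge: combine two partial states by keeping the newer one.
void CurrentValMerge(const CurrentValState& src, CurrentValState* dst) {
  if (!src.has_value) return;
  if (!dst->has_value || IsLater(src.ts, dst->ts)) *dst = src;
}

// Finalize: returns false (meaning NULL) if no non-NULL input was seen,
// otherwise writes the latest value to *out and returns true.
bool CurrentValFinalize(const CurrentValState& state, int32_t* out) {
  if (!state.has_value) return false;
  *out = state.value;
  return true;
}
```

Because each state carries its own timestamp, nothing is shared between
threads, and the merge step can pick the newer of two partial results
instead of adding them together.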
