impala-user mailing list archives

From Thomas Tauber-Marshall <tmarsh...@cloudera.com>
Subject Re: Creating Impala UDA
Date Wed, 21 Jun 2017 19:12:50 GMT
On Wed, Jun 21, 2017 at 1:52 PM Ravi Kanth <ravikanth.4b0@gmail.com> wrote:

> Thomas,
>
> We are using Impala 2.5.0 with CDH 5.7.0, and I see that ignore nulls was
> added in Impala 2.7.0. Would adding ignore nulls make a big difference in
> the expected result?
>

That's too bad. I think that 'ignore nulls' would give you what you want.
The problem with the query that you posted is that the where clause
eliminates whole rows: for example, the row with "Zero" in it is dropped
because it fails "where a is not null", so its value for c is lost as well.
'ignore nulls', by contrast, only affects the values that could be returned
by the specific analytic function that it is applied to.
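
To make the difference concrete, here is a rough standalone C++17 sketch. It
is plain C++, not Impala code, and Row, SampleRows, KeepFullyPopulated and
LatestNonNull are names made up for illustration. It only mimics the two
behaviors on the sample rows from the quoted message below: filtering whole
rows first, versus skipping nulls per column.

```cpp
#include <algorithm>
#include <cassert>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-in for one row of the sample table; date_time is a
// plain int here instead of a real timestamp.
struct Row {
  int id;
  int date_time;
  std::optional<int> a;
  std::optional<std::string> b;
  std::optional<std::string> c;
};

// The five sample rows from the thread (all with id = 1).
std::vector<Row> SampleRows() {
  return {
      {1, 0, 1, std::nullopt, std::nullopt},
      {1, 1, 2, std::string("Hi"), std::nullopt},
      {1, 3, 4, std::string("Hello"), std::string("Hi")},
      {1, 2, 5, std::nullopt, std::nullopt},
      {1, 4, std::nullopt, std::nullopt, std::string("Zero")},
  };
}

// Mimics "where a is not null and b is not null and c is not null":
// a row with any null column is dropped entirely.
std::vector<Row> KeepFullyPopulated(const std::vector<Row>& rows) {
  std::vector<Row> kept;
  for (const Row& r : rows) {
    if (r.a && r.b && r.c) kept.push_back(r);
  }
  return kept;
}

// Mimics the per-column effect of "ignore nulls": order by date_time and
// keep the last non-null value of the chosen column only.
template <typename T>
std::optional<T> LatestNonNull(std::vector<Row> rows,
                               std::optional<T> Row::*col) {
  std::sort(rows.begin(), rows.end(), [](const Row& x, const Row& y) {
    return x.date_time < y.date_time;
  });
  std::optional<T> result;
  for (const Row& r : rows) {
    if ((r.*col).has_value()) result = r.*col;
  }
  return result;
}
```

Calling LatestNonNull(SampleRows(), &Row::c) considers every row, so the
"Zero" row still contributes its c value. Running the same call on
KeepFullyPopulated(SampleRows()) only ever sees the single row that has all
three columns set.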


>
> Ravi
>
> On 21 June 2017 at 11:20, Thomas Tauber-Marshall <tmarshall@cloudera.com>
> wrote:
>
>> Ravi,
>>
>> Instead of using the "where ... is not null", have you tried
>> 'last_value(... ignore nulls)'?
>>
>> On Wed, Jun 21, 2017 at 1:08 PM Ravi Kanth <ravikanth.4b0@gmail.com>
>> wrote:
>>
>>> Antoni,
>>>
>>> The problem with using the last_value() function, as far as I observed,
>>> is that if I use it on multiple columns in a single query, it's not
>>> retrieving results as expected.
>>>
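>>> Input:
>>> +----------+-----------------------+---------+------------+------------+
>>> | ID (Int) | Date_Time (timestamp) | A (Int) | B (String) | C (String) |
>>> +----------+-----------------------+---------+------------+------------+
>>> | 1        | 0                     | 1       | NULL       | NULL       |
>>> | 1        | 1                     | 2       | Hi         | NULL       |
>>> | 1        | 3                     | 4       | Hello      | Hi         |
>>> | 1        | 2                     | 5       | NULL       | NULL       |
>>> | 1        | 4                     | NULL    | NULL       | Zero       |
>>> +----------+-----------------------+---------+------------+------------+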
>>>
>>> Expected Output:
>>>
>>>
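>>> +----------+---------+------------+------------+
>>> | ID (Int) | A (Int) | B (String) | C (String) |
>>> +----------+---------+------------+------------+
>>> | 1        | 4       | Hello      | Zero       |
>>> +----------+---------+------------+------------+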
>>>
>>>
>>> Query executed:
>>>
>>> select id, last_value(a) over(partition by id order by date_time desc)
>>> as a, last_value(b) over(partition by id order by date_time desc) as b,
>>> last_value(c) over(partition by id order by date_time desc) as c from
>>> udf_test where a is not null and b is not null and c is not null;
>>>
>>>
>>> Output I am getting:
>>>
>>> +----+---+-------+----+
>>> | id | a | b     | c  |
>>> +----+---+-------+----+
>>> | 1  | 4 | Hello | Hi |
>>> +----+---+-------+----+
>>>
>>> Hopefully, I am clear with the problem above.
>>>
>>> Thanks,
>>> Ravi
>>>
>>> On 20 June 2017 at 22:05, Ravi Kanth <ravikanth.4b0@gmail.com> wrote:
>>>
>>>> Antoni,
>>>>
>>>> Thanks for the suggestion. Let me have a look at it and hopefully we
>>>> can use it in our use case.
>>>>
>>>> Thanks,
>>>> Ravi
>>>>
>>>> On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov <aivanov@vmware.com> wrote:
>>>>
>>>>> Hi Ravi,
>>>>>
>>>>> I am curious why you are not using the already existing last_value
>>>>> function in Impala to get "latest non null value for the column"
>>>>>
>>>>> e.g
>>>>> last_value(column_a ignore nulls) over(partition by ID  order by
>>>>> Date_Time)
>>>>>
>>>>> Thanks,
>>>>> Antoni
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Jun 21, 2017, at 1:17 AM, Tim Armstrong <tarmstrong@cloudera.com>
>>>>> wrote:
>>>>>
>>>>> This was double-posted to
>>>>> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073
>>>>> also. I'll continue the discussion here.
>>>>>
>>>>> > Can we have the flexibility of declaring the variable globally in
>>>>> UDF? Globally, I mean outside the function?
>>>>>
>>>>> > And, the reason I am declaring a static variable is to restore the
>>>>> value of timestamp for every record so that I can perform a comparison of
>>>>> the timestamps. Is there an alternative approach for this?
>>>>>
>>>>> Updating a global or static variable in a UDAF is guaranteed not to do
>>>>> what you expect - the function can be invoked concurrently by multiple
>>>>> threads.
>>>>>
>>>>> It seems like you probably want to store some additional state in the
>>>>> intermediate value. There are some sample UDAs here (see Avg()) where
>>>>> additional intermediate state is stored in a StringVal:
>>>>> https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61
>>>>>
>>>>> On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth <ravikanth.4b0@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Bharath. Can you check if the logic I am implementing is
>>>>>> correct or needs any modification as well? I am very new to Impala
>>>>>> UDFs & C++ and am having a hard time figuring out the problems.
>>>>>>
>>>>>> On 20 June 2017 at 14:27, Bharath Vissapragada <bharathv@cloudera.com
>>>>>> > wrote:
>>>>>>
>>>>>>> You need to allocate memory for tsTemp, else it can segfault. That
>>>>>>> could be the issue here.
>>>>>>>
>>>>>>>  static TimestampVal* tsTemp;
>>>>>>>       tsTemp->date = 0;
>>>>>>>       tsTemp->time_of_day = 0;
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth <ravikanth.4b0@gmail.com
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>> We are using Impala for various kinds of processing in our systems.
>>>>>>>> We recently got a requirement to handle updates on events, i.e., we
>>>>>>>> have an 'e_update' table which holds the partial updates received for
>>>>>>>> various events. The fields that are not updated are stored as NULL
>>>>>>>> values.
>>>>>>>>
>>>>>>>>
>>>>>>>> Ex:
>>>>>>>>
>>>>>>>>
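>>>>>>>> +----------+-----------------------+---------+------------+------------+
>>>>>>>> | ID (Int) | Date_Time (timestamp) | A (Int) | B (String) | C (String) |
>>>>>>>> +----------+-----------------------+---------+------------+------------+
>>>>>>>> | 1        | 0                     | 1       | NULL       | NULL       |
>>>>>>>> | 1        | 1                     | 2       | Hi         | NULL       |
>>>>>>>> | 1        | 3                     | 4       | Hello      | Hi         |
>>>>>>>> | 1        | 2                     | 5       | NULL       | NULL       |
>>>>>>>> | 1        | 4                     | NULL    | NULL       | Zero       |
>>>>>>>> +----------+-----------------------+---------+------------+------------+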
>>>>>>>>
>>>>>>>>
>>>>>>>> P.S: Please consider Date_time as valid timestamp type values. For
>>>>>>>> easy understanding, mentioned them as 0,1,2,3,4,5
>>>>>>>>
>>>>>>>>
>>>>>>>> As seen in the above table, each event has a unique id. When we get
>>>>>>>> an update to a particular event, we store the date_time at which the
>>>>>>>> update happened along with the partially updated values. Apart from
>>>>>>>> the updated values, the rest are stored as NULL values.
>>>>>>>>
>>>>>>>>
>>>>>>>> We are planning to mimic in-place updates on the table, so that the
>>>>>>>> query below would retrieve the resulting table shown after it. We
>>>>>>>> don't delete the data.
>>>>>>>>
>>>>>>>>
>>>>>>>> > SELECT id, current_val(A,date_time) as A,
>>>>>>>> current_val(B,date_time) as B, current_val(C,date_time) as C from e_update
>>>>>>>> GROUP BY ID;
>>>>>>>>
>>>>>>>>
>>>>>>>> where current_val is a custom Impala UDA we are planning to
>>>>>>>> implement, i.e. get the latest non-null value for the column.
>>>>>>>>
>>>>>>>>
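>>>>>>>> +----------+---------+------------+------------+
>>>>>>>> | ID (Int) | A (Int) | B (String) | C (String) |
>>>>>>>> +----------+---------+------------+------------+
>>>>>>>> | 1        | 4       | Hello      | Zero       |
>>>>>>>> +----------+---------+------------+------------+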
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Implemented current_val UDA:
>>>>>>>> The below code is only for int type inputs:
>>>>>>>>
>>>>>>>>
>>>>>>>> uda-currentval.h
>>>>>>>>
>>>>>>>> //This is a sample for retrieving the current value of e_update table
>>>>>>>> //
>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val);
>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input, const TimestampVal& ts, IntVal* val);
>>>>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src, IntVal* dst);
>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val);
>>>>>>>>
>>>>>>>> uda-currentval.cc
>>>>>>>>
>>>>>>>> // -----------------------------------------------------------------------------------------------
>>>>>>>> // This is a sample for retrieving the current value of e_update table
>>>>>>>> //-----------------------------------------------------------------------------------------------
>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val) {
>>>>>>>>       val->is_null = false;
>>>>>>>>       val->val = 0;
>>>>>>>> }
>>>>>>>>
>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input, const TimestampVal& ts, IntVal* val) {
>>>>>>>>       static TimestampVal* tsTemp;
>>>>>>>>       tsTemp->date = 0;
>>>>>>>>       tsTemp->time_of_day = 0;
>>>>>>>>       if(tsTemp->date==0 && tsTemp->time_of_day==0){
>>>>>>>>         tsTemp->date = ts.date;
>>>>>>>>         tsTemp->time_of_day = ts.time_of_day;
>>>>>>>>         val->val = input.val;
>>>>>>>>         return;
>>>>>>>>       }
>>>>>>>>       if(ts.date > tsTemp->date && ts.time_of_day > tsTemp->time_of_day){
>>>>>>>>         tsTemp->date = ts.date;
>>>>>>>>         tsTemp->time_of_day = ts.time_of_day;
>>>>>>>>         val->val = input.val;
>>>>>>>>         return;
>>>>>>>>       }
>>>>>>>> }
>>>>>>>>
>>>>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src, IntVal* dst) {
>>>>>>>>      dst->val += src.val;
>>>>>>>> }
>>>>>>>>
>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val) {
>>>>>>>>      return val;
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> We are able to build and create an aggregate function in Impala,
>>>>>>>> but when we try to run a select query similar to the one above, it
>>>>>>>> brings down a couple of Impala daemons, throws the error below, and
>>>>>>>> gets terminated.
>>>>>>>>
>>>>>>>>
>>>>>>>> WARNINGS: Cancelled due to unreachable impalad(s):
>>>>>>>> hadoop102.**.**.**.com:22000
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> We have impalad running on 14 instances.
>>>>>>>>
>>>>>>>>
>>>>>>>> Can someone help us resolve this problem and suggest a better way
>>>>>>>> to achieve a solution for the scenario explained?
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>
>
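
For the UDA route discussed in the quoted thread above, the suggestion there
was to keep extra state in the intermediate value rather than in a static
variable. A minimal sketch of that idea follows, under loud assumptions:
IntValSketch, TimestampValSketch and CurrentValState are hypothetical
stand-ins, not the real Impala UDF types, and FunctionContext plus the
packing of the state into something like a StringVal are left out entirely.
It only shows the per-group bookkeeping: skip nulls, keep the value with the
latest (date, time_of_day), and have merge pick the later of two partial
states instead of summing them.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-ins for the UDF value types (NOT the Impala structs).
struct IntValSketch {
  bool is_null = true;
  int32_t val = 0;
  IntValSketch() = default;
  explicit IntValSketch(int32_t v) : is_null(false), val(v) {}
};

struct TimestampValSketch {
  bool is_null = true;
  int32_t date = 0;
  int64_t time_of_day = 0;
  TimestampValSketch() = default;
  TimestampValSketch(int32_t d, int64_t t)
      : is_null(false), date(d), time_of_day(t) {}
};

// Per-group intermediate state: latest non-null value and its timestamp.
struct CurrentValState {
  bool has_value = false;
  int32_t date = 0;
  int64_t time_of_day = 0;
  int32_t value = 0;
};

// True if (date, tod) is strictly later than the timestamp held in s.
// The date is compared first; time_of_day only breaks ties.
inline bool IsLater(int32_t date, int64_t tod, const CurrentValState& s) {
  return date > s.date || (date == s.date && tod > s.time_of_day);
}

void CurrentValInit(CurrentValState* state) { *state = CurrentValState(); }

// Called once per input row; null inputs never overwrite the state.
void CurrentValUpdate(const IntValSketch& input, const TimestampValSketch& ts,
                      CurrentValState* state) {
  if (input.is_null || ts.is_null) return;
  if (!state->has_value || IsLater(ts.date, ts.time_of_day, *state)) {
    state->has_value = true;
    state->date = ts.date;
    state->time_of_day = ts.time_of_day;
    state->value = input.val;
  }
}

// Combines two partial states by keeping whichever saw the later timestamp.
void CurrentValMerge(const CurrentValState& src, CurrentValState* dst) {
  if (!src.has_value) return;
  if (!dst->has_value || IsLater(src.date, src.time_of_day, *dst)) *dst = src;
}

// Produces NULL when the group never saw a non-null input.
IntValSketch CurrentValFinalize(const CurrentValState& state) {
  return state.has_value ? IntValSketch(state.value) : IntValSketch();
}
```

Because all of the state lives in the struct passed to each call, nothing is
shared between threads or between groups, which is the property the static
variable could not provide.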
