impala-user mailing list archives

From Ravi Kanth <ravikanth....@gmail.com>
Subject Re: Creating Impala UDA
Date Wed, 21 Jun 2017 05:05:24 GMT
Antoni,

Thanks for the suggestion. I'll take a look at it; hopefully we can use it
for our use case.

Thanks,
Ravi

On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov <aivanov@vmware.com> wrote:

> Hi Ravi,
>
> I am curious why you are not using the existing last_value function in
> Impala to get the "latest non-null value for the column".
>
> e.g.:
> last_value(column_a ignore nulls) over(partition by ID  order by Date_Time)
>
> Thanks,
> Antoni
>
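To make Antoni's suggestion concrete: with `ORDER BY Date_Time` and no explicit window clause, Impala uses the frame `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, so `last_value(column_a IGNORE NULLS)` returns, for every row, the most recent non-null value seen so far in that ID's partition. It therefore yields one value per input row rather than one per ID, and the row with the latest Date_Time carries the value Ravi is after. The C++ sketch below mimics that per-row computation for a single partition on column A of Ravi's sample data. `Row` and `RunningLastValue` are hypothetical names for illustration, not Impala APIs, and Date_Time is simplified to an integer as in the example.

```cpp
#include <algorithm>
#include <cassert>
#include <optional>
#include <vector>

// Hypothetical row type; Date_Time is simplified to an int as in the example.
struct Row {
  int id;
  int date_time;
  std::optional<int> a;  // std::nullopt plays the role of SQL NULL
};

// Mimics last_value(a IGNORE NULLS) OVER (PARTITION BY id ORDER BY date_time)
// for one partition: entry i is the latest non-null `a` among the rows up to
// and including row i in date_time order (distinct timestamps assumed).
std::vector<std::optional<int>> RunningLastValue(std::vector<Row> rows) {
  std::sort(rows.begin(), rows.end(),
            [](const Row& l, const Row& r) { return l.date_time < r.date_time; });
  std::vector<std::optional<int>> out;
  std::optional<int> last;  // stays empty until the first non-null value
  for (const Row& row : rows) {
    assert(row.id == rows.front().id);  // single partition only
    if (row.a) last = row.a;            // IGNORE NULLS: NULLs never replace `last`
    out.push_back(last);
  }
  return out;
}
```

For the sample rows, the per-row results in Date_Time order are 1, 2, 5, 4, 4, so the last row's value (4) matches the expected current value of A.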
> On Jun 21, 2017, at 1:17 AM, Tim Armstrong <tarmstrong@cloudera.com>
> wrote:
>
> This was double-posted to
> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073
> also. I'll continue the discussion here.
>
> > Can we have the flexibility of declaring a variable globally in a UDF,
> > i.e. outside the function?
>
> > And the reason I am declaring a static variable is to store the
> > timestamp of each record so that I can compare the timestamps. Is there
> > an alternative approach for this?
>
> Updating a global or static variable in a UDAF is guaranteed not to do
> what you expect - the function can be invoked concurrently by multiple
> threads.
>
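Concurrency aside, a function-level `static` is one object shared by every group the UDA aggregates, so the timestamp from one ID leaks into the comparison for the next even on a single thread. The plain C++ sketch below (hypothetical names, no Impala headers) feeds two groups in interleaved order, as a hash aggregation may, and contrasts a static timestamp with one kept in each group's own state, which is the role the intermediate value plays.

```cpp
#include <cassert>
#include <cstdint>

// Per-group state owned by the caller, playing the role of a UDA intermediate value.
struct Latest {
  bool has_value = false;
  int64_t ts = 0;
  int value = 0;
};

// BROKEN: a single static timestamp shared by every group (and every thread).
void UpdateWithStatic(int input, int64_t ts, Latest* group) {
  static int64_t last_ts = -1;  // one copy for the whole process
  if (ts > last_ts) {
    last_ts = ts;
    group->has_value = true;
    group->ts = ts;
    group->value = input;
  }
}

// Per-group version: the comparison uses only this group's own timestamp.
void UpdatePerGroup(int input, int64_t ts, Latest* group) {
  assert(group != nullptr);
  if (!group->has_value || ts > group->ts) {
    group->has_value = true;
    group->ts = ts;
    group->value = input;
  }
}
```

Calling `UpdateWithStatic(10, 5, &x)` and then `UpdateWithStatic(20, 3, &y)` in a fresh process leaves `y` empty, because group Y's row was compared against group X's timestamp; `UpdatePerGroup` gives each group its own answer.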
> It seems like you probably want to store some additional state in the
> intermediate value. There are some sample UDAs here (see Avg()) where
> additional intermediate state is stored in a StringVal:
> https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61
>
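The idea behind the Avg() sample can be sketched as follows: keep a small, trivially copyable struct holding both the current value and the timestamp it came from, and store its bytes in the intermediate value. In this sketch a `std::vector<uint8_t>` stands in for the StringVal's `ptr`/`len` pair (in a real UDA the buffer would be allocated through the `FunctionContext`, as the sample's Init function does with `context->Allocate`); `CurrentValState`, `Buffer`, `InitBuffer`, `Load`, and `Store` are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Fixed-size state kept per group; trivially copyable, so memcpy is safe.
struct CurrentValState {
  bool has_value;
  int32_t date;
  int64_t time_of_day;
  int32_t val;
};

// Hypothetical stand-in for an intermediate StringVal's byte buffer.
using Buffer = std::vector<uint8_t>;

// Like an Init function: allocate sizeof(state) bytes and zero them.
Buffer InitBuffer() {
  Buffer buf(sizeof(CurrentValState));
  CurrentValState zero{};  // has_value = false, all fields zeroed
  std::memcpy(buf.data(), &zero, sizeof(zero));
  return buf;
}

// Read the state out of the buffer.
CurrentValState Load(const Buffer& buf) {
  assert(buf.size() == sizeof(CurrentValState));
  CurrentValState st;
  std::memcpy(&st, buf.data(), sizeof(st));
  return st;
}

// Write the (possibly updated) state back into the buffer.
void Store(const CurrentValState& st, Buffer* buf) {
  assert(buf->size() == sizeof(CurrentValState));
  std::memcpy(buf->data(), &st, sizeof(st));
}
```

Copying with `std::memcpy` rather than casting the byte pointer to a struct pointer sidesteps any alignment concerns for the buffer.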
> On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth <ravikanth.4b0@gmail.com>
> wrote:
>
>> Thanks, Bharath. Can you check whether the logic I am implementing is
>> correct or needs any modification as well? I am very new to Impala UDFs
>> and C++ and am having a hard time figuring out the problems.
>>
>> On 20 June 2017 at 14:27, Bharath Vissapragada <bharathv@cloudera.com>
>> wrote:
>>
>>> You need to allocate memory for tsTemp; otherwise it can segfault. That
>>> could be the issue here.
>>>
>>>  static TimestampVal* tsTemp;
>>>       tsTemp->date = 0;
>>>       tsTemp->time_of_day = 0;
>>>
>>>
>>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth <ravikanth.4b0@gmail.com>
>>> wrote:
>>>
>>>> Hi All,
>>>> We are using Impala for various kinds of processing in our systems. We
>>>> recently got a requirement to handle updates to events, i.e. we have an
>>>> 'e_update' table that holds the partial updates received for various
>>>> events. Fields that were not updated are stored as NULL values.
>>>>
>>>>
>>>> Ex:
>>>>
>>>>
>>>> ID (Int)  Date_Time (timestamp)  A (Int)  B (String)  C (String)
>>>> 1         0                      1        NULL        NULL
>>>> 1         1                      2        Hi          NULL
>>>> 1         3                      4        Hello       Hi
>>>> 1         2                      5        NULL        NULL
>>>> 1         4                      NULL     NULL        Zero
>>>>
>>>>
>>>> P.S.: Please treat the Date_Time values as valid timestamps; for ease of
>>>> understanding, they are shown as 0, 1, 2, 3, 4.
>>>>
>>>>
>>>> As seen in the table above, each event has a unique ID, and whenever we
>>>> receive an update to an event, we store the date_time at which the update
>>>> happened along with the partially updated values. All fields apart from
>>>> the updated ones are stored as NULL.
>>>>
>>>>
>>>> We are planning to mimic in-place updates on the table (we don't delete
>>>> any data), so that the query below returns the resulting table shown
>>>> after it:
>>>>
>>>>
>>>> SELECT id, current_val(A,date_time) as A, current_val(B,date_time) as B,
>>>>        current_val(C,date_time) as C from e_update GROUP BY ID;
>>>>
>>>>
>>>> where current_val is a custom Impala UDA we are planning to implement,
>>>> i.e. it gets the latest non-null value for the column.
>>>>
>>>>
>>>> ID (Int)  A (Int)  B (String)  C (String)
>>>> 1         4        Hello       Zero
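As a reference for what `current_val` should return, the following sketch (plain C++, with hypothetical names `UpdateRow`, `LatestValue`, `CurrentRow`, and `CollapseUpdates`) folds the sample rows into one row per ID, keeping for each column the non-null value with the latest Date_Time. It covers the String columns too, which the INT-only UDA below does not. Timestamps are simplified to integers as in the example, and no two non-null values of a column are assumed to share a timestamp.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <vector>

// One row of e_update; std::nullopt plays the role of NULL (hypothetical type).
struct UpdateRow {
  int id;
  int date_time;  // simplified timestamp, as in the example
  std::optional<int> a;
  std::optional<std::string> b;
  std::optional<std::string> c;
};

// Latest non-null value of one column, plus the timestamp it came from.
template <typename T>
struct LatestValue {
  std::optional<T> value;
  int ts = 0;
  void Offer(const std::optional<T>& v, int t) {
    if (v && (!value || t > ts)) {  // NULLs are skipped; later timestamps win
      value = v;
      ts = t;
    }
  }
};

struct CurrentRow {
  LatestValue<int> a;
  LatestValue<std::string> b;
  LatestValue<std::string> c;
};

// Plain-C++ analogue of:
//   SELECT id, current_val(A, date_time), current_val(B, date_time),
//          current_val(C, date_time) FROM e_update GROUP BY id;
std::map<int, CurrentRow> CollapseUpdates(const std::vector<UpdateRow>& rows) {
  std::map<int, CurrentRow> out;
  for (const UpdateRow& r : rows) {
    CurrentRow& cur = out[r.id];  // default-constructs an empty row on first sight
    cur.a.Offer(r.a, r.date_time);
    cur.b.Offer(r.b, r.date_time);
    cur.c.Offer(r.c, r.date_time);
  }
  return out;
}
```

Each column is tracked with its own timestamp, because the latest non-null value of A can come from a different update than the latest non-null value of C.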
>>>>
>>>>
>>>>
>>>>
>>>> Implemented current_val UDA:
>>>> The code below handles only INT inputs:
>>>>
>>>>
>>>> uda-currentval.h
>>>>
>>>> //This is a sample for retrieving the current value of e_update table
>>>> //
>>>> void CurrentValueInit(FunctionContext* context, IntVal* val);
>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input,
>>>>                         const TimestampVal& ts, IntVal* val);
>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src, IntVal* dst);
>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val);
>>>>
>>>> uda-currentval.cc
>>>>
>>>> // -----------------------------------------------------------------------------------------------
>>>> // This is a sample for retrieving the current value of e_update table
>>>> //-----------------------------------------------------------------------------------------------
>>>> void CurrentValueInit(FunctionContext* context, IntVal* val) {
>>>>       val->is_null = false;
>>>>       val->val = 0;
>>>> }
>>>>
>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input,
>>>>                         const TimestampVal& ts, IntVal* val) {
>>>>       static TimestampVal* tsTemp;
>>>>       tsTemp->date = 0;
>>>>       tsTemp->time_of_day = 0;
>>>>       if(tsTemp->date==0 && tsTemp->time_of_day==0){
>>>>         tsTemp->date = ts.date;
>>>>         tsTemp->time_of_day = ts.time_of_day;
>>>>         val->val = input.val;
>>>>         return;
>>>>       }
>>>>       if(ts.date > tsTemp->date && ts.time_of_day > tsTemp->time_of_day){
>>>>         tsTemp->date = ts.date;
>>>>         tsTemp->time_of_day = ts.time_of_day;
>>>>         val->val = input.val;
>>>>         return;
>>>>       }
>>>> }
>>>>
>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src, IntVal* dst) {
>>>>      dst->val += src.val;
>>>> }
>>>>
>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val) {
>>>>      return val;
>>>> }
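Putting the thread's advice together, here is a hedged sketch of how the current_val logic could look once the timestamp lives in the intermediate state. Besides the static pointer, it addresses other problems in the code above: resetting tsTemp at the top of every call means every row would overwrite the result; the `&&` comparison of `date` and `time_of_day` misses a later time on the same day, and a later day whose time of day is not later; NULL inputs are not skipped; Merge adds the two values instead of keeping the one with the later timestamp; and Init makes an all-NULL group return 0 rather than NULL. To stay compilable on its own, it uses simplified stand-in types (`IntValLite`, `TimestampValLite`, `CurrentValState`, hypothetical names) instead of `impala_udf::IntVal`/`TimestampVal`, and omits the `FunctionContext*` parameter and the byte-buffer intermediate (for example a StringVal, as in the Avg() sample) that a real UDA would need.

```cpp
#include <cassert>
#include <cstdint>

// Simplified stand-ins (hypothetical) for impala_udf::IntVal and TimestampVal.
struct IntValLite {
  bool is_null;
  int32_t val;
};
struct TimestampValLite {
  bool is_null;
  int32_t date;         // day number (assumed, mirroring TimestampVal)
  int64_t time_of_day;  // time within the day (assumed, mirroring TimestampVal)
};

// Intermediate state: latest non-null value plus the timestamp it came from.
struct CurrentValState {
  bool has_value;
  int32_t date;
  int64_t time_of_day;
  int32_t val;
};

// Lexicographic comparison: a later date wins; on the same date, a later time wins.
static bool IsLater(int32_t date, int64_t time_of_day, const CurrentValState& st) {
  return date > st.date || (date == st.date && time_of_day > st.time_of_day);
}

void CurrentValueInit(CurrentValState* st) {
  assert(st != nullptr);
  *st = CurrentValState{false, 0, 0, 0};
}

void CurrentValueUpdate(const IntValLite& input, const TimestampValLite& ts,
                        CurrentValState* st) {
  if (input.is_null || ts.is_null) return;  // skip NULL values and NULL timestamps
  if (!st->has_value || IsLater(ts.date, ts.time_of_day, *st)) {
    *st = CurrentValState{true, ts.date, ts.time_of_day, input.val};
  }
}

// Keep whichever partial result is later instead of adding values together.
void CurrentValueMerge(const CurrentValState& src, CurrentValState* dst) {
  if (src.has_value && (!dst->has_value || IsLater(src.date, src.time_of_day, *dst))) {
    *dst = src;
  }
}

// A group whose inputs were all NULL yields NULL rather than 0.
IntValLite CurrentValueFinalize(const CurrentValState& st) {
  return st.has_value ? IntValLite{false, st.val} : IntValLite{true, 0};
}
```

Because Merge keeps the later of two partial results, the outcome should not depend on how rows are split across impalads or in which order partial results are combined, assuming no two non-null rows share a timestamp.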
>>>>
>>>>
>>>> We are able to build the library and create the aggregate function in
>>>> Impala, but when we run a SELECT query similar to the one above, it brings
>>>> down a couple of Impala daemons, and the query is terminated with the
>>>> error below:
>>>>
>>>>
>>>> WARNINGS: Cancelled due to unreachable impalad(s):
>>>> hadoop102.**.**.**.com:22000
>>>>
>>>>
>>>>
>>>>
>>>> We have impalad running on 14 instances.
>>>>
>>>>
>>>> Can someone help us resolve this problem, or suggest a better way to
>>>> achieve a solution for the scenario explained above?
>>>>
>>>
>>>
>>
>
>
