impala-user mailing list archives

From Matthew Jacobs ...@cloudera.com>
Subject Re: Creating Impala UDA
Date Wed, 21 Jun 2017 20:39:51 GMT
I'd strongly recommend the latter (upgrading). We don't really expose
the analytic function interface, so you'd end up writing an Impala
patch, and analytic functions are particularly tricky.

Here's Thomas' patch to add 'ignore nulls' in first/last value:
https://gerrit.cloudera.org/#/c/3328/

On Wed, Jun 21, 2017 at 1:08 PM, Ravi Kanth <ravikanth.4b0@gmail.com> wrote:
> Thanks All. I will work toward a solution, either by implementing a
> custom UDA or by upgrading the version.
>
> On Wed, Jun 21, 2017 at 13:04 Thomas Tauber-Marshall
> <tmarshall@cloudera.com> wrote:
>>
>> On Wed, Jun 21, 2017 at 2:33 PM Ravi Kanth <ravikanth.4b0@gmail.com>
>> wrote:
>>>
>>> Ya. I agree with you Thomas. Probably that's what I'm doing wrong.
>>>
>>> Unfortunately, as mentioned, I believe the version of Impala we are using
>>> doesn't support ignore nulls.
>>>
>>> But my question is: would the last_value function retrieve the latest
>>> non-null value regardless of whether ignore nulls is used?
>>
>>
>> Not sure I follow - if you use last_value without ignore nulls, you'll get
>> the latest value taking all values into consideration, which may or may not
>> be null.
>>
>>>
>>>
>>> Ravi
>>>
>>> On Wed, Jun 21, 2017 at 12:23 Matthew Jacobs <mj@cloudera.com> wrote:
>>>>
>>>> Ah I think Thomas is right. I read the expected results and the query
>>>> too quickly, so my comment about the asc/desc is probably wrong.
>>>> Clearly my point about analytic functions being tricky holds true :)
>>>>
>>>> On Wed, Jun 21, 2017 at 12:12 PM, Thomas Tauber-Marshall
>>>> <tmarshall@cloudera.com> wrote:
>>>> >
>>>> >
>>>> > On Wed, Jun 21, 2017 at 1:52 PM Ravi Kanth <ravikanth.4b0@gmail.com>
>>>> > wrote:
>>>> >>
>>>> >> Thomas,
>>>> >>
>>>> >> The version of Impala we are using is 2.5.0 with CDH 5.7.0, and I see
>>>> >> that ignore nulls was added in Impala 2.7.0. Would adding ignore nulls
>>>> >> make a big difference in the expected result?
>>>> >
>>>> >
>>>> > That's too bad. I think that 'ignore nulls' would give you what you
>>>> > want - the problem with the query that you posted is that it eliminates
>>>> > rows that don't match the where clause, so for example the row with
>>>> > "Zero" in it is eliminated because it is filtered out by the "where a is
>>>> > not null", whereas "ignore nulls" only affects the values that could be
>>>> > returned by the specific analytic function that the ignore is applied to.
>>>> >
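The difference described above can be simulated outside Impala. The following is a minimal C++ sketch, not Impala code: `Cell`, `Row`, `LastNonNull`, `FilterAllNonNull`, and `ExampleRows` are hypothetical names invented for illustration, and the timestamps are the simplified integers from the example table. It assumes the rows of one partition are sorted by date_time ascending and that the window covers the whole partition.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for one column value; a SQL NULL is modeled with a flag.
struct Cell {
  bool is_null;
  std::string value;
};

// Hypothetical stand-in for one row of the example table.
struct Row {
  int date_time;  // simplified timestamp, as in the example
  Cell a, b, c;
};

// Latest non-null value of one column, scanning rows in date_time order.
// This mirrors what "ignore nulls" does: nulls are skipped per column,
// but no row is removed from the partition.
Cell LastNonNull(const std::vector<Row>& rows, Cell Row::*col) {
  assert(col != nullptr);
  Cell result{true, ""};
  for (const Row& r : rows) {
    if (!(r.*col).is_null) result = r.*col;
  }
  return result;
}

// Mirrors "where a is not null and b is not null and c is not null":
// whole rows are dropped before any per-column logic runs.
std::vector<Row> FilterAllNonNull(const std::vector<Row>& rows) {
  std::vector<Row> kept;
  for (const Row& r : rows) {
    if (!r.a.is_null && !r.b.is_null && !r.c.is_null) kept.push_back(r);
  }
  return kept;
}

// The five rows from the example, sorted by date_time ascending.
std::vector<Row> ExampleRows() {
  return {
      {0, {false, "1"}, {true, ""}, {true, ""}},
      {1, {false, "2"}, {false, "Hi"}, {true, ""}},
      {2, {false, "5"}, {true, ""}, {true, ""}},
      {3, {false, "4"}, {false, "Hello"}, {false, "Hi"}},
      {4, {true, ""}, {true, ""}, {false, "Zero"}},
  };
}
```

With the unfiltered rows, the per-column scan yields 4, Hello, Zero. After the row filter only the date_time 3 row survives, so column C can only ever be "Hi", which matches the output reported in the thread.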
>>>> >>
>>>> >>
>>>> >> Ravi
>>>> >>
>>>> >> On 21 June 2017 at 11:20, Thomas Tauber-Marshall
>>>> >> <tmarshall@cloudera.com>
>>>> >> wrote:
>>>> >>>
>>>> >>> Ravi,
>>>> >>>
>>>> >>> Instead of using the "where ... is not null", have you tried
>>>> >>> 'last_value(... ignore nulls)'
>>>> >>>
>>>> >>> On Wed, Jun 21, 2017 at 1:08 PM Ravi Kanth <ravikanth.4b0@gmail.com>
>>>> >>> wrote:
>>>> >>>>
>>>> >>>> Antoni,
>>>> >>>>
>>>> >>>> The problem with using the last_value() function, as far as I have
>>>> >>>> observed, is that if I use it on multiple columns in a single query,
>>>> >>>> it's not retrieving results as expected.
>>>> >>>>
>>>> >>>>  Input:
>>>> >>>>
>>>> >>>> ID (Int)  Date_Time (timestamp)  A (Int)  B (String)  C (String)
>>>> >>>> 1         0                      1        NULL        NULL
>>>> >>>> 1         1                      2        Hi          NULL
>>>> >>>> 1         3                      4        Hello       Hi
>>>> >>>> 1         2                      5        NULL        NULL
>>>> >>>> 1         4                      NULL     NULL        Zero
>>>> >>>>
>>>> >>>> Expected Output:
>>>> >>>>
>>>> >>>>
>>>> >>>>
>>>> >>>> ID (Int)  A (Int)  B (String)  C (String)
>>>> >>>> 1         4        Hello       Zero
>>>> >>>>
>>>> >>>>
>>>> >>>> Query executed:
>>>> >>>>
>>>> >>>> select id,
>>>> >>>>   last_value(a) over(partition by id order by date_time desc) as a,
>>>> >>>>   last_value(b) over(partition by id order by date_time desc) as b,
>>>> >>>>   last_value(c) over(partition by id order by date_time desc) as c
>>>> >>>> from udf_test
>>>> >>>> where a is not null and b is not null and c is not null;
>>>> >>>>
>>>> >>>>
>>>> >>>>
>>>> >>>> Output I am getting:
>>>> >>>>
>>>> >>>> +----+---+-------+----+
>>>> >>>> | id | a | b     | c  |
>>>> >>>> +----+---+-------+----+
>>>> >>>> | 1  | 4 | Hello | Hi |
>>>> >>>> +----+---+-------+----+
>>>> >>>>
>>>> >>>>
>>>> >>>> Hopefully, I am clear with the problem above.
>>>> >>>>
>>>> >>>> Thanks,
>>>> >>>> Ravi
>>>> >>>>
>>>> >>>> On 20 June 2017 at 22:05, Ravi Kanth <ravikanth.4b0@gmail.com>
>>>> >>>> wrote:
>>>> >>>>>
>>>> >>>>> Antoni,
>>>> >>>>>
>>>> >>>>> Thanks for the suggestion. Let me have a look at it, and hopefully we
>>>> >>>>> can use it in our use case.
>>>> >>>>>
>>>> >>>>> Thanks,
>>>> >>>>> Ravi
>>>> >>>>>
>>>> >>>>> On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov <aivanov@vmware.com>
>>>> >>>>> wrote:
>>>> >>>>>>
>>>> >>>>>> Hi Ravi,
>>>> >>>>>>
>>>> >>>>>> I am curious why you are not using the already existing last_value
>>>> >>>>>> function in Impala to get the "latest non null value for the column"
>>>> >>>>>>
>>>> >>>>>> e.g.
>>>> >>>>>> last_value(column_a ignore nulls) over(partition by ID order by
>>>> >>>>>> Date_Time)
>>>> >>>>>>
>>>> >>>>>> Thanks,
>>>> >>>>>> Antoni
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>> On Jun 21, 2017, at 1:17 AM, Tim Armstrong
>>>> >>>>>> <tarmstrong@cloudera.com>
>>>> >>>>>> wrote:
>>>> >>>>>>
>>>> >>>>>> This was double-posted to
>>>> >>>>>>
>>>> >>>>>> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073
>>>> >>>>>> also. I'll continue the discussion here.
>>>> >>>>>>
>>>> >>>>>> > Can we have the flexibility of declaring the variable globally in
>>>> >>>>>> > UDF? Globally, I mean outside the function?
>>>> >>>>>>
>>>> >>>>>> > And, the reason I am declaring a static variable is to restore the
>>>> >>>>>> > value of timestamp for every record so that I can perform a
>>>> >>>>>> > comparison of the timestamps. Is there an alternative approach for
>>>> >>>>>> > this?
>>>> >>>>>>
>>>> >>>>>> Updating a global or static variable in a UDAF is guaranteed not to do
>>>> >>>>>> what you expect - the function can be invoked concurrently by multiple
>>>> >>>>>> threads.
>>>> >>>>>>
>>>> >>>>>> It seems like you probably want to store some additional state in the
>>>> >>>>>> intermediate value. There are some sample UDAs here (see Avg()) where
>>>> >>>>>> additional intermediate state is stored in a StringVal:
>>>> >>>>>>
>>>> >>>>>> https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61
>>>> >>>>>>
>>>> >>>>>>
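One way to picture the "state in the intermediate value" suggestion is the sketch below. It is not written against the real Impala UDF SDK: `IntVal` and `TimestampVal` here are simplified stand-ins defined locally so the code compiles on its own, and `CurrentValState`, `IsLater`, and the `CurrentVal*` functions are hypothetical names. In an actual UDA the state struct would presumably live in a StringVal buffer, as the Avg() sample does, and each function would also take a `FunctionContext*`; that plumbing is omitted here.

```cpp
#include <cassert>
#include <cstdint>

// Simplified stand-ins for the SDK value types (assumption: the real ones
// come from the Impala UDF headers and carry the same fields).
struct IntVal { bool is_null; int32_t val; };
struct TimestampVal { bool is_null; int32_t date; int64_t time_of_day; };

// Hypothetical intermediate state: the newest timestamp seen so far and the
// value that came with it. It belongs to one aggregation, not to the process.
struct CurrentValState {
  bool has_value;
  int32_t date;
  int64_t time_of_day;
  int32_t val;
};

// True if (d1, t1) is strictly later than (d2, t2): compare the date first
// and fall back to the time of day only when the dates are equal.
bool IsLater(int32_t d1, int64_t t1, int32_t d2, int64_t t2) {
  return d1 > d2 || (d1 == d2 && t1 > t2);
}

void CurrentValInit(CurrentValState* state) {
  assert(state != nullptr);
  *state = CurrentValState{false, 0, 0, 0};
}

// Keeps the input only if it is non-null and newer than what is stored.
void CurrentValUpdate(const IntVal& input, const TimestampVal& ts,
                      CurrentValState* state) {
  if (input.is_null || ts.is_null) return;
  if (!state->has_value ||
      IsLater(ts.date, ts.time_of_day, state->date, state->time_of_day)) {
    *state = CurrentValState{true, ts.date, ts.time_of_day, input.val};
  }
}

// Combines two partial results by keeping whichever one is newer.
void CurrentValMerge(const CurrentValState& src, CurrentValState* dst) {
  if (!src.has_value) return;
  if (!dst->has_value ||
      IsLater(src.date, src.time_of_day, dst->date, dst->time_of_day)) {
    *dst = src;
  }
}

// Returns NULL when no non-null input was ever seen.
IntVal CurrentValFinalize(const CurrentValState& state) {
  if (!state.has_value) return IntVal{true, 0};
  return IntVal{false, state.val};
}
```

Because every piece of state travels with the intermediate value, nothing is shared between threads, and Merge can pick the newer of two partial results rather than adding them.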
>>>> >>>>>> On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth
>>>> >>>>>> <ravikanth.4b0@gmail.com>
>>>> >>>>>> wrote:
>>>> >>>>>>>
>>>> >>>>>>> Thanks Bharath. Can you check whether the logic I am implementing is
>>>> >>>>>>> correct or needs any modification as well? I am very new to Impala
>>>> >>>>>>> UDFs & C++ and am having a hard time figuring out the problems.
>>>> >>>>>>>
>>>> >>>>>>> On 20 June 2017 at 14:27, Bharath Vissapragada
>>>> >>>>>>> <bharathv@cloudera.com> wrote:
>>>> >>>>>>>>
>>>> >>>>>>>> You need to allocate memory for tsTemp, else it can segfault. That
>>>> >>>>>>>> could be the issue here.
>>>> >>>>>>>>
>>>> >>>>>>>>  static TimestampVal* tsTemp;
>>>> >>>>>>>>       tsTemp->date = 0;
>>>> >>>>>>>>       tsTemp->time_of_day = 0;
>>>> >>>>>>>>
>>>> >>>>>>>>
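The allocation point can be illustrated in isolation. In the quoted lines, `tsTemp` is a static pointer that is never pointed at any storage; a static pointer starts out null, so writing through it is undefined behavior and commonly crashes. The sketch below, using a locally defined stand-in for `TimestampVal` and the hypothetical helpers `ResetTimestamp` and `MakeZeroTimestamp`, shows the same writes against an object that actually exists. It only addresses the missing storage; a static variable would still be shared between concurrent invocations.

```cpp
#include <cassert>
#include <cstdint>

// Simplified stand-in for the SDK timestamp type (assumption: same fields).
struct TimestampVal {
  int32_t date;
  int64_t time_of_day;
};

// Writes through the pointer only after checking that it points at storage.
void ResetTimestamp(TimestampVal* ts) {
  assert(ts != nullptr);
  ts->date = 0;
  ts->time_of_day = 0;
}

// Holds the timestamp by value, so the storage exists for as long as the
// object does and there is nothing to allocate or free.
TimestampVal MakeZeroTimestamp() {
  TimestampVal ts;
  ResetTimestamp(&ts);
  return ts;
}
```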
>>>> >>>>>>>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth
>>>> >>>>>>>> <ravikanth.4b0@gmail.com> wrote:
>>>> >>>>>>>>>
>>>> >>>>>>>>> Hi All,
>>>> >>>>>>>>> We are using Impala for various kinds of processing in our systems.
>>>> >>>>>>>>> We recently got a requirement to handle updates on events, i.e.,
>>>> >>>>>>>>> we have an 'e_update' table which holds the partial updates
>>>> >>>>>>>>> received for various events. The fields that are not updated are
>>>> >>>>>>>>> stored as NULL values.
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> Ex:
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> ID (Int)  Date_Time (timestamp)  A (Int)  B (String)  C (String)
>>>> >>>>>>>>> 1         0                      1        NULL        NULL
>>>> >>>>>>>>> 1         1                      2        Hi          NULL
>>>> >>>>>>>>> 1         3                      4        Hello       Hi
>>>> >>>>>>>>> 1         2                      5        NULL        NULL
>>>> >>>>>>>>> 1         4                      NULL     NULL        Zero
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> P.S.: Please consider Date_Time as valid timestamp type values. For
>>>> >>>>>>>>> ease of understanding, I have written them as 0,1,2,3,4,5.
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> As seen in the above table, each event has a unique id, and when we
>>>> >>>>>>>>> get an update to a particular event, we store the date_time at
>>>> >>>>>>>>> which the update happened along with the partially updated values.
>>>> >>>>>>>>> Apart from the updated values, the rest are stored as NULL values.
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> We are planning to mimic in-place updates on the table, so that the
>>>> >>>>>>>>> query below would retrieve the resulting table shown after it. We
>>>> >>>>>>>>> don't delete the data.
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> > SELECT id, current_val(A,date_time) as A,
>>>> >>>>>>>>> > current_val(B,date_time) as B, current_val(C,date_time) as C
>>>> >>>>>>>>> > from e_update
>>>> >>>>>>>>> > GROUP BY ID;
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> where current_val is a custom Impala UDA we are planning to
>>>> >>>>>>>>> implement, i.e. one that gets the latest non-null value for the
>>>> >>>>>>>>> column.
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> ID (Int)  A (Int)  B (String)  C (String)
>>>> >>>>>>>>> 1         4        Hello       Zero
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> Implemented current_val UDA:
>>>> >>>>>>>>> The below code is only for int type inputs:
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> uda-currentval.h
>>>> >>>>>>>>>
>>>> >>>>>>>>> // This is a sample for retrieving the current value of e_update table
>>>> >>>>>>>>> //
>>>> >>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val);
>>>> >>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input,
>>>> >>>>>>>>>     const TimestampVal& ts, IntVal* val);
>>>> >>>>>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src,
>>>> >>>>>>>>>     IntVal* dst);
>>>> >>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val);
>>>> >>>>>>>>>
>>>> >>>>>>>>> uda-currentval.cc
>>>> >>>>>>>>>
>>>> >>>>>>>>> // -----------------------------------------------------------------------------------------------
>>>> >>>>>>>>> // This is a sample for retrieving the current value of e_update table
>>>> >>>>>>>>> // -----------------------------------------------------------------------------------------------
>>>> >>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val) {
>>>> >>>>>>>>>       val->is_null = false;
>>>> >>>>>>>>>       val->val = 0;
>>>> >>>>>>>>> }
>>>> >>>>>>>>>
>>>> >>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input,
>>>> >>>>>>>>>     const TimestampVal& ts, IntVal* val) {
>>>> >>>>>>>>>       static TimestampVal* tsTemp;
>>>> >>>>>>>>>       tsTemp->date = 0;
>>>> >>>>>>>>>       tsTemp->time_of_day = 0;
>>>> >>>>>>>>>       if(tsTemp->date==0 && tsTemp->time_of_day==0){
>>>> >>>>>>>>>         tsTemp->date = ts.date;
>>>> >>>>>>>>>         tsTemp->time_of_day = ts.time_of_day;
>>>> >>>>>>>>>         val->val = input.val;
>>>> >>>>>>>>>         return;
>>>> >>>>>>>>>       }
>>>> >>>>>>>>>       if(ts.date > tsTemp->date &&
>>>> >>>>>>>>>          ts.time_of_day > tsTemp->time_of_day){
>>>> >>>>>>>>>         tsTemp->date = ts.date;
>>>> >>>>>>>>>         tsTemp->time_of_day = ts.time_of_day;
>>>> >>>>>>>>>         val->val = input.val;
>>>> >>>>>>>>>         return;
>>>> >>>>>>>>>       }
>>>> >>>>>>>>> }
>>>> >>>>>>>>>
>>>> >>>>>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src,
>>>> >>>>>>>>>     IntVal* dst) {
>>>> >>>>>>>>>      dst->val += src.val;
>>>> >>>>>>>>> }
>>>> >>>>>>>>>
>>>> >>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val) {
>>>> >>>>>>>>>      return val;
>>>> >>>>>>>>> }
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> We are able to build and create the aggregate function in Impala,
>>>> >>>>>>>>> but when we try to run a select query similar to the one above, it
>>>> >>>>>>>>> brings down a couple of Impala daemons, throws the error below, and
>>>> >>>>>>>>> gets terminated.
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> WARNINGS: Cancelled due to unreachable impalad(s):
>>>> >>>>>>>>> hadoop102.**.**.**.com:22000
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> We have impalad running on 14 instances.
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> Can someone help us resolve this problem and suggest a better way to
>>>> >>>>>>>>> achieve a solution for the scenario explained?
>>>> >>>>>>>>
>>>> >>>>>>>>
>>>> >>>>>>>
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>
>>>> >>
>>>> >
