impala-user mailing list archives

From Ravi Kanth <ravikanth....@gmail.com>
Subject Re: Creating Impala UDA
Date Fri, 23 Jun 2017 06:59:36 GMT
I see that it is again a problem with my version. The intermediate type
also needs to be IntVal if the return type of finalize is IntVal.
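
A minimal, self-contained sketch of the "latest non-null value by timestamp" logic this thread is about. It deliberately does not use the Impala UDF headers: `Ts`, `LatestState`, `IsLater`, `UpdateLatest`, `MergeLatest` and `CheckLatest` are hypothetical names introduced only for illustration, and `Ts` is assumed to mirror the `date` and `time_of_day` fields of `TimestampVal`. It says nothing about which intermediate types a given Impala version accepts.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for impala_udf::TimestampVal (assumed fields only).
struct Ts {
  bool is_null;
  int32_t date;         // day number
  int64_t time_of_day;  // offset within the day
};

// Hypothetical aggregation state: the latest non-null value seen so far.
struct LatestState {
  bool has_value;
  int32_t value;
  Ts ts;
};

// True when a is strictly later than b: compare the date first and use
// time_of_day only to break ties on the same date.
inline bool IsLater(const Ts& a, const Ts& b) {
  if (a.date != b.date) return a.date > b.date;
  return a.time_of_day > b.time_of_day;
}

// Update step: ignore NULL inputs and NULL timestamps, keep the newest value.
inline void UpdateLatest(LatestState* state, bool input_is_null, int32_t input,
                         const Ts& ts) {
  if (input_is_null || ts.is_null) return;
  if (!state->has_value || IsLater(ts, state->ts)) {
    state->has_value = true;
    state->value = input;
    state->ts = ts;
  }
}

// Merge step: combine two partial states; the later timestamp wins.
inline void MergeLatest(const LatestState& src, LatestState* dst) {
  if (!src.has_value) return;
  if (!dst->has_value || IsLater(src.ts, dst->ts)) *dst = src;
}

// Self-check using column A of the sample rows in this thread, with the
// placeholder Date_Time values 0,1,3,2,4 treated as day numbers.
inline void CheckLatest() {
  LatestState s = {false, 0, {true, 0, 0}};
  UpdateLatest(&s, false, 1, Ts{false, 0, 0});
  UpdateLatest(&s, false, 2, Ts{false, 1, 0});
  UpdateLatest(&s, false, 4, Ts{false, 3, 0});
  UpdateLatest(&s, false, 5, Ts{false, 2, 0});
  UpdateLatest(&s, true, 0, Ts{false, 4, 0});  // NULL input is skipped
  assert(s.has_value && s.value == 4);
}
```

Keeping an explicit `has_value` flag in the state, rather than treating a stored 0 as "nothing seen yet", lets a genuine 0 be told apart from a group in which every input was NULL.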

On Thu, Jun 22, 2017 at 22:00 Ravi Kanth <ravikanth.4b0@gmail.com> wrote:

> Thanks for that Matthew. Not to do anything against your suggestion but, I
> just wanted to try impala custom UDA and see if it might give us any
> expected result. If not this, I would learn to write our own custom UDA to
> solve problems other than this.
>
> I am facing issues when trying to return any type other than
> StringVal from the finalize method of the UDA. It throws an error when
> registering the handlers with Impala and creating the function.
>
> ERROR: AnalysisException: Could not find function CurrentValOneUpdate(INT,
> TIMESTAMP) returns INT in:
> hdfs://**************com:8020/dwh/impala/udf/libudasample.so
>
> Check that function name, arguments, and return type are correct.
>
> The above works fine when returning StringVal type.
> Impala version using: Impala Shell v2.5.0-cdh5.7.0
>
> Waiting for your suggestion
>
> Thanks,
> Ravi
>
> On 22 June 2017 at 11:58, Matthew Jacobs <mj@cloudera.com> wrote:
>
>> I haven't looked at the code, but this isn't going to solve your
>> earlier issue: because this is an aggregation function, it will not
>> work as an analytic function. I was saying in an earlier email that we
>> don't expose an interface for registering analytic functions, so you
>> should probably just upgrade Impala. I posted the patch of the
>> analytic function we added to illustrate that it will be difficult and
>> not a matter of registering a UDA.
>>
>> On Thu, Jun 22, 2017 at 11:25 AM, Ravi Kanth <ravikanth.4b0@gmail.com>
>> wrote:
>> > Hi All,
>> >
>> > I wrote the below lines of code to achieve this functionality:
>> >
>> > // Copyright 2012 Cloudera Inc.
>> > //
>> > // Licensed under the Apache License, Version 2.0 (the "License");
>> > // you may not use this file except in compliance with the License.
>> > // You may obtain a copy of the License at
>> > //
>> > // http://www.apache.org/licenses/LICENSE-2.0
>> > //
>> > // Unless required by applicable law or agreed to in writing, software
>> > // distributed under the License is distributed on an "AS IS" BASIS,
>> > // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> > // See the License for the specific language governing permissions and
>> > // limitations under the License.
>> >
>> > #include "uda-sample.h"
>> > #include <assert.h>
>> > #include <string.h>  // memcpy, memset
>> > #include <sstream>
>> >
>> > using namespace impala_udf;
>> > using namespace std;
>> >
>> >
>> > StringVal ToStringVal(FunctionContext* context, const IntVal& val) {
>> >   stringstream ss;
>> >   ss << val.val;
>> >   string str = ss.str();
>> >   StringVal string_val(context, str.size());
>> >   memcpy(string_val.ptr, str.c_str(), str.size());
>> >   return string_val;
>> > }
>> >
>> > // ---------------------------------------------------------------------------------------
>> > // This is an aggregate function for retrieving the latest non-null value
>> > // for e_update table
>> > // ---------------------------------------------------------------------------------------
>> >
>> > struct CurrentValStruct {
>> >   IntVal value;
>> >   TimestampVal tsTemp;
>> > };
>> >
>> > // Initialize the StringVal intermediate to a zero'd CurrentValStruct
>> > void CurrentValInit(FunctionContext* context, StringVal* val) {
>> >   val->is_null = false;
>> >   val->len = sizeof(CurrentValStruct);
>> >   val->ptr = context->Allocate(val->len);
>> >   memset(val->ptr, 0, val->len);
>> >   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val->ptr);
>> >   cur->value.is_null = false;
>> >   cur->tsTemp.is_null = false;
>> > }
>> >
>> > void CurrentValUpdate(FunctionContext* context, const IntVal& input,
>> >     const TimestampVal& ts, StringVal* val) {
>> >   assert(!val->is_null);
>> >   assert(val->len == sizeof(CurrentValStruct));
>> >   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val->ptr);
>> >   // Only non-null inputs can replace the stored value
>> >   if (!input.is_null) {
>> >     // Compare the date first; time_of_day only breaks ties on the same date
>> >     if (ts.date > cur->tsTemp.date ||
>> >         (ts.date == cur->tsTemp.date &&
>> >          ts.time_of_day > cur->tsTemp.time_of_day)) {
>> >       cur->value = input.val;
>> >       cur->tsTemp.date = ts.date;
>> >       cur->tsTemp.time_of_day = ts.time_of_day;
>> >     }
>> >   }
>> > }
>> >
>> > void CurrentValMerge(FunctionContext* context, const StringVal& src,
>> >     StringVal* dst) {
>> >   if (src.is_null) return;
>> >   const CurrentValStruct* src_cur =
>> >       reinterpret_cast<const CurrentValStruct*>(src.ptr);
>> >   CurrentValStruct* dst_cur = reinterpret_cast<CurrentValStruct*>(dst->ptr);
>> >   if (dst_cur->tsTemp.is_null) {
>> >     // No timestamp recorded in dst yet: take the source state as-is
>> >     dst_cur->value = src_cur->value;
>> >     dst_cur->tsTemp.date = src_cur->tsTemp.date;
>> >     dst_cur->tsTemp.time_of_day = src_cur->tsTemp.time_of_day;
>> >     dst_cur->tsTemp.is_null = false;
>> >     dst_cur->value.is_null = false;
>> >   } else {
>> >     // Keep the state with the later timestamp (date first, then time_of_day)
>> >     if (src_cur->tsTemp.date > dst_cur->tsTemp.date ||
>> >         (src_cur->tsTemp.date == dst_cur->tsTemp.date &&
>> >          src_cur->tsTemp.time_of_day > dst_cur->tsTemp.time_of_day)) {
>> >       dst_cur->value = src_cur->value;
>> >       dst_cur->tsTemp.date = src_cur->tsTemp.date;
>> >       dst_cur->tsTemp.time_of_day = src_cur->tsTemp.time_of_day;
>> >     }
>> >   }
>> > }
>> >
>> > StringVal CurrentValSerialize(FunctionContext* context, const StringVal& val) {
>> >   assert(!val.is_null);
>> >   StringVal result(context, val.len);
>> >   memcpy(result.ptr, val.ptr, val.len);
>> >   context->Free(val.ptr);
>> >   return result;
>> > }
>> >
>> > StringVal CurrentValFinalize(FunctionContext* context, const StringVal& val) {
>> >   //IntVal intResult;
>> >   assert(!val.is_null);
>> >   assert(val.len == sizeof(CurrentValStruct));
>> >   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr);
>> >   StringVal result;
>> >   if (cur->value == 0) {
>> >     result = StringVal::null();
>> >     //intResult = 0;
>> >   } else {
>> >     // intResult = cur->value.val;
>> >     // Copies the result to memory owned by Impala
>> >     result = ToStringVal(context, cur->value.val);
>> > //  intResult = atoi(result.c_str());
>> >   //  std::istringstream(result) >> intResult;
>> >   }
>> >   context->Free(val.ptr);
>> >   return result;
>> > }
>> >
>> > Queries:
>> >
>> > create aggregate function current_val(int,timestamp) returns string
>> > location '/impala/udf/libudasample.so' init_fn='CurrentValInit'
>> > update_fn='CurrentValUpdate' merge_fn='CurrentValMerge'
>> > serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize';
>> >
>> >
>> > select id, current_val(a,date_time) as a from udf_test GROUP BY id;
>> >
>> >
>> > The above is working fine and I am able to achieve my requirement. But is
>> > there any possibility that we can return an IntVal type rather than a
>> > StringVal type? If so, where can I make the changes?
>> >
>> > I tried changing the below:
>> >
>> > IntVal CurrentValSerialize(FunctionContext* context, const StringVal& val) {
>> >   assert(!val.is_null);
>> >   StringVal result(context, val.len);
>> >   memcpy(result.ptr, val.ptr, val.len);
>> >   context->Free(val.ptr);
>> >   IntVal intResult;
>> >   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr);
>> >   intResult = cur->value.val;
>> >   return intResult;
>> > }
>> >
>> > IntVal CurrentValFinalize(FunctionContext* context, const StringVal& val) {
>> >   IntVal intResult;
>> >   assert(!val.is_null);
>> >   assert(val.len == sizeof(CurrentValStruct));
>> >   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr);
>> >   //StringVal result;
>> >   if (cur->value == 0) {
>> >     //result = StringVal::null();
>> >     intResult = 0;
>> >   } else {
>> >     intResult = cur->value.val;
>> >     // Copies the result to memory owned by Impala
>> >     //result = ToStringVal(context, cur->value.val);
>> >     //  intResult = atoi(result.c_str());
>> >     //  std::istringstream(result) >> intResult;
>> >   }
>> >   context->Free(val.ptr);
>> >   return intResult;
>> > }
>> >
>> >
>> > But when trying to create the aggregate function, I get the following:
>> >
>> > create aggregate function current_val(int,timestamp) returns int
>> > location '/impala/udf/libudasample.so' init_fn='CurrentValInit'
>> > update_fn='CurrentValUpdate' merge_fn='CurrentValMerge'
>> > serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize';
>> >
>> >
>> > Query: create aggregate function current_val(int,timestamp) returns int
>> > location '/dwh/impala/udf/libudasample.so' init_fn='CurrentValInit'
>> > update_fn='CurrentValUpdate' merge_fn='CurrentValMerge'
>> > serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize'
>> >
>> >
>> > ERROR: AnalysisException: Could not find function CurrentValUpdate(INT,
>> > TIMESTAMP) returns INT in:
>> > hdfs://***************:8020/impala/udf/libudasample.so
>> >
>> > Check that function name, arguments, and return type are correct.
>> >
>> >
>> > I changed the function declarations in the header file accordingly as well.
>> > Can someone suggest if I am missing something here?
>> >
>> > Thanks,
>> > Ravi
>> >
>> > On 21 June 2017 at 13:42, Ravi Kanth <ravikanth.4b0@gmail.com> wrote:
>> >>
>> >> Thanks for the suggestion Matthew. Let me look into the patch. I am
>> >> currently working on building a custom UDA. Hopefully the information you
>> >> provided and the discussion we had might be useful to me.
>> >>
>> >> On 21 June 2017 at 13:39, Matthew Jacobs <mj@cloudera.com> wrote:
>> >>>
>> >>> I'd strongly recommend the latter (upgrading). We don't really expose
>> >>> the analytic function interface, so you'd end up writing an Impala
>> >>> patch, and analytic functions are particularly tricky.
>> >>>
>> >>> Here's Thomas' patch to add 'ignore nulls' in first/last value:
>> >>> https://gerrit.cloudera.org/#/c/3328/
>> >>>
>> >>> On Wed, Jun 21, 2017 at 1:08 PM, Ravi Kanth <ravikanth.4b0@gmail.com>
>> >>> wrote:
>> >>> > Thanks All. I will think of a possible solution, either by implementing
>> >>> > a custom UDA or by upgrading the version.
>> >>> >
>> >>> > On Wed, Jun 21, 2017 at 13:04 Thomas Tauber-Marshall
>> >>> > <tmarshall@cloudera.com> wrote:
>> >>> >>
>> >>> >> On Wed, Jun 21, 2017 at 2:33 PM Ravi Kanth <ravikanth.4b0@gmail.com>
>> >>> >> wrote:
>> >>> >>>
>> >>> >>> Yes, I agree with you Thomas. Probably that's what I'm doing wrong.
>> >>> >>>
>> >>> >>> Unfortunately, as mentioned, I believe the version of Impala we are
>> >>> >>> using doesn't support ignore nulls.
>> >>> >>>
>> >>> >>> But my question is: would the last_value function retrieve the latest
>> >>> >>> non-null value irrespective of using ignore nulls?
>> >>> >>
>> >>> >>
>> >>> >> Not sure I follow - if you use last_value without ignore nulls, you'll
>> >>> >> get the latest value taking all values into consideration, which may or
>> >>> >> may not be null.
>> >>> >>
>> >>> >>>
>> >>> >>>
>> >>> >>> Ravi
>> >>> >>>
>> >>> >>> On Wed, Jun 21, 2017 at 12:23 Matthew Jacobs <mj@cloudera.com> wrote:
>> >>> >>>>
>> >>> >>>> Ah I think Thomas is right. I read the expected results and the query
>> >>> >>>> too quickly, so my comment about the asc/desc is probably wrong.
>> >>> >>>> Clearly my point about analytic functions being tricky holds true :)
>> >>> >>>>
>> >>> >>>> On Wed, Jun 21, 2017 at 12:12 PM, Thomas Tauber-Marshall
>> >>> >>>> <tmarshall@cloudera.com> wrote:
>> >>> >>>> >
>> >>> >>>> >
>> >>> >>>> > On Wed, Jun 21, 2017 at 1:52 PM Ravi Kanth <ravikanth.4b0@gmail.com>
>> >>> >>>> > wrote:
>> >>> >>>> >>
>> >>> >>>> >> Thomas,
>> >>> >>>> >>
>> >>> >>>> >> The version of Impala we are using is 2.5.0 with CDH 5.7.0, and I
>> >>> >>>> >> see that ignore nulls was added in Impala 2.7.0. Would adding
>> >>> >>>> >> ignore nulls make a big difference in the expected result?
>> >>> >>>> >
>> >>> >>>> >
>> >>> >>>> > That's too bad. I think that 'ignore nulls' would give you what you
>> >>> >>>> > want - the problem with the query that you posted is that it
>> >>> >>>> > eliminates rows that don't match the where clause, so for example the
>> >>> >>>> > row with "Zero" in it is eliminated because it is filtered out by the
>> >>> >>>> > "where a is not null", whereas "ignore nulls" only affects the values
>> >>> >>>> > that could be returned by the specific analytic function that the
>> >>> >>>> > ignore is applied to.
>> >>> >>>> >
>> >>> >>>> >>
>> >>> >>>> >>
>> >>> >>>> >> Ravi
>> >>> >>>> >>
>> >>> >>>> >> On 21 June 2017 at 11:20, Thomas Tauber-Marshall
>> >>> >>>> >> <tmarshall@cloudera.com>
>> >>> >>>> >> wrote:
>> >>> >>>> >>>
>> >>> >>>> >>> Ravi,
>> >>> >>>> >>>
>> >>> >>>> >>> Instead of using the "where ... is not null", have you tried
>> >>> >>>> >>> 'last_value(... ignore nulls)'
>> >>> >>>> >>>
>> >>> >>>> >>> On Wed, Jun 21, 2017 at 1:08 PM Ravi Kanth
>> >>> >>>> >>> <ravikanth.4b0@gmail.com>
>> >>> >>>> >>> wrote:
>> >>> >>>> >>>>
>> >>> >>>> >>>> Antoni,
>> >>> >>>> >>>>
>> >>> >>>> >>>> The problem with using the last_value() function, as far as I have
>> >>> >>>> >>>> observed, is that if I use it on multiple columns in a single query,
>> >>> >>>> >>>> it does not retrieve the results as expected.
>> >>> >>>> >>>>
>> >>> >>>> >>>>  Input:
>> >>> >>>> >>>>
>> >>> >>>> >>>> ID (Int)  Date_Time (timestamp)  A (Int)  B (String)  C (String)
>> >>> >>>> >>>> 1         0                      1        NULL        NULL
>> >>> >>>> >>>> 1         1                      2        Hi          NULL
>> >>> >>>> >>>> 1         3                      4        Hello       Hi
>> >>> >>>> >>>> 1         2                      5        NULL        NULL
>> >>> >>>> >>>> 1         4                      NULL     NULL        Zero
>> >>> >>>> >>>>
>> >>> >>>> >>>> Expected Output:
>> >>> >>>> >>>>
>> >>> >>>> >>>>
>> >>> >>>> >>>>
>> >>> >>>> >>>> ID (Int)  A (Int)  B (String)  C (String)
>> >>> >>>> >>>> 1         4        Hello       Zero
>> >>> >>>> >>>>
>> >>> >>>> >>>>
>> >>> >>>> >>>> Query executed:
>> >>> >>>> >>>>
>> >>> >>>> >>>> select id,
>> >>> >>>> >>>>   last_value(a) over(partition by id order by date_time desc) as a,
>> >>> >>>> >>>>   last_value(b) over(partition by id order by date_time desc) as b,
>> >>> >>>> >>>>   last_value(c) over(partition by id order by date_time desc) as c
>> >>> >>>> >>>> from udf_test
>> >>> >>>> >>>> where a is not null and b is not null and c is not null;
>> >>> >>>> >>>>
>> >>> >>>> >>>>
>> >>> >>>> >>>>
>> >>> >>>> >>>> Output I am getting:
>> >>> >>>> >>>>
>> >>> >>>> >>>> +----+---+-------+----+
>> >>> >>>> >>>> | id | a | b     | c  |
>> >>> >>>> >>>> +----+---+-------+----+
>> >>> >>>> >>>> | 1  | 4 | Hello | Hi |
>> >>> >>>> >>>> +----+---+-------+----+
>> >>> >>>> >>>>
>> >>> >>>> >>>>
>> >>> >>>> >>>> Hopefully, I am clear with the problem above.
>> >>> >>>> >>>>
>> >>> >>>> >>>> Thanks,
>> >>> >>>> >>>> Ravi
>> >>> >>>> >>>>
>> >>> >>>> >>>> On 20 June 2017 at 22:05, Ravi Kanth <ravikanth.4b0@gmail.com>
>> >>> >>>> >>>> wrote:
>> >>> >>>> >>>>>
>> >>> >>>> >>>>> Antoni,
>> >>> >>>> >>>>>
>> >>> >>>> >>>>> Thanks for the suggestion. Let me have a look at it and hopefully we
>> >>> >>>> >>>>> can use it in our use case.
>> >>> >>>> >>>>>
>> >>> >>>> >>>>> Thanks,
>> >>> >>>> >>>>> Ravi
>> >>> >>>> >>>>>
>> >>> >>>> >>>>> On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov
>> >>> >>>> >>>>> <aivanov@vmware.com>
>> >>> >>>> >>>>> wrote:
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> Hi Ravi,
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> I am curious why you are not using the already existing last_value
>> >>> >>>> >>>>>> function in Impala to get the "latest non null value for the column"
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> e.g.
>> >>> >>>> >>>>>> last_value(column_a ignore nulls) over(partition by ID order by Date_Time)
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> Thanks,
>> >>> >>>> >>>>>> Antoni
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> On Jun 21, 2017, at 1:17 AM, Tim Armstrong
>> >>> >>>> >>>>>> <tarmstrong@cloudera.com>
>> >>> >>>> >>>>>> wrote:
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> This was double-posted to
>> >>> >>>> >>>>>> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073
>> >>> >>>> >>>>>> also. I'll continue the discussion here.
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> > Can we have the flexibility of declaring the variable globally in
>> >>> >>>> >>>>>> > UDF? Globally, I mean outside the function?
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> > And, the reason I am declaring a static variable is to restore the
>> >>> >>>> >>>>>> > value of timestamp for every record so that I can perform a
>> >>> >>>> >>>>>> > comparison of the timestamps. Is there an alternative approach for
>> >>> >>>> >>>>>> > this?
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> Updating a global or static variable in a UDAF is guaranteed not to
>> >>> >>>> >>>>>> do what you expect - the function can be invoked concurrently by
>> >>> >>>> >>>>>> multiple threads.
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> It seems like you probably want to store some additional state in
>> >>> >>>> >>>>>> the intermediate value. There are some sample UDAs here (see Avg())
>> >>> >>>> >>>>>> where additional intermediate state is stored in a StringVal:
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth
>> >>> >>>> >>>>>> <ravikanth.4b0@gmail.com>
>> >>> >>>> >>>>>> wrote:
>> >>> >>>> >>>>>>>
>> >>> >>>> >>>>>>> Thanks Bharath. Can you check whether the logic I am implementing is
>> >>> >>>> >>>>>>> correct or needs any modification as well? I am very new to Impala
>> >>> >>>> >>>>>>> UDFs and C++ and am having a hard time figuring out the problems.
>> >>> >>>> >>>>>>>
>> >>> >>>> >>>>>>> On 20 June 2017 at 14:27, Bharath Vissapragada
>> >>> >>>> >>>>>>> <bharathv@cloudera.com> wrote:
>> >>> >>>> >>>>>>>>
>> >>> >>>> >>>>>>>> You need to allocate memory for tsTemp, else it can segfault. That
>> >>> >>>> >>>>>>>> could be the issue here.
>> >>> >>>> >>>>>>>>
>> >>> >>>> >>>>>>>>  static TimestampVal* tsTemp;
>> >>> >>>> >>>>>>>>       tsTemp->date = 0;
>> >>> >>>> >>>>>>>>       tsTemp->time_of_day = 0;
>> >>> >>>> >>>>>>>>
>> >>> >>>> >>>>>>>>
>> >>> >>>> >>>>>>>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth
>> >>> >>>> >>>>>>>> <ravikanth.4b0@gmail.com> wrote:
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> Hi All,
>> >>> >>>> >>>>>>>>> We are using Impala to do various processing in our systems. We
>> >>> >>>> >>>>>>>>> recently got a requirement to handle updates on events, i.e., we
>> >>> >>>> >>>>>>>>> have an 'e_update' table which holds the partial updates received
>> >>> >>>> >>>>>>>>> for various events. The fields that are not updated are stored as
>> >>> >>>> >>>>>>>>> NULL values.
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> Ex:
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> ID (Int)  Date_Time (timestamp)  A (Int)  B (String)  C (String)
>> >>> >>>> >>>>>>>>> 1         0                      1        NULL        NULL
>> >>> >>>> >>>>>>>>> 1         1                      2        Hi          NULL
>> >>> >>>> >>>>>>>>> 1         3                      4        Hello       Hi
>> >>> >>>> >>>>>>>>> 1         2                      5        NULL        NULL
>> >>> >>>> >>>>>>>>> 1         4                      NULL     NULL        Zero
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> P.S.: Please consider Date_Time as valid timestamp type values. For
>> >>> >>>> >>>>>>>>> ease of understanding, they are written as 0,1,2,3,4,5
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> As seen in the above table, the events have a unique id, and as we
>> >>> >>>> >>>>>>>>> get an update to a particular event, we store the date_time at
>> >>> >>>> >>>>>>>>> which the update happened along with the partially updated values.
>> >>> >>>> >>>>>>>>> Apart from the updated values, the rest are stored as NULL values.
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> We are planning to mimic in-place updates on the table, so that the
>> >>> >>>> >>>>>>>>> query below retrieves the resulting table shown after it. We don't
>> >>> >>>> >>>>>>>>> delete the data.
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> > SELECT id, current_val(A,date_time) as A,
>> >>> >>>> >>>>>>>>> > current_val(B,date_time) as B, current_val(C,date_time) as C
>> >>> >>>> >>>>>>>>> > from e_update
>> >>> >>>> >>>>>>>>> > GROUP BY ID;
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> where current_val is a custom Impala UDA we are planning to
>> >>> >>>> >>>>>>>>> implement, i.e. get the latest non-null value for the column.
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> ID (Int)  A (Int)  B (String)  C (String)
>> >>> >>>> >>>>>>>>> 1         4        Hello       Zero
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> Implemented current_val UDA:
>> >>> >>>> >>>>>>>>> The below code is only for int type inputs:
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> uda-currentval.h
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> // This is a sample for retrieving the current value of e_update table
>> >>> >>>> >>>>>>>>> //
>> >>> >>>> >>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val);
>> >>> >>>> >>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input,
>> >>> >>>> >>>>>>>>>     const TimestampVal& ts, IntVal* val);
>> >>> >>>> >>>>>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src,
>> >>> >>>> >>>>>>>>>     IntVal* dst);
>> >>> >>>> >>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val);
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> uda-currentval.cc
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> // -----------------------------------------------------------------------------------------------
>> >>> >>>> >>>>>>>>> // This is a sample for retrieving the current value of e_update table
>> >>> >>>> >>>>>>>>> // -----------------------------------------------------------------------------------------------
>> >>> >>>> >>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val) {
>> >>> >>>> >>>>>>>>>   val->is_null = false;
>> >>> >>>> >>>>>>>>>   val->val = 0;
>> >>> >>>> >>>>>>>>> }
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input,
>> >>> >>>> >>>>>>>>>     const TimestampVal& ts, IntVal* val) {
>> >>> >>>> >>>>>>>>>   static TimestampVal* tsTemp;
>> >>> >>>> >>>>>>>>>   tsTemp->date = 0;
>> >>> >>>> >>>>>>>>>   tsTemp->time_of_day = 0;
>> >>> >>>> >>>>>>>>>   if(tsTemp->date==0 && tsTemp->time_of_day==0){
>> >>> >>>> >>>>>>>>>     tsTemp->date = ts.date;
>> >>> >>>> >>>>>>>>>     tsTemp->time_of_day = ts.time_of_day;
>> >>> >>>> >>>>>>>>>     val->val = input.val;
>> >>> >>>> >>>>>>>>>     return;
>> >>> >>>> >>>>>>>>>   }
>> >>> >>>> >>>>>>>>>   if(ts.date > tsTemp->date && ts.time_of_day > tsTemp->time_of_day){
>> >>> >>>> >>>>>>>>>     tsTemp->date = ts.date;
>> >>> >>>> >>>>>>>>>     tsTemp->time_of_day = ts.time_of_day;
>> >>> >>>> >>>>>>>>>     val->val = input.val;
>> >>> >>>> >>>>>>>>>     return;
>> >>> >>>> >>>>>>>>>   }
>> >>> >>>> >>>>>>>>> }
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src,
>> >>> >>>> >>>>>>>>>     IntVal* dst) {
>> >>> >>>> >>>>>>>>>   dst->val += src.val;
>> >>> >>>> >>>>>>>>> }
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val) {
>> >>> >>>> >>>>>>>>>   return val;
>> >>> >>>> >>>>>>>>> }
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> We are able to build and create an aggregate function in Impala,
>> >>> >>>> >>>>>>>>> but when trying to run a select query similar to the one above, it
>> >>> >>>> >>>>>>>>> brings down a couple of Impala daemons, throws the error below, and
>> >>> >>>> >>>>>>>>> gets terminated.
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> WARNINGS: Cancelled due to unreachable impalad(s):
>> >>> >>>> >>>>>>>>> hadoop102.**.**.**.com:22000
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> We have impalad running on 14 instances.
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>>
>> >>> >>>> >>>>>>>>> Can someone help us resolve this problem and suggest a better way
>> >>> >>>> >>>>>>>>> to achieve a solution for the scenario explained?
>> >>> >>>> >>>>>>>>
>> >>> >>>> >>>>>>>>
>> >>> >>>> >>>>>>>
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>
>> >>> >>>> >>
>> >>> >>>> >
>> >>
>> >>
>> >
>>
>
>
