From: Tim Armstrong
Date: Tue, 20 Jun 2017 15:17:43 -0700
Subject: Re: Creating Impala UDA
To: user@impala.incubator.apache.org

This was double-posted to http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073 as well. I'll continue the discussion here.

> Can we have the flexibility of declaring the variable globally in a UDF? By globally, I mean outside the function.

> And the reason I am declaring a static variable is to keep the timestamp value from one record to the next so that I can compare the timestamps. Is there an alternative approach for this?

Updating a global or static variable in a UDAF is guaranteed not to do what you expect: the function can be invoked concurrently by multiple threads, and a single variable would be shared by every group instead of being tracked per group.

You probably want to store the additional state (here, the timestamp of the value you are keeping) in the intermediate value instead. There are some sample UDAs here (see Avg()) where additional intermediate state is stored in a StringVal: https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61

On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth wrote:
> Thanks, Bharath. Can you check whether the logic I am implementing is correct or needs any other modification? I am very new to Impala UDFs and C++ and am having a hard time figuring out the problems.
>
> On 20 June 2017 at 14:27, Bharath Vissapragada wrote:
>> You need to allocate memory for tsTemp; otherwise it can segfault, because a static pointer starts out null and tsTemp is never pointed at a TimestampVal. That could be the issue here:
>>
>>     static TimestampVal* tsTemp;
>>     tsTemp->date = 0;
>>     tsTemp->time_of_day = 0;
>>
>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth wrote:
>>> Hi All,
>>>
>>> We are using Impala for various kinds of processing in our systems. We recently got a requirement to handle updates to events: we have an 'e_update' table that holds the partial updates received for various events.
>>> Fields that were not updated are stored as NULL values.
>>>
>>> Example:
>>>
>>> ID (Int)  Date_Time (timestamp)  A (Int)  B (String)  C (String)
>>> 1         0                      1        NULL        NULL
>>> 1         1                      2        Hi          NULL
>>> 1         3                      4        Hello       Hi
>>> 1         2                      5        NULL        NULL
>>> 1         4                      NULL     NULL        Zero
>>>
>>> P.S.: Please treat the Date_Time entries as valid timestamp values; for easier reading, they are shown as 0, 1, 2, 3 and 4.
>>>
>>> As the table shows, each event has a unique ID. Whenever we receive an update to an event, we store the date_time at which the update happened along with the partially updated values; all fields that were not updated are stored as NULL.
>>>
>>> We are planning to mimic in-place updates on the table (we don't delete any data), so that the query below returns the resulting table:
>>>
>>>     SELECT id, current_val(A, date_time) AS A, current_val(B, date_time) AS B, current_val(C, date_time) AS C FROM e_update GROUP BY id;
>>>
>>> where current_val is a custom Impala UDA we are planning to implement, i.e. one that returns the latest non-NULL value for each column:
>>>
>>> ID (Int)  A (Int)  B (String)  C (String)
>>> 1         4        Hello       Zero
>>>
>>> Implemented current_val UDA (the code below handles only INT inputs):
>>>
>>> uda-currentval.h
>>>
>>>     // This is a sample for retrieving the current value of e_update table
>>>     //
>>>     void CurrentValueInit(FunctionContext* context, IntVal* val);
>>>     void CurrentValueUpdate(FunctionContext* context, const IntVal& input, const TimestampVal& ts, IntVal* val);
>>>     void CurrentValueMerge(FunctionContext* context, const IntVal& src, IntVal* dst);
>>>     IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val);
>>>
>>> uda-currentval.cc
>>>
>>>     // ---------------------------------------------------------------------
>>>     // This is a sample for retrieving the current value of e_update table
>>>     // ---------------------------------------------------------------------
>>>     void CurrentValueInit(FunctionContext* context, IntVal* val) {
>>>       val->is_null = false;
>>>       val->val = 0;
>>>     }
>>>
>>>     void CurrentValueUpdate(FunctionContext* context, const IntVal& input, const TimestampVal& ts, IntVal* val) {
>>>       static TimestampVal* tsTemp;
>>>       tsTemp->date = 0;
>>>       tsTemp->time_of_day = 0;
>>>       if (tsTemp->date == 0 && tsTemp->time_of_day == 0) {
>>>         tsTemp->date = ts.date;
>>>         tsTemp->time_of_day = ts.time_of_day;
>>>         val->val = input.val;
>>>         return;
>>>       }
>>>       if (ts.date > tsTemp->date && ts.time_of_day > tsTemp->time_of_day) {
>>>         tsTemp->date = ts.date;
>>>         tsTemp->time_of_day = ts.time_of_day;
>>>         val->val = input.val;
>>>         return;
>>>       }
>>>     }
>>>
>>>     void CurrentValueMerge(FunctionContext* context, const IntVal& src, IntVal* dst) {
>>>       dst->val += src.val;
>>>     }
>>>
>>>     IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val) {
>>>       return val;
>>>     }
>>>
>>> We are able to build the library and create the aggregate function in Impala, but when we run a SELECT query similar to the one above, it brings down a couple of Impala daemons, and the query is terminated with the error below:
>>>
>>>     WARNINGS: Cancelled due to unreachable impalad(s): hadoop102.**.**.**.com:22000
>>>
>>> We have impalad running on 14 instances.
>>>
>>> Can someone help us resolve this problem, or suggest a better way to achieve a solution for the scenario explained above?
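Separately from the memory problem Bharath points out, the timestamp check in the posted CurrentValueUpdate appears to have a logic error, which bears on Ravi's question about whether the logic is correct. The condition ts.date > tsTemp->date && ts.time_of_day > tsTemp->time_of_day only treats a row as newer if both the day and the time within the day increase, so an update on a later day at an earlier time of day, or a second update on the same day, would never replace the stored value. The minimal C++ sketch below contrasts that check with a lexicographic comparison (day first, then time of day). It assumes a stand-in TimestampVal with the same two fields as Impala's (date as a day number, time_of_day in nanoseconds since midnight); IsLaterOriginal and IsLater are hypothetical helper names, and the functions are constexpr only so the examples can be checked with static_assert.

```cpp
#include <cstdint>

// ASSUMPTION: stand-in with the same two fields as impala_udf::TimestampVal.
struct TimestampVal {
  bool is_null;
  int32_t date;         // day number
  int64_t time_of_day;  // nanoseconds since midnight
};

// The check used in the posted CurrentValueUpdate: BOTH fields must increase,
// so a later day with an earlier time of day, or a second update on the same
// day, is treated as "not later".
constexpr bool IsLaterOriginal(const TimestampVal& a, const TimestampVal& b) {
  return a.date > b.date && a.time_of_day > b.time_of_day;
}

// Lexicographic order: compare the day first; time_of_day only breaks ties
// between timestamps on the same day.
constexpr bool IsLater(const TimestampVal& a, const TimestampVal& b) {
  return a.date != b.date ? a.date > b.date : a.time_of_day > b.time_of_day;
}

// Day 2 with a small time_of_day is still later than day 1 with a larger one.
static_assert(IsLater(TimestampVal{false, 2, 10}, TimestampVal{false, 1, 50000}),
              "a later day wins regardless of time_of_day");
static_assert(!IsLaterOriginal(TimestampVal{false, 2, 10}, TimestampVal{false, 1, 50000}),
              "the posted check misses this update");
```

The same comparison works unchanged whichever column type the UDA aggregates, since it only looks at the timestamp argument.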
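Following Tim's suggestion, one way to restructure current_val is to keep the timestamp alongside the value inside a StringVal intermediate, in the style of the linked Avg() sample, so that no state is shared between threads or between groups. The minimal sketch below also tries to address other issues in the posted code: it skips NULL inputs (the goal is the latest non-NULL value); it uses a has_value flag instead of treating timestamp 0 as "unset" (the posted Update resets tsTemp to 0 on every call, so its first branch always runs); it compares timestamps by day and then time of day; and Merge keeps the more recent partial result instead of adding the two values. So that it compiles on its own, it defines simplified stand-ins for FunctionContext, IntVal, TimestampVal and StringVal (in a real UDA these come from the Impala UDF header, impala_udf/udf.h), and CurrentValState, GetState and IsLater are hypothetical names. The Avg() sample also defines a Serialize function that manages the memory of the intermediate buffer; that step is left out here, so treat this as a sketch of the logic rather than a drop-in implementation.

```cpp
// ASSUMPTION: the four structs below are simplified stand-ins for the types in
// <impala_udf/udf.h>, so the file compiles without Impala.
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <cstring>

struct FunctionContext {  // stand-in: real Impala tracks this memory itself
  uint8_t* Allocate(int byte_size) { return static_cast<uint8_t*>(std::malloc(byte_size)); }
  void Free(uint8_t* buffer) { std::free(buffer); }
};
struct IntVal { bool is_null; int32_t val; };
struct TimestampVal { bool is_null; int32_t date; int64_t time_of_day; };
struct StringVal { bool is_null; int len; uint8_t* ptr; };

// Per-group state, stored in the bytes of the StringVal intermediate.
struct CurrentValState {
  bool has_value;       // false until a non-NULL input has been seen
  int32_t date;         // timestamp of the value being kept
  int64_t time_of_day;
  int32_t val;          // latest non-NULL value seen so far
};

static CurrentValState* GetState(const StringVal& v) {
  assert(!v.is_null && v.len == static_cast<int>(sizeof(CurrentValState)));
  return reinterpret_cast<CurrentValState*>(v.ptr);
}

// Day first, then time within the day.
static bool IsLater(int32_t date_a, int64_t tod_a, int32_t date_b, int64_t tod_b) {
  return date_a > date_b || (date_a == date_b && tod_a > tod_b);
}

void CurrentValueInit(FunctionContext* context, StringVal* val) {
  val->is_null = false;
  val->len = sizeof(CurrentValState);
  val->ptr = context->Allocate(val->len);
  std::memset(val->ptr, 0, val->len);  // has_value = false
}

void CurrentValueUpdate(FunctionContext* context, const IntVal& input,
                        const TimestampVal& ts, StringVal* val) {
  if (input.is_null || ts.is_null) return;  // only non-NULL values count
  CurrentValState* s = GetState(*val);
  if (!s->has_value || IsLater(ts.date, ts.time_of_day, s->date, s->time_of_day)) {
    s->has_value = true;
    s->date = ts.date;
    s->time_of_day = ts.time_of_day;
    s->val = input.val;
  }
}

// Keep whichever partial result is more recent; never add values together.
void CurrentValueMerge(FunctionContext* context, const StringVal& src, StringVal* dst) {
  if (src.is_null) return;
  const CurrentValState* s = GetState(src);
  CurrentValState* d = GetState(*dst);
  if (s->has_value &&
      (!d->has_value || IsLater(s->date, s->time_of_day, d->date, d->time_of_day))) {
    *d = *s;
  }
}

// Returns NULL for a group with no non-NULL input, then frees the state.
IntVal CurrentValueFinalize(FunctionContext* context, const StringVal& val) {
  const CurrentValState* s = GetState(val);
  IntVal result;
  result.is_null = !s->has_value;
  result.val = s->has_value ? s->val : 0;
  context->Free(val.ptr);
  return result;
}
```

Fed the A column of the example table (4 at Date_Time 3, NULL at Date_Time 4), this keeps 4, matching the expected result. Because the intermediate type changes from INT to a string buffer, the function would also need to be re-registered with a matching intermediate type (Impala's CREATE AGGREGATE FUNCTION statement has an INTERMEDIATE clause for this).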