From: Mike Percy
Date: Tue, 6 Mar 2018 20:42:06 -0800
Subject: Re: Spark Streaming + Kudu
To: user@kudu.apache.org

Hmm, could you try in Spark local mode? i.e.
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-local.html

Mike

On Tue, Mar 6, 2018 at 7:14 PM, Ravi Kanth wrote:

Mike,

Can you clarify a bit on grabbing the jstack for the process? I launched my
Spark application and tried to get the pid so that I could grab a jstack
trace during the hang. Unfortunately, I am not able to figure out how to get
the pid for the Spark application.

Thanks,
Ravi
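For reference, one way to locate the right JVM to attach jstack to is to log
the pid from inside the application itself. The sketch below is illustrative
only and is not code from this thread; it assumes a HotSpot-style JVM where
the runtime name conventionally looks like "pid@hostname".

    import java.lang.management.ManagementFactory;

    public final class PidLogger {
        // Illustrative helper (not from the thread): report the current JVM's pid
        // so jstack can be attached to the right executor process during a hang.
        public static long currentPid() {
            // On HotSpot JVMs the runtime name is conventionally "<pid>@<hostname>".
            String jvmName = ManagementFactory.getRuntimeMXBean().getName();
            return Long.parseLong(jvmName.split("@")[0]);
        }

        public static void main(String[] args) {
            System.out.println("JVM pid: " + currentPid());
        }
    }

Alternatively, running jps -lm on the executor host lists the JVM pids along
with their main classes.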
On 6 March 2018 at 18:36, Mike Percy wrote:

Thanks Ravi. Would you mind attaching the output of jstack on the process
during this hang? That would show what the Kudu client threads are doing, as
what we are seeing here is just the netty boss thread.

Mike

On Tue, Mar 6, 2018 at 8:52 AM, Ravi Kanth wrote:

Yes, I have debugged to find the root cause. Every logger statement before
"table = client.openTable(tableName);" executes fine, and exactly at the
point of opening the table the exception below is thrown; nothing executes
after that. The Spark batches keep being processed while opening the table
keeps failing. I tried catching the exception with no luck. Please find the
exception below.

18/02/23 00:16:30 ERROR client.TabletClient: [Peer bd91f34d456a4eccaae50003c90f0fb2] Unexpected exception from downstream on [id: 0x6e13b01f]
java.net.ConnectException: Connection refused: kudu102.dev.sac.int.threatmetrix.com/10.112.3.12:7050
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
    at org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Thanks,
Ravi

On 5 March 2018 at 23:52, Mike Percy wrote:

Have you considered checking your session error count or pending errors in
your while loop every so often? Can you identify where your code is hanging
when the connection is lost (what line)?

Mike
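A minimal sketch of the periodic check suggested above, against the Kudu Java
client's session API; the class name and the decision to throw are
placeholders, not code from this thread:

    import org.apache.kudu.client.KuduSession;
    import org.apache.kudu.client.RowError;
    import org.apache.kudu.client.RowErrorsAndOverflowStatus;

    // Illustrative sketch (not from the thread): drain and report the session's
    // accumulated row errors, so the check can run every N applied operations
    // inside the upsert loop rather than only once after the iterator is exhausted.
    final class PendingErrorCheck {
        static void failOnPendingErrors(KuduSession session) {
            if (session.countPendingErrors() == 0) {
                return;                     // nothing buffered so far
            }
            RowErrorsAndOverflowStatus pending = session.getPendingErrors();
            for (RowError error : pending.getRowErrors()) {
                System.err.println("Kudu row error: " + error);
            }
            // Failing the task lets Spark retry the partition instead of
            // silently dropping rows.
            throw new RuntimeException(pending.getRowErrors().length
                    + " pending Kudu row errors");
        }
    }

Calling something like failOnPendingErrors(session) every few thousand
applied operations inside the while loop would surface write failures much
earlier than a single check at the end of the partition.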
On Mon, Mar 5, 2018 at 9:08 PM, Ravi Kanth wrote:

In addition to my previous comment, I raised a support ticket for this issue
with Cloudera and one of the support engineers mentioned the following:

"Thank you for clarifying. The exceptions are logged but not re-thrown to an
upper layer, so that explains why the Spark application is not aware of the
underlying error."

On 5 March 2018 at 21:02, Ravi Kanth wrote:

Mike,

Thanks for the information. But once the connection to any of the Kudu
servers is lost, there is no way I can get control of the KuduSession
object, and the same goes for getPendingErrors(). The KuduClient in this
case becomes a zombie and never returns until the connection is properly
re-established. I tried everything you suggested with no luck. Attaching my
KuduClient code.

package org.dwh.streaming.kudu.sparkkudustreaming;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.kudu.client.*;
import org.apache.spark.api.java.JavaRDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialNullConstants;

public class KuduProcess {
    private static Logger logger = LoggerFactory.getLogger(KuduProcess.class);
    private static KuduTable table;
    private static KuduSession session;

    public static void upsertKudu(JavaRDD<Map<String, Object>> rdd, String host, String tableName) {
        rdd.foreachPartition(iterator -> {
            RowErrorsAndOverflowStatus errors = upsertOpIterator(iterator, tableName, host);
            int errorCount = errors.getRowErrors().length;
            if (errorCount > 0) {
                throw new RuntimeException("Failed to write " + errorCount + " messages into Kudu");
            }
        });
    }

    private static RowErrorsAndOverflowStatus upsertOpIterator(Iterator<Map<String, Object>> iter, String tableName, String host) {
        try {
            AsyncKuduClient asyncClient = KuduConnection.getAsyncClient(host);
            KuduClient client = asyncClient.syncClient();
            table = client.openTable(tableName);
            session = client.newSession();
            session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
            while (iter.hasNext()) {
                upsertOp(iter.next());
            }
        } catch (KuduException e) {
            logger.error("Exception in upsertOpIterator method", e);
        } finally {
            try {
                session.close();
            } catch (KuduException e) {
                logger.error("Exception in Connection close", e);
            }
        }
        // ---------------------> Once the connection is lost, this part of the code never
        // gets called; the Spark job keeps running and processing records while the
        // KuduClient is trying to reconnect to Kudu. Meanwhile, we are losing all the records.
        return session.getPendingErrors();
    }
    public static void upsertOp(Map<String, Object> formattedMap) {
        if (formattedMap.size() != 0) {
            try {
                Upsert upsert = table.newUpsert();
                PartialRow row = upsert.getRow();
                for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
                    if (entry.getValue().getClass().equals(String.class)) {
                        if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
                            row.setNull(entry.getKey());
                        else
                            row.addString(entry.getKey(), (String) entry.getValue());
                    } else if (entry.getValue().getClass().equals(Long.class)) {
                        if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
                            row.setNull(entry.getKey());
                        else
                            row.addLong(entry.getKey(), (Long) entry.getValue());
                    } else if (entry.getValue().getClass().equals(Integer.class)) {
                        if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
                            row.setNull(entry.getKey());
                        else
                            row.addInt(entry.getKey(), (Integer) entry.getValue());
                    }
                }

                session.apply(upsert);
            } catch (Exception e) {
                logger.error("Exception during upsert:", e);
            }
        }
    }
}

class KuduConnection {
    private static Logger logger = LoggerFactory.getLogger(KuduConnection.class);
    private static Map<String, AsyncKuduClient> asyncCache = new HashMap<>();
    private static int ShutdownHookPriority = 100;

    static AsyncKuduClient getAsyncClient(String kuduMaster) {
        if (!asyncCache.containsKey(kuduMaster)) {
            AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build();
            ShutdownHookManager.get().addShutdownHook(new Runnable() {
                @Override
                public void run() {
                    try {
                        asyncClient.close();
                    } catch (Exception e) {
                        logger.error("Exception closing async client", e);
                    }
                }
            }, ShutdownHookPriority);
            asyncCache.put(kuduMaster, asyncClient);
        }
        return asyncCache.get(kuduMaster);
    }
}

Thanks,
Ravi

On 5 March 2018 at 16:20, Mike Percy wrote:

Hi Ravi, it would be helpful if you could attach what you are getting back
from getPendingErrors() -- perhaps from dumping RowError.toString() from
items in the returned array -- and indicate what you were hoping to get
back. Note that a RowError can also return to you the Operation that you
used to generate the write. From the Operation, you can get the original
PartialRow object, which should be able to identify the affected row that
the write failed for. Does that help?

Since you are using the Kudu client directly, Spark is not involved from the
Kudu perspective, so you will need to deal with Spark on your own in that
case.

Mike
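A sketch of what dumping that information could look like against the 1.x
Java client; the class name is a placeholder and this is not code from the
thread:

    import org.apache.kudu.client.Operation;
    import org.apache.kudu.client.PartialRow;
    import org.apache.kudu.client.RowError;
    import org.apache.kudu.client.RowErrorsAndOverflowStatus;

    // Illustrative helper (not from the thread): report each pending row error
    // along with the row of the operation that produced it.
    final class RowErrorDumper {
        static void dump(RowErrorsAndOverflowStatus pending) {
            if (pending.isOverflowed()) {
                System.err.println("Row error buffer overflowed; some errors were discarded");
            }
            for (RowError error : pending.getRowErrors()) {
                Operation op = error.getOperation();   // the write that failed
                PartialRow row = op.getRow();          // the row it was trying to write
                System.err.println("Row error: " + error + " for row: " + row);
            }
        }
    }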
On Mon, Mar 5, 2018 at 1:59 PM, Ravi Kanth wrote:

Hi Mike,

Thanks for the reply. Yes, I am using AUTO_FLUSH_BACKGROUND.

I am using the Kudu client API to perform UPSERTs into Kudu and I have
integrated this with Spark. I am trying to test the case where a Kudu server
fails. In that case, if there is any problem writing, getPendingErrors()
should give me a way to handle the errors so that I can cleanly terminate my
Spark job. This is what I am trying to do.

But I am not able to get hold of the exceptions being thrown from within the
KuduClient when it retries connecting to the tablet server. My
getPendingErrors() is not catching these exceptions.

Let me know if you need more clarification. I can post some snippets.

Thanks,
Ravi

On 5 March 2018 at 13:18, Mike Percy wrote:

Hi Ravi, are you using AUTO_FLUSH_BACKGROUND? You mention that you are
trying to use getPendingErrors() but it sounds like it's not working for you
-- can you be more specific about what you expect and what you are
observing?

Thanks,
Mike

On Mon, Feb 26, 2018 at 8:04 PM, Ravi Kanth wrote:

Thanks Clifford. We are running Kudu 1.4. To date we haven't seen any issues
in production and we are not losing tablet servers. But as part of testing I
have to generate a few unforeseen cases to analyse the application's
behaviour. One of those is bringing down the tablet server or master server
intentionally, during which I observed the loss of records. I just wanted to
test cases outside the happy path here. Once again, thanks for taking the
time to respond.

- Ravi

On 26 February 2018 at 19:58, Clifford Resnick wrote:

I'll have to get back to you on the code bits, but I'm pretty sure we're
doing simple sync batching. We're not in production yet, but after some
months of development I haven't seen any failures, even when pushing load
doing multiple years' backfill. I think the real question is why you are
losing tablet servers. The only instability we ever had with Kudu was when
it had that weird NTP sync issue that was fixed, I think, in 1.6. What
version are you running?

Anyway, I would think that infinite loop should be catchable somewhere. Our
pipeline is set to fail/retry with Flink snapshots. I imagine there is
something similar with Spark. Sorry I can't be of more help!

On Feb 26, 2018 9:10 PM, Ravi Kanth wrote:

Cliff,

Thanks for the response. Well, I do agree that it's simple and seamless. In
my case, I am able to upsert ~25,000 events/sec into Kudu. But I am facing a
problem when any of the Kudu tablet or master servers is down: I am not able
to get hold of the exception from the client. The client goes into an
infinite loop trying to connect to Kudu, and meanwhile I am losing my
records. I tried handling the errors through getPendingErrors(), but it
didn't help. I am using AsyncKuduClient to establish the connection and
retrieving the syncClient from the async client to open the session and
table. Any help?

Thanks,
Ravi
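One knob that bounds how long client calls may block is the builder's
timeout settings. The sketch below is illustrative only and is not code from
this thread; the host, table name, and timeout values are placeholders, and
whether this avoids the hang in a given deployment would need to be
verified.

    import org.apache.kudu.client.AsyncKuduClient;
    import org.apache.kudu.client.KuduClient;
    import org.apache.kudu.client.KuduException;
    import org.apache.kudu.client.KuduTable;

    // Illustrative sketch (not from the thread): bound how long client operations
    // may block, so an unreachable master or tablet server is more likely to
    // surface as a KuduException than as an apparently indefinite hang.
    final class BoundedKuduClientFactory {
        static KuduClient newClient(String kuduMaster) {
            AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster)
                    .defaultAdminOperationTimeoutMs(30_000)   // metadata/admin operations
                    .defaultOperationTimeoutMs(30_000)        // read/write operations
                    .build();
            return asyncClient.syncClient();
        }

        public static void main(String[] args) throws KuduException {
            KuduClient client = newClient("kudu-master-host:7051");  // hypothetical master
            KuduTable table = client.openTable("example_table");     // hypothetical table
            System.out.println("Opened table: " + table.getName());
        }
    }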
On 26 February 2018 at 18:00, Cliff Resnick wrote:

While I can't speak for Spark, we do use the client API from Flink streaming
and it's simple and seamless. It's especially nice if you require an upsert
semantic.

On Feb 26, 2018 7:51 PM, "Ravi Kanth" wrote:

Hi,

Is anyone using Spark Streaming to ingest data into Kudu with the Kudu
client API rather than the traditional KuduContext API? I am stuck at a
point and couldn't find a solution.

Thanks,
Ravi