kudu-user mailing list archives

From Ravi Kanth <ravikanth....@gmail.com>
Subject Re: Spark Streaming + Kudu
Date Wed, 07 Mar 2018 03:14:15 GMT
Mike,

Can you clarify how to grab the jstack output for the process? I launched my
Spark application and tried to find its pid so that I could capture a jstack
trace during the hang. Unfortunately, I have not been able to figure out how
to get the pid of the Spark application.

Thanks,
Ravi
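
Note on the pid question above: the JDK's jps tool lists the pids of local
JVMs, and any of those can be passed to jstack. Where that is awkward, the
application can also report its own pid and thread stacks from inside the JVM.
The following is only a minimal sketch of that second option, using the
standard java.lang.management and Thread APIs; the class and method names
(JvmDiagnostics, pidOrRuntimeName, threadDump) are made up for illustration,
and the "pid@host" format of the runtime name is a common JVM convention
rather than a guarantee.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;

public class JvmDiagnostics {

    // The runtime name is typically "12345@hostname" on HotSpot JVMs.
    // That format is a convention, so fall back to the full name if no '@' is found.
    public static String pidOrRuntimeName() {
        String name = ManagementFactory.getRuntimeMXBean().getName();
        int at = name.indexOf('@');
        return at > 0 ? name.substring(0, at) : name;
    }

    // Builds a jstack-like text dump of every live thread in this JVM.
    public static String threadDump() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            sb.append('"').append(t.getName()).append("\" state=").append(t.getState()).append('\n');
            for (StackTraceElement frame : e.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Log the pid once at startup so jstack can be pointed at it later.
        System.out.println("JVM pid (or runtime name): " + pidOrRuntimeName());
        // Or print the stacks directly, for example from a watchdog thread.
        System.out.println(threadDump());
    }
}
```

Logging the pid at the start of the code that runs in each partition would
show which JVM on which host to inspect, since that code runs in the executor
JVMs rather than in the driver.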

On 6 March 2018 at 18:36, Mike Percy <mpercy@apache.org> wrote:

> Thanks Ravi. Would you mind attaching the output of jstack on the process
> during this hang? That would show what the Kudu client threads are doing,
> as what we are seeing here is just the netty boss thread.
>
> Mike
>
> On Tue, Mar 6, 2018 at 8:52 AM, Ravi Kanth <ravikanth.4b0@gmail.com>
> wrote:
>
>>
>> Yes, I have debugged to find the root cause. Every logger statement before
>> "table = client.openTable(tableName);" executes fine, and exactly at the
>> point of opening the table it throws the exception below; nothing after
>> that is executed. The Spark batches are still being processed, but opening
>> the table keeps failing. I tried catching the exception with no luck.
>> Please find the exception below.
>>
>> 8/02/23 00:16:30 ERROR client.TabletClient: [Peer bd91f34d456a4eccaae50003c90f0fb2] Unexpected exception from downstream on [id: 0x6e13b01f]
>> java.net.ConnectException: Connection refused: kudu102.dev.sac.int.threatmetrix.com/10.112.3.12:7050
>>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
>>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
>>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
>>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
>>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
>>     at org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
>>     at org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>>
>> Thanks,
>> Ravi
>>
>> On 5 March 2018 at 23:52, Mike Percy <mpercy@apache.org> wrote:
>>
>>> Have you considered checking your session error count or pending errors
>>> in your while loop every so often? Can you identify where your code is
>>> hanging when the connection is lost (what line)?
>>>
>>> Mike
>>>
>>> On Mon, Mar 5, 2018 at 9:08 PM, Ravi Kanth <ravikanth.4b0@gmail.com>
>>> wrote:
>>>
>>>> In addition to my previous comment, I raised a support ticket for this
>>>> issue with Cloudera, and one of the support staff said the following:
>>>>
>>>> *"Thank you for clarifying, The exceptions are logged but not re-thrown
>>>> to an upper layer, so that explains why the Spark application is not aware
>>>> of the underlying error."*
>>>>
>>>> On 5 March 2018 at 21:02, Ravi Kanth <ravikanth.4b0@gmail.com> wrote:
>>>>
>>>>> Mike,
>>>>>
>>>>> Thanks for the information. But once the connection to any of the Kudu
>>>>> servers is lost, I no longer have any control over the KuduSession
>>>>> object, and so none over getPendingErrors() either. The KuduClient in
>>>>> this case becomes a zombie and never returns until the connection is
>>>>> properly re-established. I tried everything you suggested, with no
>>>>> luck. Attaching my KuduClient code.
>>>>>
>>>>> package org.dwh.streaming.kudu.sparkkudustreaming;
>>>>>
>>>>> import java.util.HashMap;
>>>>> import java.util.Iterator;
>>>>> import java.util.Map;
>>>>> import org.apache.hadoop.util.ShutdownHookManager;
>>>>> import org.apache.kudu.client.*;
>>>>> import org.apache.spark.api.java.JavaRDD;
>>>>> import org.slf4j.Logger;
>>>>> import org.slf4j.LoggerFactory;
>>>>> import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialNullConstants;
>>>>>
>>>>> public class KuduProcess {
>>>>>     private static Logger logger = LoggerFactory.getLogger(KuduProcess.class);
>>>>>     // static so that the static methods below can reference them
>>>>>     private static KuduTable table;
>>>>>     private static KuduSession session;
>>>>>
>>>>>     public static void upsertKudu(JavaRDD<Map<String, Object>> rdd, String host, String tableName) {
>>>>>         rdd.foreachPartition(iterator -> {
>>>>>             RowErrorsAndOverflowStatus errors = upsertOpIterator(iterator, tableName, host);
>>>>>             int errorCount = errors.getRowErrors().length;
>>>>>             if (errorCount > 0) {
>>>>>                 throw new RuntimeException("Failed to write " + errorCount + " messages into Kudu");
>>>>>             }
>>>>>         });
>>>>>     }
>>>>>
>>>>>     private static RowErrorsAndOverflowStatus upsertOpIterator(Iterator<Map<String, Object>> iter, String tableName, String host) {
>>>>>         try {
>>>>>             AsyncKuduClient asyncClient = KuduConnection.getAsyncClient(host);
>>>>>             KuduClient client = asyncClient.syncClient();
>>>>>             table = client.openTable(tableName);
>>>>>             session = client.newSession();
>>>>>             session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
>>>>>             while (iter.hasNext()) {
>>>>>                 upsertOp(iter.next());
>>>>>             }
>>>>>         } catch (KuduException e) {
>>>>>             logger.error("Exception in upsertOpIterator method", e);
>>>>>         } finally {
>>>>>             try {
>>>>>                 session.close();
>>>>>             } catch (KuduException e) {
>>>>>                 logger.error("Exception in Connection close", e);
>>>>>             }
>>>>>         }
>>>>>         return session.getPendingErrors();
>>>>>         // ---> Once the connection is lost, this part of the code never
>>>>>         // gets called, and the Spark job keeps running and processing
>>>>>         // records while the KuduClient is trying to connect to Kudu.
>>>>>         // Meanwhile, we are losing all the records.
>>>>>     }
>>>>>
>>>>>     public static void upsertOp(Map<String, Object> formattedMap) {
>>>>>         if (formattedMap.size() != 0) {
>>>>>             try {
>>>>>                 Upsert upsert = table.newUpsert();
>>>>>                 PartialRow row = upsert.getRow();
>>>>>                 for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
>>>>>                     if (entry.getValue().getClass().equals(String.class)) {
>>>>>                         if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
>>>>>                             row.setNull(entry.getKey());
>>>>>                         else
>>>>>                             row.addString(entry.getKey(), (String) entry.getValue());
>>>>>                     } else if (entry.getValue().getClass().equals(Long.class)) {
>>>>>                         if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
>>>>>                             row.setNull(entry.getKey());
>>>>>                         else
>>>>>                             row.addLong(entry.getKey(), (Long) entry.getValue());
>>>>>                     } else if (entry.getValue().getClass().equals(Integer.class)) {
>>>>>                         if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
>>>>>                             row.setNull(entry.getKey());
>>>>>                         else
>>>>>                             row.addInt(entry.getKey(), (Integer) entry.getValue());
>>>>>                     }
>>>>>                 }
>>>>>
>>>>>                 session.apply(upsert);
>>>>>             } catch (Exception e) {
>>>>>                 logger.error("Exception during upsert:", e);
>>>>>             }
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> class KuduConnection {
>>>>>     private static Logger logger = LoggerFactory.getLogger(KuduConnection.class);
>>>>>     private static Map<String, AsyncKuduClient> asyncCache = new HashMap<>();
>>>>>     private static int ShutdownHookPriority = 100;
>>>>>
>>>>>     static AsyncKuduClient getAsyncClient(String kuduMaster) {
>>>>>         if (!asyncCache.containsKey(kuduMaster)) {
>>>>>             AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build();
>>>>>             ShutdownHookManager.get().addShutdownHook(new Runnable() {
>>>>>                 @Override
>>>>>                 public void run() {
>>>>>                     try {
>>>>>                         asyncClient.close();
>>>>>                     } catch (Exception e) {
>>>>>                         logger.error("Exception closing async client", e);
>>>>>                     }
>>>>>                 }
>>>>>             }, ShutdownHookPriority);
>>>>>             asyncCache.put(kuduMaster, asyncClient);
>>>>>         }
>>>>>         return asyncCache.get(kuduMaster);
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Ravi
>>>>>
>>>>> On 5 March 2018 at 16:20, Mike Percy <mpercy@apache.org> wrote:
>>>>>
>>>>>> Hi Ravi, it would be helpful if you could attach what you are getting
>>>>>> back from getPendingErrors() -- perhaps from dumping RowError.toString()
>>>>>> from items in the returned array -- and indicate what you were hoping to
>>>>>> get back. Note that a RowError can also return to you the Operation
>>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/RowError.html#getOperation-->
>>>>>> that you used to generate the write. From the Operation, you can get the
>>>>>> original PartialRow
>>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/PartialRow.html>
>>>>>> object, which should be able to identify the affected row that the write
>>>>>> failed for. Does that help?
>>>>>>
>>>>>> Since you are using the Kudu client directly, Spark is not involved
>>>>>> from the Kudu perspective, so you will need to deal with Spark on your
>>>>>> own in that case.
>>>>>>
>>>>>> Mike
>>>>>>
>>>>>> On Mon, Mar 5, 2018 at 1:59 PM, Ravi Kanth <ravikanth.4b0@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Mike,
>>>>>>>
>>>>>>> Thanks for the reply. Yes, I am using AUTO_FLUSH_BACKGROUND.
>>>>>>>
>>>>>>> I am using the Kudu Client API to perform UPSERTs into Kudu, and I have
>>>>>>> integrated this with Spark. I am testing the case where one of the Kudu
>>>>>>> servers fails. In that case, if there is any problem in writing,
>>>>>>> getPendingErrors() should give me a way to handle the errors so that I
>>>>>>> can cleanly terminate my Spark job. This is what I am trying to do.
>>>>>>>
>>>>>>> But I am not able to get hold of the exceptions thrown from within the
>>>>>>> KuduClient while it retries connecting to the tablet server.
>>>>>>> getPendingErrors() does not surface these exceptions.
>>>>>>>
>>>>>>> Let me know if you need more clarification. I can post some snippets.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Ravi
>>>>>>>
>>>>>>> On 5 March 2018 at 13:18, Mike Percy <mpercy@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Ravi, are you using AUTO_FLUSH_BACKGROUND
>>>>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/SessionConfiguration.FlushMode.html>?
>>>>>>>> You mention that you are trying to use getPendingErrors()
>>>>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/KuduSession.html#getPendingErrors-->
>>>>>>>> but it sounds like it's not working for you -- can you be more specific
>>>>>>>> about what you expect and what you are observing?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Mike
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Feb 26, 2018 at 8:04 PM, Ravi Kanth <
>>>>>>>> ravikanth.4b0@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks, Clifford. We are running Kudu 1.4. To date we haven't seen
>>>>>>>>> any issues in production, and we are not losing tablet servers. But as
>>>>>>>>> part of testing I have to create a few unforeseen scenarios to analyse
>>>>>>>>> the application performance. One of them is intentionally bringing down
>>>>>>>>> a tablet server or master server, during which I observed the loss of
>>>>>>>>> records. I just wanted to test cases outside the happy path here. Once
>>>>>>>>> again, thanks for taking the time to respond.
>>>>>>>>>
>>>>>>>>> - Ravi
>>>>>>>>>
>>>>>>>>> On 26 February 2018 at 19:58, Clifford Resnick <
>>>>>>>>> cresnick@mediamath.com> wrote:
>>>>>>>>>
>>>>>>>>>> I'll have to get back to you on the code bits, but I'm pretty sure
>>>>>>>>>> we're doing simple sync batching. We're not in production yet, but
>>>>>>>>>> after some months of development I haven't seen any failures, even
>>>>>>>>>> when pushing load doing multiple years' backfill. I think the real
>>>>>>>>>> question is why are you losing tablet servers? The only instability
>>>>>>>>>> we ever had with Kudu was when it had that weird ntp sync issue that
>>>>>>>>>> was fixed, I think, for 1.6. What version are you running?
>>>>>>>>>>
>>>>>>>>>> Anyway, I would think that infinite loop should be catchable
>>>>>>>>>> somewhere. Our pipeline is set to fail/retry with Flink snapshots. I
>>>>>>>>>> imagine there is something similar with Spark. Sorry I can't be of
>>>>>>>>>> more help!
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Feb 26, 2018 9:10 PM, Ravi Kanth <ravikanth.4b0@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Cliff,
>>>>>>>>>>
>>>>>>>>>> Thanks for the response. I do agree that it's simple and seamless. In
>>>>>>>>>> my case, I am able to upsert ~25000 events/sec into Kudu. But I run
>>>>>>>>>> into a problem when any Kudu tablet server or master server is down:
>>>>>>>>>> I am not able to get hold of the exception from the client. The
>>>>>>>>>> client goes into an infinite loop trying to connect to Kudu, and
>>>>>>>>>> meanwhile I am losing my records. I tried handling the errors through
>>>>>>>>>> getPendingErrors(), but it did not help. I am using AsyncKuduClient
>>>>>>>>>> to establish the connection and retrieving the sync client from it to
>>>>>>>>>> open the session and table. Any help?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Ravi
>>>>>>>>>>
>>>>>>>>>> On 26 February 2018 at 18:00, Cliff Resnick <cresny@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> While I can't speak for Spark, we do use the client API from Flink
>>>>>>>>>> streaming and it's simple and seamless. It's especially nice if you
>>>>>>>>>> require an Upsert semantic.
>>>>>>>>>>
>>>>>>>>>> On Feb 26, 2018 7:51 PM, "Ravi Kanth" <ravikanth.4b0@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Is anyone using Spark Streaming to ingest data into Kudu with the
>>>>>>>>>> Kudu Client API rather than the traditional KuduContext API? I am
>>>>>>>>>> stuck at a point and couldn't find a solution.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Ravi
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
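
Note on the hang discussed in this thread (a blocking call such as
client.openTable(tableName) not returning while a server is unreachable): one
generic way to keep the caller in control is to run the blocking call on a
separate thread and wait for it with a deadline, so that a timeout or failure
becomes an exception the Spark task can see. The following is only a sketch
of that general technique using java.util.concurrent; it does not use or
describe any Kudu API, and the names BoundedCall and callWithTimeout are
hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedCall {

    // Runs the call on a daemon worker thread and waits at most timeoutMs for it.
    // A timeout or a failure inside the call is rethrown as a RuntimeException
    // so that the caller sees it instead of it only being logged.
    public static <T> T callWithTimeout(Callable<T> call, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-call");
            t.setDaemon(true); // a stuck call must not keep the JVM alive
            return t;
        });
        try {
            Future<T> future = executor.submit(call);
            try {
                return future.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupts the worker; the call may ignore it
                throw new RuntimeException("Call did not finish within " + timeoutMs + " ms", e);
            } catch (ExecutionException e) {
                throw new RuntimeException("Call failed", e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                throw new RuntimeException("Interrupted while waiting for call", e);
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // A call that finishes in time returns its value.
        String ok = callWithTimeout(() -> "opened", 1000);
        System.out.println(ok);

        // A call that blocks longer than the deadline fails fast.
        try {
            callWithTimeout(() -> { Thread.sleep(5000); return "never"; }, 100);
        } catch (RuntimeException e) {
            System.out.println("Failed fast: " + e.getMessage());
        }
    }
}
```

In the posted code, the openTable call could be wrapped this way inside the
foreachPartition lambda, so that the resulting RuntimeException fails the
task. Whether the underlying call actually stops when interrupted depends on
the library, so the worker thread may linger after the timeout.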
