kudu-user mailing list archives

From Mike Percy <mpe...@apache.org>
Subject Re: Spark Streaming + Kudu
Date Wed, 07 Mar 2018 02:36:22 GMT
Thanks Ravi. Would you mind attaching the output of jstack on the process
during this hang? That would show what the Kudu client threads are doing,
as what we are seeing here is just the netty boss thread.

Mike
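For reference, Mike is asking for the output of `jstack <pid>` run against the executor JVM. If attaching to a remote Spark executor is awkward, a rough in-process substitute is to log every thread's stack using the JDK's `Thread.getAllStackTraces()`. This is a minimal sketch and not equivalent to jstack, because it omits lock-ownership details. The class `ThreadDumps` and its name filter are hypothetical.

```java
import java.util.Map;

/** Hypothetical helper: a poor man's jstack using only the JDK. */
final class ThreadDumps {
    private ThreadDumps() {}

    /**
     * Returns the stack of every live thread whose name contains nameFilter
     * (case-insensitive). An empty filter matches all threads.
     */
    static String dump(String nameFilter) {
        String needle = nameFilter.toLowerCase();
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            if (!t.getName().toLowerCase().contains(needle)) {
                continue; // skip threads that do not match the filter
            }
            sb.append('"').append(t.getName()).append("\" state=").append(t.getState()).append('\n');
            for (StackTraceElement frame : e.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }
}
```

For example, a watchdog thread could log `ThreadDumps.dump("")` when a batch runs far past its expected duration, which would show where the Kudu client threads are blocked.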

On Tue, Mar 6, 2018 at 8:52 AM, Ravi Kanth <ravikanth.4b0@gmail.com> wrote:

>
> Yes, I have debugged it to find the root cause. Every log statement before
> "table = client.openTable(tableName);" executes fine, but exactly at the
> point of opening the table it throws the exception below, and nothing after
> that line is executed. The Spark batches are still being processed, and each
> one fails at the point of opening the table. I tried catching the exception,
> with no luck. Please find the exception below.
>
> 8/02/23 00:16:30 ERROR client.TabletClient: [Peer bd91f34d456a4eccaae50003c90f0fb2] Unexpected exception from downstream on [id: 0x6e13b01f]
> java.net.ConnectException: Connection refused: kudu102.dev.sac.int.threatmetrix.com/10.112.3.12:7050
>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
>     at org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
>     at org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
>
> Thanks,
> Ravi
>
> On 5 March 2018 at 23:52, Mike Percy <mpercy@apache.org> wrote:
>
>> Have you considered checking your session error count or pending errors
>> in your while loop every so often? Can you identify where your code is
>> hanging when the connection is lost (what line)?
>>
>> Mike
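Mike's suggestion can be sketched as a write loop that polls an error count every N rows and fails fast. To keep the sketch runnable without a Kudu cluster, the count is passed in as an `IntSupplier`; with the real client this would presumably be `session::countPendingErrors` on `KuduSession`, which should be checked against the API docs for the Kudu version in use. `FailFastWriter`, `writeAll`, and `checkEvery` are hypothetical names.

```java
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.IntSupplier;

/** Hypothetical helper: apply rows, but stop as soon as the session reports errors. */
final class FailFastWriter {
    private FailFastWriter() {}

    /**
     * Applies every row via apply, checking pendingErrors every checkEvery rows
     * and once more at the end. Returns the number of rows applied; throws
     * IllegalStateException on the first check that sees a non-zero count.
     */
    static <T> long writeAll(Iterator<T> rows, Consumer<T> apply, IntSupplier pendingErrors, int checkEvery) {
        if (checkEvery <= 0) {
            throw new IllegalArgumentException("checkEvery must be positive");
        }
        long applied = 0;
        while (rows.hasNext()) {
            apply.accept(rows.next());
            applied++;
            if (applied % checkEvery == 0) {
                failIfErrors(pendingErrors, applied); // periodic check inside the loop
            }
        }
        failIfErrors(pendingErrors, applied); // final check after the last row
        return applied;
    }

    private static void failIfErrors(IntSupplier pendingErrors, long applied) {
        int errors = pendingErrors.getAsInt();
        if (errors > 0) {
            throw new IllegalStateException(errors + " pending write error(s) after " + applied + " rows");
        }
    }
}
```

With a real session, the `apply` callback would wrap `session.apply(upsert)`, which throws the checked `KuduException`, so it would need to be rethrown unchecked inside the lambda. This only detects errors the session has already recorded; it cannot detect a call such as `openTable` that never returns.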
>>
>> On Mon, Mar 5, 2018 at 9:08 PM, Ravi Kanth <ravikanth.4b0@gmail.com>
>> wrote:
>>
>>> In addition to my previous comment: I raised a support ticket for this
>>> issue with Cloudera, and one of the support engineers said the following:
>>>
>>> *"Thank you for clarifying. The exceptions are logged but not re-thrown
>>> to an upper layer, so that explains why the Spark application is not aware
>>> of the underlying error."*
>>>
>>> On 5 March 2018 at 21:02, Ravi Kanth <ravikanth.4b0@gmail.com> wrote:
>>>
>>>> Mike,
>>>>
>>>> Thanks for the information. But once the connection to any of the Kudu
>>>> servers is lost, I have no control over the KuduSession object, and so
>>>> none over getPendingErrors() either. In this case the KuduClient becomes
>>>> a zombie and never returns until the connection is properly
>>>> re-established. I tried everything you suggested, with no luck. Attaching
>>>> my KuduClient code.
>>>>
>>>> package org.dwh.streaming.kudu.sparkkudustreaming;
>>>>
>>>> import java.util.Iterator;
>>>> import java.util.Map;
>>>> import java.util.concurrent.ConcurrentHashMap;
>>>> import org.apache.hadoop.util.ShutdownHookManager;
>>>> import org.apache.kudu.client.*;
>>>> import org.apache.spark.api.java.JavaRDD;
>>>> import org.slf4j.Logger;
>>>> import org.slf4j.LoggerFactory;
>>>> import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialNullConstants;
>>>>
>>>> public class KuduProcess {
>>>>     private static final Logger logger = LoggerFactory.getLogger(KuduProcess.class);
>>>>
>>>>     public static void upsertKudu(JavaRDD<Map<String, Object>> rdd, String host, String tableName) {
>>>>         rdd.foreachPartition(iterator -> {
>>>>             RowErrorsAndOverflowStatus errors = upsertOpIterator(iterator, tableName, host);
>>>>             int errorCount = errors.getRowErrors().length;
>>>>             if (errorCount > 0) {
>>>>                 throw new RuntimeException("Failed to write " + errorCount + " messages into Kudu");
>>>>             }
>>>>         });
>>>>     }
>>>>
>>>>     // table and session are locals: instance fields cannot be used from static
>>>>     // methods, and shared fields would be raced on by concurrent Spark tasks.
>>>>     private static RowErrorsAndOverflowStatus upsertOpIterator(
>>>>             Iterator<Map<String, Object>> iter, String tableName, String host) {
>>>>         KuduSession session = null;
>>>>         try {
>>>>             AsyncKuduClient asyncClient = KuduConnection.getAsyncClient(host);
>>>>             KuduClient client = asyncClient.syncClient();
>>>>             KuduTable table = client.openTable(tableName);
>>>>             session = client.newSession();
>>>>             session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
>>>>             while (iter.hasNext()) {
>>>>                 upsertOp(table, session, iter.next());
>>>>             }
>>>>         } catch (KuduException e) {
>>>>             logger.error("Exception in upsertOpIterator method", e);
>>>>             // Rethrow so the Spark task fails instead of silently dropping the partition.
>>>>             throw new RuntimeException(e);
>>>>         } finally {
>>>>             if (session != null) {
>>>>                 try {
>>>>                     session.close(); // flushes any buffered operations
>>>>                 } catch (KuduException e) {
>>>>                     logger.error("Exception in Connection close", e);
>>>>                 }
>>>>             }
>>>>         }
>>>>         // Once the connection is lost, this line never gets called: the Spark job
>>>>         // keeps running and processing records while the KuduClient keeps trying
>>>>         // to connect to Kudu. Meanwhile, we are losing all the records.
>>>>         return session.getPendingErrors();
>>>>     }
>>>>
>>>>     private static void upsertOp(KuduTable table, KuduSession session, Map<String, Object> formattedMap) {
>>>>         if (formattedMap.size() != 0) {
>>>>             try {
>>>>                 Upsert upsert = table.newUpsert();
>>>>                 PartialRow row = upsert.getRow();
>>>>                 // Map each value to the matching typed setter; sentinel values mean NULL.
>>>>                 for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
>>>>                     if (entry.getValue().getClass().equals(String.class)) {
>>>>                         if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
>>>>                             row.setNull(entry.getKey());
>>>>                         else
>>>>                             row.addString(entry.getKey(), (String) entry.getValue());
>>>>                     } else if (entry.getValue().getClass().equals(Long.class)) {
>>>>                         if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
>>>>                             row.setNull(entry.getKey());
>>>>                         else
>>>>                             row.addLong(entry.getKey(), (Long) entry.getValue());
>>>>                     } else if (entry.getValue().getClass().equals(Integer.class)) {
>>>>                         if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
>>>>                             row.setNull(entry.getKey());
>>>>                         else
>>>>                             row.addInt(entry.getKey(), (Integer) entry.getValue());
>>>>                     }
>>>>                 }
>>>>
>>>>                 session.apply(upsert);
>>>>             } catch (Exception e) {
>>>>                 logger.error("Exception during upsert:", e);
>>>>             }
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> class KuduConnection {
>>>>     private static final Logger logger = LoggerFactory.getLogger(KuduConnection.class);
>>>>     // Concurrent map: several Spark tasks in one executor JVM may call this at once.
>>>>     private static final Map<String, AsyncKuduClient> asyncCache = new ConcurrentHashMap<>();
>>>>     private static final int SHUTDOWN_HOOK_PRIORITY = 100;
>>>>
>>>>     static AsyncKuduClient getAsyncClient(String kuduMaster) {
>>>>         return asyncCache.computeIfAbsent(kuduMaster, master -> {
>>>>             AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(master).build();
>>>>             // Close the cached client when the JVM shuts down.
>>>>             ShutdownHookManager.get().addShutdownHook(() -> {
>>>>                 try {
>>>>                     asyncClient.close();
>>>>                 } catch (Exception e) {
>>>>                     logger.error("Exception closing async client", e);
>>>>                 }
>>>>             }, SHUTDOWN_HOOK_PRIORITY);
>>>>             return asyncClient;
>>>>         });
>>>>     }
>>>> }
>>>>
>>>>
>>>>
>>>> Thanks,
>>>> Ravi
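Because the reported symptom is a call (`client.openTable(tableName)`) that never returns, `getPendingErrors()` cannot help: control never reaches it. One generic way to turn an indefinite hang into an exception that Spark will see is to run the blocking call on a separate thread and wait with a deadline. This is a swapped-in JDK technique, not something shown in the thread. The Kudu client builder also exposes its own timeouts (for example `defaultAdminOperationTimeoutMs` on `AsyncKuduClientBuilder`), which are worth checking first in the client docs. `Deadline` and `callWithTimeout` are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: run a blocking call, but give up after a deadline. */
final class Deadline {
    private Deadline() {}

    static <T> T callWithTimeout(Callable<T> call, long timeout, TimeUnit unit) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "deadline-call");
            t.setDaemon(true); // a stuck call must not keep the JVM alive
            return t;
        });
        Future<T> future = executor.submit(call);
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the worker; the call itself may ignore it
            throw new TimeoutException("call did not complete within " + timeout + " " + unit);
        } catch (ExecutionException e) {
            // Unwrap so callers see the original failure rather than ExecutionException.
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Used as, say, `Deadline.callWithTimeout(() -> client.openTable(tableName), 60, TimeUnit.SECONDS)` inside `foreachPartition`, a timeout would propagate out and fail the Spark task. The abandoned thread may keep running if the underlying call ignores interruption, so this bounds the caller's wait, not the client's work.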
>>>>
>>>> On 5 March 2018 at 16:20, Mike Percy <mpercy@apache.org> wrote:
>>>>
>>>>> Hi Ravi, it would be helpful if you could attach what you are getting
>>>>> back from getPendingErrors() (perhaps by dumping RowError.toString()
>>>>> for each item in the returned array) and indicate what you were hoping to
>>>>> get back. Note that a RowError can also return the Operation
>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/RowError.html#getOperation-->
>>>>> that you used to generate the write. From the Operation, you can get the
>>>>> original PartialRow
>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/PartialRow.html>
>>>>> object, which should identify the row whose write failed. Does that help?
>>>>>
>>>>> Since you are using the Kudu client directly, Spark is not involved
>>>>> from the Kudu perspective, so you will need to handle the Spark side
>>>>> yourself in that case.
>>>>>
>>>>> Mike
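Following Mike's point, the errors returned by `getPendingErrors()` can be surfaced to Spark in a form that identifies the failed rows. A minimal sketch, assuming the caller has already turned each `RowError` into a string (for example with `RowError.toString()`, or from `getOperation().getRow()`) and has read the overflow flag from `RowErrorsAndOverflowStatus`. Kudu types are kept out of the signature so the sketch runs without the client jar; `RowErrorReport` and `failIfAny` are hypothetical names.

```java
import java.util.List;

/** Hypothetical helper: fail a Spark partition with a readable summary of row errors. */
final class RowErrorReport {
    private RowErrorReport() {}

    /**
     * @param descriptions one entry per failed row (e.g. RowError.toString() in real code)
     * @param overflowed   whether the client dropped some errors because its buffer was full
     * @param maxShown     how many descriptions to include in the exception message
     */
    static void failIfAny(List<String> descriptions, boolean overflowed, int maxShown) {
        if (descriptions.isEmpty() && !overflowed) {
            return; // nothing failed
        }
        StringBuilder msg = new StringBuilder("Failed to write ")
                .append(descriptions.size()).append(" row(s) to Kudu");
        if (overflowed) {
            msg.append(" (error buffer overflowed; the real count is higher)");
        }
        int shown = Math.max(0, Math.min(maxShown, descriptions.size()));
        for (int i = 0; i < shown; i++) {
            msg.append("\n  ").append(descriptions.get(i));
        }
        if (descriptions.size() > shown) {
            msg.append("\n  ... ").append(descriptions.size() - shown).append(" more");
        }
        throw new IllegalStateException(msg.toString());
    }
}
```

Throwing from inside `foreachPartition` fails the task, which Spark can then retry or report as a job failure; capping the message keeps a large batch of failures from flooding the driver log.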
>>>>>
>>>>> On Mon, Mar 5, 2018 at 1:59 PM, Ravi Kanth <ravikanth.4b0@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Mike,
>>>>>>
>>>>>> Thanks for the reply. Yes, I am using AUTO_FLUSH_BACKGROUND.
>>>>>>
>>>>>> I am using the Kudu Client API to perform UPSERTs into Kudu, and I
>>>>>> integrated this with Spark. I am trying to test the case where one of
>>>>>> the Kudu servers fails. In that case, if there is any problem writing,
>>>>>> getPendingErrors() should give me a way to handle those errors so that
>>>>>> I can successfully terminate my Spark job. That is what I am trying to
>>>>>> do.
>>>>>>
>>>>>> But I am not able to get hold of the exceptions being thrown from
>>>>>> within the KuduClient while it retries connecting to the tablet server.
>>>>>> getPendingErrors() is not capturing these exceptions.
>>>>>>
>>>>>> Let me know if you need more clarification. I can post some snippets.
>>>>>>
>>>>>> Thanks,
>>>>>> Ravi
>>>>>>
>>>>>> On 5 March 2018 at 13:18, Mike Percy <mpercy@apache.org> wrote:
>>>>>>
>>>>>>> Hi Ravi, are you using AUTO_FLUSH_BACKGROUND
>>>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/SessionConfiguration.FlushMode.html>?
>>>>>>> You mention that you are trying to use getPendingErrors()
>>>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/KuduSession.html#getPendingErrors-->
>>>>>>> but it sounds like it's not working for you. Can you be more specific
>>>>>>> about what you expect and what you are observing?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Mike
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Feb 26, 2018 at 8:04 PM, Ravi Kanth <ravikanth.4b0@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks, Clifford. We are running Kudu version 1.4. To date we
>>>>>>>> haven't seen any issues in production, and we are not losing tablet
>>>>>>>> servers. But as part of testing I have to generate a few unforeseen
>>>>>>>> cases to analyse the application performance. One of those is bringing
>>>>>>>> down a tablet server or master server intentionally, during which I
>>>>>>>> observed the loss of records. I just wanted to test cases off the happy
>>>>>>>> path here. Once again, thanks for taking the time to respond to me.
>>>>>>>>
>>>>>>>> - Ravi
>>>>>>>>
>>>>>>>> On 26 February 2018 at 19:58, Clifford Resnick <
>>>>>>>> cresnick@mediamath.com> wrote:
>>>>>>>>
>>>>>>>>> I'll have to get back to you on the code bits, but I'm pretty sure
>>>>>>>>> we're doing simple sync batching. We're not in production yet, but
>>>>>>>>> after some months of development I haven't seen any failures, even
>>>>>>>>> when pushing load doing multiple years' backfill. I think the real
>>>>>>>>> question is why you are losing tablet servers. The only instability
>>>>>>>>> we ever had with Kudu was when it had that weird NTP sync issue that
>>>>>>>>> was fixed, I think, in 1.6. What version are you running?
>>>>>>>>>
>>>>>>>>> Anyway, I would think that infinite loop should be catchable
>>>>>>>>> somewhere. Our pipeline is set to fail/retry with Flink snapshots. I
>>>>>>>>> imagine there is something similar with Spark. Sorry I can't be of
>>>>>>>>> more help!
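The contrast Cliff draws is between a client that retries forever and a pipeline that retries a bounded number of times and then fails. A generic sketch of the bounded version with exponential backoff, using only the JDK; `Retry.withBackoff` and its parameters are hypothetical. In Spark, the outer layer of this is normally task retry (the `spark.task.maxFailures` setting) rather than hand-written code.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: retry a call a bounded number of times, then give up. */
final class Retry {
    private static final long MAX_DELAY_MS = 30_000; // cap so the backoff cannot grow without bound

    private Retry() {}

    static <T> T withBackoff(Callable<T> call, int maxAttempts, long initialDelayMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay);
                    delay = Math.min(delay * 2, MAX_DELAY_MS); // exponential backoff
                }
            }
        }
        throw last; // all attempts failed: surface the last error instead of looping forever
    }
}
```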
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Feb 26, 2018 9:10 PM, Ravi Kanth <ravikanth.4b0@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Cliff,
>>>>>>>>>
>>>>>>>>> Thanks for the response. I do agree that it's simple and seamless.
>>>>>>>>> In my case, I am able to upsert ~25000 events/sec into Kudu. But I am
>>>>>>>>> facing a problem when any of the Kudu tablet or master servers is
>>>>>>>>> down: I am not able to get hold of the exception from the client. The
>>>>>>>>> client goes into an infinite loop trying to connect to Kudu, and
>>>>>>>>> meanwhile I am losing my records. I tried handling the errors through
>>>>>>>>> getPendingErrors(), but it didn't help. I am using AsyncKuduClient to
>>>>>>>>> establish the connection and retrieving the syncClient from it to open
>>>>>>>>> the session and table. Any help?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Ravi
>>>>>>>>>
>>>>>>>>> On 26 February 2018 at 18:00, Cliff Resnick <cresny@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> While I can't speak for Spark, we do use the client API from Flink
>>>>>>>>> streaming and it's simple and seamless. It's especially nice if you
>>>>>>>>> require an Upsert semantic.
>>>>>>>>>
>>>>>>>>> On Feb 26, 2018 7:51 PM, "Ravi Kanth" <ravikanth.4b0@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Is anyone using Spark Streaming to ingest data into Kudu with the
>>>>>>>>> Kudu Client API rather than the traditional KuduContext API? I am
>>>>>>>>> stuck at a point and couldn't find a solution.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Ravi
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
