kudu-user mailing list archives

From Mike Percy <mpe...@apache.org>
Subject Re: Spark Streaming + Kudu
Date Wed, 07 Mar 2018 04:42:06 GMT
Hmm, could you try in Spark local mode? i.e.
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-local.html

Mike
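
On the jstack question in this thread: "jps -lm" lists the running JVMs with
their pids, and "jstack <pid>" then prints the thread dump. In local mode the
driver and the executors share a single JVM, so there is only one pid to find.
If attaching from outside is awkward, a roughly equivalent dump can be taken
from inside the JVM with the standard java.lang.management API. The sketch
below is only an illustration; ThreadDumpSketch and its method names are made
up here and are not part of Kudu or Spark.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper (not part of Kudu or Spark): an in-process thread dump.
final class ThreadDumpSketch {
    private ThreadDumpSketch() {}

    // On HotSpot JVMs this is conventionally "<pid>@<hostname>", but the
    // format is an implementation detail, so treat it as a hint only.
    static String jvmName() {
        return ManagementFactory.getRuntimeMXBean().getName();
    }

    // Returns the name, state and stack of every live thread in this JVM.
    static String dumpAllThreads() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        StringBuilder out = new StringBuilder();
        // false, false: skip lock details, which not every JVM supports.
        for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
            out.append('"').append(info.getThreadName()).append("\" ")
               .append(info.getThreadState()).append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("    at ").append(frame).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }
}
```

Logging ThreadDumpSketch.dumpAllThreads() from a watchdog thread while the
job is hung should show what the Kudu client threads are doing, not just the
netty boss thread. On a cluster it would only cover the JVM it runs in (the
driver or one executor).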

On Tue, Mar 6, 2018 at 7:14 PM, Ravi Kanth <ravikanth.4b0@gmail.com> wrote:

> Mike,
>
> Could you clarify how to grab the jstack output for the process? I launched
> my Spark application and tried to find its pid so that I could capture a
> jstack trace during the hang. Unfortunately, I could not work out how to
> get the pid of the Spark application.
>
> Thanks,
> Ravi
>
> On 6 March 2018 at 18:36, Mike Percy <mpercy@apache.org> wrote:
>
>> Thanks Ravi. Would you mind attaching the output of jstack on the process
>> during this hang? That would show what the Kudu client threads are doing,
>> as what we are seeing here is just the netty boss thread.
>>
>> Mike
>>
>> On Tue, Mar 6, 2018 at 8:52 AM, Ravi Kanth <ravikanth.4b0@gmail.com>
>> wrote:
>>
>>>
>>> Yes, I have debugged to find the root cause. Every logger call before
>>> "table = client.openTable(tableName);" executes fine, and exactly at the
>>> point of opening the table it throws the exception below, and nothing
>>> is executed after that. The Spark batches are still being processed,
>>> but opening the table keeps failing. I tried catching the exception,
>>> with no luck. Please find the exception below.
>>>
>>> 8/02/23 00:16:30 ERROR client.TabletClient: [Peer bd91f34d456a4eccaae50003c90f0fb2] Unexpected exception from downstream on [id: 0x6e13b01f]
>>> java.net.ConnectException: Connection refused: kudu102.dev.sac.int.threatmetrix.com/10.112.3.12:7050
>>>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>>>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
>>>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
>>>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
>>>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
>>>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
>>>     at org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
>>>     at org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>> Thanks,
>>> Ravi
>>>
>>> On 5 March 2018 at 23:52, Mike Percy <mpercy@apache.org> wrote:
>>>
>>>> Have you considered checking your session error count or pending errors
>>>> in your while loop every so often? Can you identify where your code is
>>>> hanging when the connection is lost (what line)?
>>>>
>>>> Mike
>>>>
>>>> On Mon, Mar 5, 2018 at 9:08 PM, Ravi Kanth <ravikanth.4b0@gmail.com>
>>>> wrote:
>>>>
>>>>> In addition to my previous comment, I raised a support ticket for this
>>>>> issue with Cloudera, and one of the support staff said the following:
>>>>>
>>>>> *"Thank you for clarifying, The exceptions are logged but not
>>>>> re-thrown to an upper layer, so that explains why the Spark application
>>>>> is not aware of the underlying error."*
>>>>>
>>>>> On 5 March 2018 at 21:02, Ravi Kanth <ravikanth.4b0@gmail.com> wrote:
>>>>>
>>>>>> Mike,
>>>>>>
>>>>>> Thanks for the information. But once the connection to any of the
>>>>>> Kudu servers is lost, I have no control over the KuduSession object,
>>>>>> and so none over getPendingErrors() either. The KuduClient in this
>>>>>> case becomes a zombie and never returns until the connection is
>>>>>> properly established. I tried everything you suggested, with no
>>>>>> luck. Attaching my KuduClient code.
>>>>>>
>>>>>> package org.dwh.streaming.kudu.sparkkudustreaming;
>>>>>>
>>>>>> import java.util.HashMap;
>>>>>> import java.util.Iterator;
>>>>>> import java.util.Map;
>>>>>> import org.apache.hadoop.util.ShutdownHookManager;
>>>>>> import org.apache.kudu.client.*;
>>>>>> import org.apache.spark.api.java.JavaRDD;
>>>>>> import org.slf4j.Logger;
>>>>>> import org.slf4j.LoggerFactory;
>>>>>> import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialNullConstants;
>>>>>>
>>>>>> public class KuduProcess {
>>>>>>     private static Logger logger = LoggerFactory.getLogger(KuduProcess.class);
>>>>>>     // Declared static because the static methods below use them.
>>>>>>     private static KuduTable table;
>>>>>>     private static KuduSession session;
>>>>>>
>>>>>>     public static void upsertKudu(JavaRDD<Map<String, Object>> rdd,
>>>>>>             String host, String tableName) {
>>>>>>         rdd.foreachPartition(iterator -> {
>>>>>>             RowErrorsAndOverflowStatus errors =
>>>>>>                     upsertOpIterator(iterator, tableName, host);
>>>>>>             int errorCount = errors.getRowErrors().length;
>>>>>>             if (errorCount > 0) {
>>>>>>                 throw new RuntimeException("Failed to write " + errorCount
>>>>>>                         + " messages into Kudu");
>>>>>>             }
>>>>>>         });
>>>>>>     }
>>>>>>
>>>>>>     private static RowErrorsAndOverflowStatus upsertOpIterator(
>>>>>>             Iterator<Map<String, Object>> iter, String tableName, String host) {
>>>>>>         try {
>>>>>>             AsyncKuduClient asyncClient = KuduConnection.getAsyncClient(host);
>>>>>>             KuduClient client = asyncClient.syncClient();
>>>>>>             table = client.openTable(tableName);
>>>>>>             session = client.newSession();
>>>>>>             session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
>>>>>>             while (iter.hasNext()) {
>>>>>>                 upsertOp(iter.next());
>>>>>>             }
>>>>>>         } catch (KuduException e) {
>>>>>>             // Logged only, not rethrown.
>>>>>>             logger.error("Exception in upsertOpIterator method", e);
>>>>>>         } finally {
>>>>>>             try {
>>>>>>                 session.close();
>>>>>>             } catch (KuduException e) {
>>>>>>                 logger.error("Exception in Connection close", e);
>>>>>>             }
>>>>>>         }
>>>>>>         // ---> Once the connection is lost, this part of the code never gets
>>>>>>         // called, and the Spark job keeps running and processing records while
>>>>>>         // the KuduClient is trying to connect to Kudu. Meanwhile, we are losing
>>>>>>         // all the records.
>>>>>>         return session.getPendingErrors();
>>>>>>     }
>>>>>>
>>>>>>     public static void upsertOp(Map<String, Object> formattedMap) {
>>>>>>         if (formattedMap.size() != 0) {
>>>>>>             try {
>>>>>>                 Upsert upsert = table.newUpsert();
>>>>>>                 PartialRow row = upsert.getRow();
>>>>>>                 // Map each value to the matching Kudu column type, or to NULL
>>>>>>                 // when it equals the type's special null marker.
>>>>>>                 for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
>>>>>>                     if (entry.getValue().getClass().equals(String.class)) {
>>>>>>                         if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
>>>>>>                             row.setNull(entry.getKey());
>>>>>>                         else
>>>>>>                             row.addString(entry.getKey(), (String) entry.getValue());
>>>>>>                     } else if (entry.getValue().getClass().equals(Long.class)) {
>>>>>>                         if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
>>>>>>                             row.setNull(entry.getKey());
>>>>>>                         else
>>>>>>                             row.addLong(entry.getKey(), (Long) entry.getValue());
>>>>>>                     } else if (entry.getValue().getClass().equals(Integer.class)) {
>>>>>>                         if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
>>>>>>                             row.setNull(entry.getKey());
>>>>>>                         else
>>>>>>                             row.addInt(entry.getKey(), (Integer) entry.getValue());
>>>>>>                     }
>>>>>>                 }
>>>>>>
>>>>>>                 session.apply(upsert);
>>>>>>             } catch (Exception e) {
>>>>>>                 logger.error("Exception during upsert:", e);
>>>>>>             }
>>>>>>         }
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> class KuduConnection {
>>>>>>     private static Logger logger = LoggerFactory.getLogger(KuduConnection.class);
>>>>>>     // One cached client per Kudu master address.
>>>>>>     private static Map<String, AsyncKuduClient> asyncCache = new HashMap<>();
>>>>>>     private static int ShutdownHookPriority = 100;
>>>>>>
>>>>>>     static AsyncKuduClient getAsyncClient(String kuduMaster) {
>>>>>>         if (!asyncCache.containsKey(kuduMaster)) {
>>>>>>             AsyncKuduClient asyncClient =
>>>>>>                     new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build();
>>>>>>             // Close the client when the JVM shuts down.
>>>>>>             ShutdownHookManager.get().addShutdownHook(new Runnable() {
>>>>>>                 @Override
>>>>>>                 public void run() {
>>>>>>                     try {
>>>>>>                         asyncClient.close();
>>>>>>                     } catch (Exception e) {
>>>>>>                         logger.error("Exception closing async client", e);
>>>>>>                     }
>>>>>>                 }
>>>>>>             }, ShutdownHookPriority);
>>>>>>             asyncCache.put(kuduMaster, asyncClient);
>>>>>>         }
>>>>>>         return asyncCache.get(kuduMaster);
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Ravi
>>>>>>
>>>>>> On 5 March 2018 at 16:20, Mike Percy <mpercy@apache.org> wrote:
>>>>>>
>>>>>>> Hi Ravi, it would be helpful if you could attach what you are
>>>>>>> getting back from getPendingErrors() -- perhaps from dumping
>>>>>>> RowError.toString() from items in the returned array -- and indicate
>>>>>>> what you were hoping to get back. Note that a RowError can also return
>>>>>>> to you the Operation
>>>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/RowError.html#getOperation-->
>>>>>>> that you used to generate the write. From the Operation, you can get
>>>>>>> the original PartialRow
>>>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/PartialRow.html>
>>>>>>> object, which should be able to identify the affected row that the
>>>>>>> write failed for. Does that help?
>>>>>>>
>>>>>>> Since you are using the Kudu client directly, Spark is not involved
>>>>>>> from the Kudu perspective, so you will need to deal with Spark on your
>>>>>>> own in that case.
>>>>>>>
>>>>>>> Mike
>>>>>>>
>>>>>>> On Mon, Mar 5, 2018 at 1:59 PM, Ravi Kanth <ravikanth.4b0@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Mike,
>>>>>>>>
>>>>>>>> Thanks for the reply. Yes, I am using AUTO_FLUSH_BACKGROUND.
>>>>>>>>
>>>>>>>> So, I am trying to use the Kudu Client API to perform UPSERTs into
>>>>>>>> Kudu, and I have integrated this with Spark. I am trying to test the
>>>>>>>> case where one of the Kudu servers fails. In that case, if there is
>>>>>>>> any problem in writing, getPendingErrors() should give me a way to
>>>>>>>> handle these errors so that I can successfully terminate my Spark
>>>>>>>> job. This is what I am trying to do.
>>>>>>>>
>>>>>>>> But I am not able to get hold of the exceptions being thrown from
>>>>>>>> within the KuduClient when it retries connecting to the tablet
>>>>>>>> server. My getPendingErrors() call is not picking up these exceptions.
>>>>>>>>
>>>>>>>> Let me know if you need more clarification. I can post some
>>>>>>>> snippets.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Ravi
>>>>>>>>
>>>>>>>> On 5 March 2018 at 13:18, Mike Percy <mpercy@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Ravi, are you using AUTO_FLUSH_BACKGROUND
>>>>>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/SessionConfiguration.FlushMode.html>?
>>>>>>>>> You mention that you are trying to use getPendingErrors()
>>>>>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/KuduSession.html#getPendingErrors-->
>>>>>>>>> but it sounds like it's not working for you -- can you be more
>>>>>>>>> specific about what you expect and what you are observing?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Mike
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Feb 26, 2018 at 8:04 PM, Ravi Kanth <
>>>>>>>>> ravikanth.4b0@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks, Clifford. We are running Kudu 1.4. To date we have not seen
>>>>>>>>>> any issues in production, and we are not losing tablet servers.
>>>>>>>>>> But as part of testing I have to generate a few unforeseen cases to
>>>>>>>>>> analyse the application performance. One of them is bringing down a
>>>>>>>>>> tablet server or master server intentionally, during which I
>>>>>>>>>> observed the loss of records. I just wanted to test cases off the
>>>>>>>>>> happy path here. Once again, thanks for taking the time to respond.
>>>>>>>>>>
>>>>>>>>>> - Ravi
>>>>>>>>>>
>>>>>>>>>> On 26 February 2018 at 19:58, Clifford Resnick <
>>>>>>>>>> cresnick@mediamath.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'll have to get back to you on the code bits, but I'm pretty
>>>>>>>>>>> sure we're doing simple sync batching. We're not in production yet,
>>>>>>>>>>> but after some months of development I haven't seen any failures,
>>>>>>>>>>> even when pushing load doing multiple years' backfill. I think the
>>>>>>>>>>> real question is why are you losing tablet servers? The only
>>>>>>>>>>> instability we ever had with Kudu was when it had that weird ntp
>>>>>>>>>>> sync issue that was fixed I think for 1.6. What version are you
>>>>>>>>>>> running?
>>>>>>>>>>>
>>>>>>>>>>> Anyway I would think that infinite loop should be catchable
>>>>>>>>>>> somewhere. Our pipeline is set to fail/retry with Flink snapshots. I
>>>>>>>>>>> imagine there is something similar with Spark. Sorry I can't be of
>>>>>>>>>>> more help!
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Feb 26, 2018 9:10 PM, Ravi Kanth <ravikanth.4b0@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Cliff,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the response. Well, I do agree that it's simple and
>>>>>>>>>>> seamless. In my case, I am able to upsert ~25000 events/sec into
>>>>>>>>>>> Kudu. But I run into a problem when any Kudu tablet server or
>>>>>>>>>>> master server is down. I am not able to get hold of the exception
>>>>>>>>>>> from the client. The client goes into an infinite loop trying to
>>>>>>>>>>> connect to Kudu. Meanwhile, I am losing my records. I tried handling
>>>>>>>>>>> the errors through getPendingErrors(), but it did not help. I am
>>>>>>>>>>> using AsyncKuduClient to establish the connection and retrieving the
>>>>>>>>>>> syncClient from it to open the session and table. Any help?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Ravi
>>>>>>>>>>>
>>>>>>>>>>> On 26 February 2018 at 18:00, Cliff Resnick <cresny@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> While I can't speak for Spark, we do use the client API from
>>>>>>>>>>> Flink streaming and it's simple and seamless. It's especially nice
>>>>>>>>>>> if you require an Upsert semantic.
>>>>>>>>>>>
>>>>>>>>>>> On Feb 26, 2018 7:51 PM, "Ravi Kanth" <ravikanth.4b0@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Is anyone using Spark Streaming to ingest data into Kudu, and using
>>>>>>>>>>> the Kudu Client API to do so rather than the traditional KuduContext
>>>>>>>>>>> API? I am stuck at a point and couldn't find a solution.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Ravi
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
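
Two problems come up repeatedly in the thread above: the call to
client.openTable(tableName) appears to block while the client keeps
retrying, and the exception is logged rather than rethrown, so Spark never
sees a failure. One generic way to turn a hang into a task failure is to run
the blocking call under a deadline and rethrow. The sketch below uses only
java.util.concurrent; BoundedCallSketch and callWithTimeout are hypothetical
names for illustration, not Kudu API, and this is a workaround pattern rather
than the Kudu client's own mechanism.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: bound a blocking call and surface failures as
// unchecked exceptions so the caller (e.g. a Spark task) fails visibly.
final class BoundedCallSketch {
    private BoundedCallSketch() {}

    static <T> T callWithTimeout(Callable<T> blockingCall, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-call");
            t.setDaemon(true); // a stuck call must not keep the JVM alive
            return t;
        });
        Future<T> future = executor.submit(blockingCall);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Interrupts the worker; the underlying call may ignore this.
            future.cancel(true);
            throw new IllegalStateException(
                    "Call did not finish within " + timeoutMs + " ms", e);
        } catch (ExecutionException e) {
            // Rethrow instead of only logging, so the failure propagates.
            throw new IllegalStateException("Call failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

With the code from the thread this might be used as
callWithTimeout(() -> client.openTable(tableName), 30000), where 30000 ms is
an arbitrary value chosen for the example. The client builder's own settings,
defaultOperationTimeoutMs and defaultAdminOperationTimeoutMs, are probably
worth checking first, since they bound how long the client itself retries.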

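The suggestion earlier in the thread to check the session's error count
inside the write loop every so often could look roughly like the sketch
below. ErrorCountingSession is a hypothetical stand-in interface so that the
example runs without a Kudu cluster; its countPendingErrors() mirrors the
method of the same name on KuduSession, and apply() stands in for
session.apply(upsert). Whether connection-level retries ever show up in that
count is exactly the open question in this thread, so treat this as a
pattern for row errors, not as a fix for the hang.

```java
import java.util.Iterator;

// Hypothetical sketch: fail fast when pending write errors accumulate.
final class PendingErrorCheckSketch {
    private PendingErrorCheckSketch() {}

    // Stand-in for the two KuduSession operations used here.
    interface ErrorCountingSession<R> {
        void apply(R row) throws Exception;
        int countPendingErrors();
    }

    // Applies every row, checking for pending errors every checkEvery rows
    // and once more at the end. Returns the number of rows applied.
    static <R> long writeAll(Iterator<R> rows, ErrorCountingSession<R> session,
                             int checkEvery) {
        long applied = 0;
        while (rows.hasNext()) {
            try {
                session.apply(rows.next());
            } catch (Exception e) {
                // Rethrow rather than log-and-continue, so the task fails.
                throw new RuntimeException(
                        "apply() failed after " + applied + " rows", e);
            }
            applied++;
            if (applied % checkEvery == 0) {
                failIfErrors(session, applied);
            }
        }
        failIfErrors(session, applied);
        return applied;
    }

    private static void failIfErrors(ErrorCountingSession<?> session, long applied) {
        int errors = session.countPendingErrors();
        if (errors > 0) {
            throw new RuntimeException(
                    errors + " pending write error(s) after " + applied + " rows");
        }
    }
}
```

Throwing from inside foreachPartition makes Spark mark the task as failed and
retry it according to its own settings, instead of silently moving on to the
next batch.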