spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Eugen Cepoi <cepoi.eu...@gmail.com>
Subject Re: Spark 1.5 Streaming and Kinesis
Date Thu, 15 Oct 2015 09:31:55 GMT
So running it using spark-submit doesnt change anything, it still works.

When reading the code
https://github.com/apache/spark/blob/branch-1.5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L100
it looks like the receivers are definitely being ser/de. I think this is
the issue, need to find a way to confirm that now...

2015-10-15 16:12 GMT+07:00 Eugen Cepoi <cepoi.eugen@gmail.com>:

> Hey,
>
> A quick update on other things that have been tested.
>
> When looking at the compiled code of the spark-streaming-kinesis-asl jar
> everything looks normal (there is a class that implements SyncMap and it is
> used inside the receiver).
> Starting a spark shell and using introspection to instantiate a receiver
> and check that blockIdToSeqNumRanges implements SyncMap works too. So
> obviously it has the correct type according to that.
>
> Another thing to test could be to do the same introspection stuff but
> inside a spark job to make sure it is not a problem in the way the jobs are
> run.
> The other idea would be that this is a problem related to ser/de. For
> example if the receiver was being serialized and then deserialized it could
> definitely happen depending on the lib used and its configuration that it
> just doesn't preserve the concrete type. So it would deserialize using the
> compile type instead of the runtime type.
>
> Cheers,
> Eugen
>
>
> 2015-10-15 13:41 GMT+07:00 Jean-Baptiste Onofré <jb@nanthrax.net>:
>
>> Thanks for the update Phil.
>>
>> I'm preparing a environment to reproduce it.
>>
>> I keep you posted.
>>
>> Thanks again,
>> Regards
>> JB
>>
>> On 10/15/2015 08:36 AM, Phil Kallos wrote:
>>
>>> Not a dumb question, but yes I updated all of the library references to
>>> 1.5, including  (even tried 1.5.1).
>>>
>>> // Versions.spark set elsewhere to "1.5.0"
>>> "org.apache.spark" %% "spark-streaming-kinesis-asl" % Versions.spark %
>>> "provided"
>>>
>>> I am experiencing the issue in my own spark project, but also when I try
>>> to run the spark streaming kinesis example that comes in spark/examples
>>>
>>> Tried running the streaming job locally, and also in EMR with release
>>> 4.1.0 that includes Spark 1.5
>>>
>>> Very strange!
>>>
>>>     ---------- Forwarded message ----------
>>>
>>>     From: "Jean-Baptiste Onofré" <jb@nanthrax.net <mailto:
>>> jb@nanthrax.net>>
>>>     To: user@spark.apache.org <mailto:user@spark.apache.org>
>>>
>>>     Cc:
>>>     Date: Thu, 15 Oct 2015 08:03:55 +0200
>>>     Subject: Re: Spark 1.5 Streaming and Kinesis
>>>     Hi Phil,
>>>     KinesisReceiver is part of extra. Just a dumb question: did you
>>>     update all, including the Spark Kinesis extra containing the
>>>     KinesisReceiver ?
>>>     I checked on tag v1.5.0, and at line 175 of the KinesisReceiver, we
>>> see:
>>>     blockIdToSeqNumRanges.clear()
>>>     which is a:
>>>     private val blockIdToSeqNumRanges = new
>>>     mutable.HashMap[StreamBlockId, SequenceNumberRanges]
>>>          with mutable.SynchronizedMap[StreamBlockId,
>>> SequenceNumberRanges]
>>>     So, it doesn't look fully correct to me.
>>>     Let me investigate a bit this morning.
>>>     Regards
>>>     JB
>>>     On 10/15/2015 07:49 AM, Phil Kallos wrote:
>>>     We are trying to migrate from Spark1.4 to Spark1.5 for our Kinesis
>>>     streaming applications, to take advantage of the new Kinesis
>>>     checkpointing improvements in 1.5.
>>>     However after upgrading, we are consistently seeing the following
>>> error:
>>>     java.lang.ClassCastException: scala.collection.mutable.HashMap
>>> cannot be
>>>     cast to scala.collection.mutable.SynchronizedMap
>>>     at
>>>
>>> org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
>>>     at
>>>
>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
>>>     at
>>>
>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
>>>     at
>>>
>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
>>>     at
>>>
>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
>>>     at
>>>
>>> org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>>>     at
>>>
>>> org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>>     at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>     at
>>>
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>     I even get this when running the Kinesis examples :
>>>
>>> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>> with
>>>     bin/run-example streaming.KinesisWordCountASL
>>>     Am I doing something incorrect?
>>>
>>>
>>>     --
>>>     Jean-Baptiste Onofré
>>>     jbonofre@apache.org <mailto:jbonofre@apache.org>
>>>     http://blog.nanthrax.net <http://blog.nanthrax.net/>
>>>     Talend - http://www.talend.com <http://www.talend.com/>
>>>
>>>     Hi,
>>>
>>>
>> --
>> Jean-Baptiste Onofré
>> jbonofre@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>>
>

Mime
View raw message