spark-user mailing list archives

From Eugen Cepoi <cepoi.eu...@gmail.com>
Subject Re: Spark 1.5 Streaming and Kinesis
Date Thu, 15 Oct 2015 09:12:32 GMT
Hey,

A quick update on other things that have been tested.

Looking at the compiled code of the spark-streaming-kinesis-asl jar,
everything looks normal: there is a class that implements SynchronizedMap,
and it is used inside the receiver.
Starting a spark shell and using reflection to instantiate a receiver
and check that blockIdToSeqNumRanges implements SynchronizedMap works too.
So according to that check, the field has the correct type.
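
A minimal sketch of that kind of reflection check, assuming Scala 2.10/2.11
(where mutable.SynchronizedMap still exists). FakeReceiver, FieldTypeCheck,
fieldValue and isSynchronizedMap are hypothetical names made up for
illustration; only the field name and the HashMap-with-SynchronizedMap
declaration come from the thread. It does not touch the real KinesisReceiver.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the receiver: only the shape of the field
// declaration mirrors the one quoted later in this thread.
class FakeReceiver {
  private val blockIdToSeqNumRanges =
    new mutable.HashMap[String, String] with mutable.SynchronizedMap[String, String]
}

object FieldTypeCheck {
  // Read a (possibly private) field by name using plain Java reflection.
  def fieldValue(target: AnyRef, name: String): AnyRef = {
    val field = target.getClass.getDeclaredField(name)
    field.setAccessible(true)
    field.get(target)
  }

  // True if the runtime object has the SynchronizedMap trait mixed in.
  def isSynchronizedMap(value: AnyRef): Boolean =
    value.isInstanceOf[mutable.SynchronizedMap[_, _]]

  def main(args: Array[String]): Unit = {
    val value = fieldValue(new FakeReceiver, "blockIdToSeqNumRanges")
    println(s"runtime class: ${value.getClass.getName}")
    println(s"implements SynchronizedMap: ${isSynchronizedMap(value)}")
  }
}
```

The same two helpers could be pointed at a real receiver instance in a spark
shell, provided one can be constructed there.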

Another thing to test would be to run the same reflection check inside a
Spark job, to make sure the problem is not in the way the jobs are run.
The other idea is that this is a ser/de problem. For example, if the
receiver is serialized and then deserialized, then depending on the library
used and its configuration, the concrete type might not be preserved: the
object would be deserialized using the compile-time type instead of the
runtime type.
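
One way to probe the ser/de idea outside Spark is to round-trip a map of the
same shape through a serializer and compare the runtime class before and
after. The sketch below uses plain Java serialization only as an example
serializer and assumes Scala 2.10/2.11; RoundTripCheck and roundTrip are
hypothetical names, and this is not necessarily the serializer or code path
Spark uses for the receiver.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

object RoundTripCheck {
  // Serialize a value to bytes and read it back with Java serialization.
  def roundTrip(value: AnyRef): AnyRef = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject() finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // Same shape as the receiver field, with placeholder key/value types.
    val original =
      new mutable.HashMap[String, Int] with mutable.SynchronizedMap[String, Int]
    original.put("a", 1)

    val copy = roundTrip(original)

    // If the serializer dropped the mixin, the two lines would differ.
    println(s"before: ${original.getClass.getName}, " +
      s"synchronized = ${original.isInstanceOf[mutable.SynchronizedMap[_, _]]}")
    println(s"after:  ${copy.getClass.getName}, " +
      s"synchronized = ${copy.isInstanceOf[mutable.SynchronizedMap[_, _]]}")
  }
}
```

Swapping the body of roundTrip for another serializer would allow the same
comparison for whichever library is actually suspected.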

Cheers,
Eugen


2015-10-15 13:41 GMT+07:00 Jean-Baptiste Onofré <jb@nanthrax.net>:

> Thanks for the update Phil.
>
> I'm preparing an environment to reproduce it.
>
> I'll keep you posted.
>
> Thanks again,
> Regards
> JB
>
> On 10/15/2015 08:36 AM, Phil Kallos wrote:
>
>> Not a dumb question, but yes I updated all of the library references to
>> 1.5, including  (even tried 1.5.1).
>>
>> // Versions.spark set elsewhere to "1.5.0"
>> "org.apache.spark" %% "spark-streaming-kinesis-asl" % Versions.spark % "provided"
>>
>> I am experiencing the issue in my own spark project, but also when I try
>> to run the spark streaming kinesis example that comes in spark/examples
>>
>> Tried running the streaming job locally, and also in EMR with release
>> 4.1.0 that includes Spark 1.5
>>
>> Very strange!
>>
>>     ---------- Forwarded message ----------
>>
>>     From: "Jean-Baptiste Onofré" <jb@nanthrax.net>
>>     To: user@spark.apache.org
>>
>>     Cc:
>>     Date: Thu, 15 Oct 2015 08:03:55 +0200
>>     Subject: Re: Spark 1.5 Streaming and Kinesis
>>     Hi Phil,
>>     KinesisReceiver is part of extra. Just a dumb question: did you
>>     update all, including the Spark Kinesis extra containing the
>>     KinesisReceiver ?
>>     I checked on tag v1.5.0, and at line 175 of the KinesisReceiver, we see:
>>     blockIdToSeqNumRanges.clear()
>>     which is a:
>>     private val blockIdToSeqNumRanges =
>>       new mutable.HashMap[StreamBlockId, SequenceNumberRanges]
>>         with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
>>     So, it doesn't look fully correct to me.
>>     Let me investigate a bit this morning.
>>     Regards
>>     JB
>>     On 10/15/2015 07:49 AM, Phil Kallos wrote:
>>     We are trying to migrate from Spark 1.4 to Spark 1.5 for our Kinesis
>>     streaming applications, to take advantage of the new Kinesis
>>     checkpointing improvements in 1.5.
>>     However after upgrading, we are consistently seeing the following error:
>>     java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.SynchronizedMap
>>       at org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
>>       at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
>>       at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
>>       at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
>>       at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
>>       at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>>       at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>       at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>       at java.lang.Thread.run(Thread.java:745)
>>     I even get this when running the Kinesis examples:
>>     http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>     with
>>     bin/run-example streaming.KinesisWordCountASL
>>     Am I doing something incorrect?
>>
>>
>>     --
>>     Jean-Baptiste Onofré
>>     jbonofre@apache.org
>>     http://blog.nanthrax.net
>>     Talend - http://www.talend.com
>>
>>     Hi,
>>
>>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
