flink-user mailing list archives

From Jack <jack-kn...@marmelandia.com>
Subject Re: Immutable data
Date Wed, 23 Sep 2015 12:10:14 GMT
Hi Stephan!

Here's the trace (Flink 0.9.1 + Scala 2.10.5):

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Cannot instantiate StreamRecord.
at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.createInstance(StreamRecordSerializer.java:63)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:66)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Cannot instantiate class.
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:225)
at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.createInstance(StreamRecordSerializer.java:60)
... 4 more
Caused by: java.lang.IllegalArgumentException: Can not set int field org.myorg.quickstart.Test$X.id to org.myorg.quickstart.Test$Id
at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167)
at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171)
at sun.reflect.UnsafeIntegerFieldAccessorImpl.set(UnsafeIntegerFieldAccessorImpl.java:98)
at java.lang.reflect.Field.set(Field.java:764)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.initializeFields(PojoSerializer.java:232)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:221)
... 5 more
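
The innermost cause suggests that the field `id` in `X` is a primitive `int` on the JVM (the value class `Id` is erased), while something is trying to set it reflectively to a boxed `Id` instance. The following is a minimal sketch that reproduces the same `IllegalArgumentException` with plain Java reflection and no Flink involved. The object name `ValueClassErasure` and the helper names `idFieldType` and `canSetIdField` are made up for illustration; how Flink 0.9.1 arrives at this call internally is an assumption, not something the trace proves.

```scala
object ValueClassErasure {
  // Same shapes as in the quoted example below: a value class wrapping an Int
  // and a mutable class that holds it.
  class Id(val underlying: Int) extends AnyVal

  class X(var id: Id) {
    def this() = this(new Id(0))
  }

  // Runtime type of the JVM field backing `id` (hypothetical helper).
  def idFieldType: Class[_] = classOf[X].getDeclaredField("id").getType

  // Tries to set `id` reflectively; returns false if the JVM rejects the value
  // with an IllegalArgumentException (hypothetical helper).
  def canSetIdField(value: AnyRef): Boolean = {
    val field = classOf[X].getDeclaredField("id")
    field.setAccessible(true)
    try {
      field.set(new X(), value)
      true
    } catch {
      case _: IllegalArgumentException => false
    }
  }

  def main(args: Array[String]): Unit = {
    // The value class is erased, so the field is a primitive int.
    println(idFieldType)

    // A boxed Integer is unwrapped by reflection and accepted.
    println(canSetIdField(Int.box(1)))

    // Upcasting to Any forces a real Id instance to be allocated;
    // reflection cannot store that object in an int field.
    val boxedId: Any = new Id(2)
    println(canSetIdField(boxedId.asInstanceOf[AnyRef]))
  }
}
```

If this reading is right, the POJO serializer would need to treat `id` as an `int` rather than as an `Id` object for the class above to be instantiated.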

> On 23 Sep 2015, at 14:37, Stephan Ewen <sewen@apache.org> wrote:
> 
> Hi Jack!
> 
> This should be supported, there is no strict requirement for mutable types.
> 
> The POJO rules apply only if you want to use the "by-field-name" addressing for keys. In Scala, you should be able to use case classes as well, even if they are immutable.
> 
> Can you post the exception that you get?
> 
> Greetings,
> Stephan
> 
> 
>> On Wed, Sep 23, 2015 at 1:29 PM, Jack <jack-knilf@marmelandia.com> wrote:
>> Hi,
>> 
>> I'm having trouble integrating existing Scala code with Flink, due to the POJO-only requirement.
>> 
>> We're using AnyVal heavily for type safety, and immutable classes as a default. For example, the following does not work:
>> 
>> object Test {
>>   class Id(val underlying: Int) extends AnyVal
>> 
>>   class X(var id: Id) {
>>     def this() { this(new Id(0)) }
>>   }
>> 
>>   class MySource extends SourceFunction[X] {
>>     def run(ctx: SourceFunction.SourceContext[X]) {
>>       ctx.collect(new X(new Id(1)))
>>     }
>>     def cancel() {}
>>   }
>> 
>>   def main(args: Array[String]) {
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     env.addSource(new MySource).print
>>     env.execute("Test")
>>   }
>> }
>> 
>> Currently I'm thinking that I would need to have duplicate classes and code for Flink and for non-Flink code, or somehow use immutable interfaces for non-Flink code. Both ways are expensive in terms of development time.
>> 
>> Would you have any guidance on how to integrate Flink with a code base that has immutability as a norm?
>> 
>> Thanks
> 
