flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Objects deserialization on Jobmanager
Date Mon, 27 Apr 2015 08:12:21 GMT
Hi Ventura!

I hope you can get along without editing the Flink runtime by overriding
the open/close methods of the RichFunctions and doing your initialization
there.
They are called once per lifetime of a parallel function instance (except in
iterations, where they are called once per superstep, but you can always
check whether you are in the first superstep).
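
To illustrate the open/close approach, here is a minimal sketch in plain
Java. It does not use Flink's classes: ScalingMapper only mirrors the
open/map/close lifecycle of a rich function, and FakeGpuContext, runTask and
all other names are hypothetical stand-ins. In a real job the same body would
sit in a RichMapFunction, and the runtime (not runTask) would make the calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a GPU context; a real one would wrap a CUDA handle.
final class FakeGpuContext implements AutoCloseable {
    static int created = 0; // counts how many contexts were ever created
    private boolean open = true;

    FakeGpuContext() {
        created++;
    }

    double[] scale(double[] in, double factor) {
        if (!open) {
            throw new IllegalStateException("context already closed");
        }
        double[] out = new double[in.length];
        for (int i = 0; i < in.length; i++) {
            out[i] = in[i] * factor;
        }
        return out;
    }

    @Override
    public void close() {
        open = false;
    }
}

// Mirrors the open/map/close lifecycle of a rich function (not Flink's class).
final class ScalingMapper {
    // Transient: created on the worker in open(), never shipped with the function.
    private transient FakeGpuContext ctx;

    void open() {
        if (ctx == null) {
            ctx = new FakeGpuContext();
        }
    }

    double[] map(double[] row) {
        if (ctx == null) {
            throw new IllegalStateException("open() was not called");
        }
        return ctx.scale(row, 2.0);
    }

    void close() {
        if (ctx != null) {
            ctx.close();
            ctx = null;
        }
    }
}

final class LifecycleDemo {
    // Stand-in for what the runtime does for one parallel instance:
    // open once, map every record, close once.
    static List<double[]> runTask(ScalingMapper mapper, List<double[]> input) {
        List<double[]> out = new ArrayList<>();
        mapper.open();
        try {
            for (double[] row : input) {
                out.add(mapper.map(row));
            }
        } finally {
            mapper.close();
        }
        return out;
    }

    public static void main(String[] args) {
        List<double[]> in = new ArrayList<>();
        in.add(new double[] {1.0, 2.0});
        in.add(new double[] {3.0});
        List<double[]> out = runTask(new ScalingMapper(), in);
        System.out.println(out.get(0)[1] + " " + FakeGpuContext.created);
    }
}
```

Because the context is a transient field that only comes into existence in
open(), it would not be part of the serialized function, so nothing
GPU-related would have to be deserialized on the JobManager.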

For the threads: each parallel instance of a function has one thread,
unless they are chained (you see this in the log and WebUI, for example by
the operator name "Chain Source -> Map -> Combiner"). If parallel functions
share a slot, they still get a dedicated thread each; they only divide the
Flink-managed memory among themselves. Only the sorter has additional
threads for asynchronous sorting / spilling, but those execute no user code.

Would it work if your data types (that are exchanged between functions) do
not carry any CUDA resource-related data types, but only the arrays? Then,
inside your functions, you create the CUDA types, and emit again only the
data arrays?
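
A rough sketch of that idea in plain Java, under assumptions: MatrixData,
DeviceBuffer, addOne and roundTrip are hypothetical names, DeviceBuffer only
imitates device memory with a host array, and the round trip uses standard
Java serialization as a stand-in for whatever serializer ships the records.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Plain carrier exchanged between functions: dimensions plus host data only.
final class MatrixData implements Serializable {
    private static final long serialVersionUID = 1L;
    final int rows;
    final int cols;
    final double[] values; // row-major, length rows * cols

    MatrixData(int rows, int cols, double[] values) {
        if (values.length != rows * cols) {
            throw new IllegalArgumentException("size mismatch");
        }
        this.rows = rows;
        this.cols = cols;
        this.values = values;
    }
}

// Hypothetical stand-in for device memory; it lives only inside a function.
final class DeviceBuffer implements AutoCloseable {
    private double[] device;

    DeviceBuffer(double[] host) {
        device = host.clone(); // imitates an upload
    }

    void addInPlace(double delta) {
        for (int i = 0; i < device.length; i++) {
            device[i] += delta;
        }
    }

    double[] download() {
        return device.clone();
    }

    @Override
    public void close() {
        device = null; // imitates freeing device memory
    }
}

final class ArrayOnlyDemo {
    // What the body of a map function could look like: wrap, compute, unwrap.
    static MatrixData addOne(MatrixData in) {
        try (DeviceBuffer buf = new DeviceBuffer(in.values)) {
            buf.addInPlace(1.0);
            return new MatrixData(in.rows, in.cols, buf.download());
        }
    }

    // Serializes and deserializes without needing any GPU-side object.
    static MatrixData roundTrip(MatrixData m) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(m);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (MatrixData) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        MatrixData m = new MatrixData(1, 2, new double[] {1.0, 2.0});
        MatrixData result = addOne(roundTrip(m));
        System.out.println(result.values[0] + ", " + result.values[1]);
    }
}
```

Since MatrixData holds nothing but integers and a double array, any process
can deserialize it, including one that has no GPU executor running.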

Greetings,
Stephan


On Fri, Apr 24, 2015 at 3:54 PM, Ventura Del Monte <
venturadelmonte@gmail.com> wrote:

> Hi Stephan!
>
> Thank you for your reply, first of all! You're right about how I
> distributed my data. I need this because I have an object that should be
> shared among tasks. I am working on decoupling this object from the cuda
> type at the moment and I will follow your suggestions!
>
> About my CudaExecutor: it is a worker thread bound to a GPU CUDA context,
> and it follows the multi-producer, single-consumer pattern. It is
> initialized in the RichMapFunction.open method and on client startup. It was
> not supposed to run on the JobManager; I did not expect that to happen, to be
> honest. But I think I need to redesign my software architecture, because
> the CUDA worker could become a bottleneck at higher levels of
> parallelism. Moreover, the whole system would handle many context
> creations/destructions in the open/close methods. I was thinking of editing
> flink-runtime to make it aware of GPU resources: when a TaskManager
> spawns a new thread, it should initialize a CUDA context bound to one of
> the GPUs of the underlying hardware. I think this can easily be done in
> RuntimeEnvironment and in the instance.* classes (plus adding more
> configuration options). That would allow me to execute my DL library on
> heterogeneous multi-GPU clusters. I think it should work, and I would like
> to know your opinion about that, if you do not mind. I still have one doubt:
> will Flink use the same thread to process tasks that are in the same slot?
> Thanks in advance.
>
> Regards,
> Ventura
>
>
> 2015-04-24 10:26 GMT+02:00 Stephan Ewen <sewen@apache.org>:
>
>> Hi Ventura!
>>
>> You are distributing your data via something like "env.fromElements(...)"
>> or "env.fromCollection(...)", is that correct?
>>
>> The master node (JobManager) currently takes each InputFormat and checks
>> whether it needs some "master side initialization". For file input formats,
>> this computes for example the different splits of the file(s) that the
>> parallel tasks will read.
>> For inputs like "env.fromElements(...)" or "env.fromCollection(...)",
>> this is redundant, since there is no need to coordinate anything, it is
>> just that this initialization check happens for all inputs. It is a good
>> idea to skip that for collection inputs.
>>
>> If you want to avoid that this happens on the JobManager, the simplest
>> way would be to make the data source independent of the Cuda types.
>>
>>   - Define the source as tuple2 with the row and column dimensions.
>>
>>     DataSet<Tuple2<Integer, Integer>> source =
>>         env.fromElements(new Tuple2<>(...), new Tuple2<>(...));
>>
>>   - Transform the tuples into your Cuda types. Also, since that source is
>>     not parallel (java/scala collections are always run with
>>     parallelism 1), make sure you tell the system to go parallel after
>>     that:
>>
>>     DataSet<GpuDataRegion> data = source
>>         .map( (tuple) -> { /* your code for initialization */ } )
>>         .setParallelism(64);
>>     // the last call makes sure the mapper runs with 64 parallel instances
>>
>>
>> Out of curiosity: The deserialization bug occurs here on the JobManager
>> (because the JobManager looks into the Inputs), but I assume it would also
>> occur on the TaskManagers (workers) once the proper execution starts?
>> How is Core.CudaExecutor usually initialized, so that it is not null
>> when you need it?
>>
>> Greetings,
>> Stephan
>>
>>
>> On Wed, Apr 22, 2015 at 5:50 PM, Ventura Del Monte <
>> venturadelmonte@gmail.com> wrote:
>>
>>> I am using Flink 0.9-SNAPSHOT, this is the complete stack trace:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot initialize task 'DataSource (at <init>(DownpourSDG.java:28) (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the InputFormat ([org.dl4flink.dl.neuralnets.models.autoencoder.AutoEncoderParam@352c308]) failed: unread block data
>>> at org.apache.flink.client.program.Client.run(Client.java:378)
>>> 2015-04-22 17:14:18 INFO  DL4Flink:158 - Elapsed: 3
>>> at org.apache.flink.client.program.Client.run(Client.java:314)
>>> at org.apache.flink.client.program.Client.run(Client.java:307)
>>> at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
>>> at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
>>> at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:70)
>>> at org.dl4flink.DL4Flink.RunFlinkJob(DL4Flink.java:295)
>>> at org.dl4flink.DL4Flink.main(DL4Flink.java:56)
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at <init>(DownpourSDG.java:28) (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the InputFormat ([org.dl4flink.dl.neuralnets.models.AutoEncoder.AutoEncoderParam@352c308]) failed: unread block data
>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$2.apply(JobManager.scala:527)
>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$2.apply(JobManager.scala:511)
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:511)
>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:197)
>>> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>> at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:44)
>>> at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>> at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>> at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: java.lang.Exception: Deserializing the InputFormat ([org.dl4flink.dl.neuralnets.models.AutoEncoder.AutoEncoderParam@352c308]) failed: unread block data
>>> at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$2.apply(JobManager.scala:524)
>>> ... 25 more
>>> Caused by: java.lang.IllegalStateException: unread block data
>>> at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
>>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>>> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
>>> at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
>>> at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
>>> at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
>>> ... 26 more
>>>
>>>
>>> I checked the jobmanager log and I know that the object needed for the
>>> deserialization is null.
>>>
>>>
>>> public void kryoDeserialize(Kryo kryo, Input in)
>>> {
>>>     this.rows = in.readInt();
>>>     this.cols = in.readInt();
>>>     this.size = this.rows * this.cols;
>>>     double[] tmp = in.readDoubles(this.size);
>>>     // here CudaExecutor is null on JobManager
>>>     Core.CudaExecutor.invoke((handle) ->
>>>         cuMemAlloc(this.deviceData, this.size * Sizeof.DOUBLE));
>>> }
>>>
>>> As you can see, deviceData is my transient field I need to store/read in
>>> a specific way, since it is a pointer to gpu memory.
>>>
>>> The object I need to deserialize is part of a broadcast set. I think
>>> this is the reason why the jobmanager needs to read it (I figured it out
>>> after I sent my first mail).
>>> I am considering whether I should edit my code to get rid of
>>> this situation, since having the jobmanager perform allocations could be
>>> a drawback. What do you think about that?
>>>
>>> Thank you for your time!
>>>
>>>
>>> 2015-04-22 16:48 GMT+02:00 Till Rohrmann <trohrmann@apache.org>:
>>>
>>>> The corresponding code snippet could also help.
>>>>
>>>> Cheers,
>>>>
>>>> Till
>>>>
>>>> On Wed, Apr 22, 2015 at 4:45 PM, Robert Metzger <rmetzger@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> which version of Flink are you using?
>>>>>
>>>>> Can you send us the complete stack trace of the error to help us
>>>>> understand the exact location where the issue occurs?
>>>>>
>>>>> On Wed, Apr 22, 2015 at 4:33 PM, Ventura Del Monte <
>>>>> venturadelmonte@gmail.com> wrote:
>>>>>
>>>>>> Hello, I am working on a flink-based deep learning library for my
>>>>>> master's thesis. I am experiencing this issue at the moment: I have a
>>>>>> java class with a transient field, so I had to write both a kryo custom
>>>>>> serializer and a java one. The (de)serialization needs to access another
>>>>>> object of my system, so if I run my software locally it works fine,
>>>>>> because the needed object is instantiated, whereas it crashes when I run
>>>>>> it in a remote environment, because when the jobmanager receives the
>>>>>> data, the object needed for the deserialization is not present in the
>>>>>> system. Thus, my question is whether it is possible to let the jobmanager
>>>>>> execute some user code, or would it be better to edit the architecture of
>>>>>> my system in order to avoid this kind of problem?
>>>>>>
>>>>>> Regards,
>>>>>> Ventura
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
