flink-user mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: Execute multiple jobs in parallel (threading): java.io.OptionalDataException
Date Fri, 27 Oct 2017 13:07:02 GMT
Hi David,

I cannot exactly tell how you ended up seeing an OptionalDataException
without seeing your code.

Flink supports running multiple jobs on the same cluster. That’s what we
call session mode.
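
(For illustration, not part of the original reply: on a standalone session
cluster this simply means starting the cluster once and then submitting any
number of jobs against it, e.g. with the example jar bundled in the Flink
distribution:

    ./bin/start-cluster.sh
    ./bin/flink run examples/batch/WordCount.jar
    ./bin/flink run examples/batch/WordCount.jar

Each "flink run" submits an independent job to the same running cluster.)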

You should not reuse the ExecutionEnvironment, because you would then create
a single job that simply consists of multiple disjoint parts. Calling
ExecutionEnvironment.getExecutionEnvironment will give you a fresh
ExecutionEnvironment which you can use to submit a new job. Note that you
have to call env.execute in a separate thread because it is a blocking
operation.
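
For illustration, here is a minimal sketch of that pattern (not David's
original code; the class name, input data, and output paths are made up),
assuming the batch DataSet API: each job builds its plan on its own
ExecutionEnvironment and calls the blocking env.execute() from its own
thread.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ParallelJobSubmission {

        public static void main(String[] args) throws Exception {
            ExecutorService pool = Executors.newFixedThreadPool(2);

            // One thread per job, because env.execute() blocks until the job finishes.
            pool.submit(() -> runJob("job-a", "/tmp/out-a"));
            pool.submit(() -> runJob("job-b", "/tmp/out-b"));

            pool.shutdown();
        }

        private static void runJob(String jobName, String outputPath) {
            try {
                // Fresh environment per job; do not share one environment across threads.
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

                DataSet<String> input = env.fromElements("a", "b", "c");
                input.map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                }).writeAsText(outputPath);

                env.execute(jobName); // blocking call, hence the separate thread
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }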

Cheers,
Till

On Thu, Oct 26, 2017 at 10:22 PM, David Dreyfus <dddreyfus@gmail.com> wrote:

> Hello,
>
> I am trying to submit multiple jobs to Flink from within my Java program.
> I am running into an exception that may be related:
> java.io.OptionalDataException.
>
> Should I be able to create multiple plans/jobs within a single session and
> execute them concurrently?
> If so, is there a working example you can point me at?
>
> Do I share the same ExecutionEnvironment?
> It looks like calls to getExecutionEnvironment() return the same one.
>
> I have a number of different transformations I'd like to apply to my data.
> Rather than creating one very large job, I'd like to have them processed in
> parallel as separate jobs.
> My cluster has enough resources that performing each job sequentially would
> be very wasteful.
>
> Thank you,
> David
>
> Failed to submit job 60f4da5cf76836fe52ceba5cebdae412 (Union4a:14:15)
> java.io.OptionalDataException
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1588)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
>         at java.util.HashMap.readObject(HashMap.java:1407)
>         at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2173)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
>         at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>         at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
>         at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1283)
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
>         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>         at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
>         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>         at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>         at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
