From: Till Rohrmann <till.rohrmann@gmail.com>
To: dev@flink.apache.org
Date: Wed, 11 Feb 2015 11:39:44 +0100
Subject: Re: Cluster execution - Jobmanager unreachable

I found the error. Due to some refactoring, a wrong message was sent to the
JobManager in JobManagerInfoServlet.java. I pushed a fix. Could you try it
out again?

On Wed, Feb 11, 2015 at 11:34 AM, Till Rohrmann wrote:

> Could you check the rebasing, because it seems as if the web server is now
> sending RequestArchivedJobs messages to the JobManager, which should not
> happen. These messages should go directly to the MemoryArchivist. The
> corresponding file is JobManagerInfoServlet.java, I think.
>
> On Wed, Feb 11, 2015 at 11:11 AM, Chesnay Schepler <
> chesnay.schepler@fu-berlin.de> wrote:
>
>> I just tried Till's fix, rebased to the latest master, and got a whole lot
>> of these exceptions right away:
>>
>> java.lang.Exception: The slot in which the task was scheduled has been
>> killed (probably loss of TaskManager).
>>     at org.apache.flink.runtime.instance.SimpleSlot.cancel(SimpleSlot.java:98)
>>     at org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment.releaseSimpleSlot(SlotSharingGroupAssignment.java:320)
>>     at org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:304)
>>     at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:106)
>>     at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:148)
>>     at org.apache.flink.runtime.instance.InstanceManager.shutdown(InstanceManager.java:111)
>>     at org.apache.flink.runtime.jobmanager.JobManager.postStop(JobManager.scala:132)
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$main$1$$anon$1.org$apache$flink$runtime$jobmanager$WithWebServer$$super$postStop(JobManager.scala:559)
>>     at org.apache.flink.runtime.jobmanager.WithWebServer$class.postStop(WithWebServer.scala:38)
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$main$1$$anon$1.postStop(JobManager.scala:559)
>>     at akka.actor.Actor$class.preRestart(Actor.scala:533)
>>     at org.apache.flink.runtime.jobmanager.JobManager.preRestart(JobManager.scala:80)
>>     at akka.actor.Actor$class.aroundPreRestart(Actor.scala:480)
>>     at org.apache.flink.runtime.jobmanager.JobManager.aroundPreRestart(JobManager.scala:80)
>>     at akka.actor.dungeon.FaultHandling$class.faultRecreate(FaultHandling.scala:67)
>>     at akka.actor.ActorCell.faultRecreate(ActorCell.scala:369)
>>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:459)
>>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> The following is an excerpt from the jobmanager log:
>>
>> 10:47:13,567 ERROR akka.actor.OneForOneStrategy - Received unknown message RequestArchivedJobs
>> java.lang.RuntimeException: Received unknown message RequestArchivedJobs
>>     at org.apache.flink.runtime.jobmanager.JobManager.unhandled(JobManager.scala:510)
>>     at akka.actor.Actor$$anonfun$aroundReceive$1.apply(Actor.scala:465)
>>     at akka.actor.Actor$$anonfun$aroundReceive$1.apply(Actor.scala:465)
>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:80)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 10:47:13,569 INFO org.apache.flink.runtime.jobmanager.JobManager$$anonfun$main$1$$anon$1 - Stopping webserver.
>> 10:47:13,620 WARN org.eclipse.jetty.util.log - /jobsInfo
>> org.eclipse.jetty.io.RuntimeIOException: org.eclipse.jetty.io.EofException
>>     at org.eclipse.jetty.io.UncheckedPrintWriter.setError(UncheckedPrintWriter.java:107)
>>     at org.eclipse.jetty.io.UncheckedPrintWriter.write(UncheckedPrintWriter.java:280)
>>     at org.eclipse.jetty.io.UncheckedPrintWriter.write(UncheckedPrintWriter.java:295)
>>     at org.eclipse.jetty.io.UncheckedPrintWriter.print(UncheckedPrintWriter.java:460)
>>     at org.apache.flink.runtime.jobmanager.web.JobManagerInfoServlet.doGet(JobManagerInfoServlet.java:158)
>>     at javax.servlet.http.HttpServlet.service(HttpServlet.java:734)
>>     at javax.servlet.http.HttpServlet.service(HttpServlet.java:847)
>>     at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:532)
>>     at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:453)
>>     at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:227)
>>     at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:965)
>>     at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:388)
>>     at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:187)
>>     at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:901)
>>     at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:117)
>>     at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:47)
>>     at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:113)
>>     at org.eclipse.jetty.server.Server.handle(Server.java:352)
>>     at org.eclipse.jetty.server.HttpConnection.handleRequest(HttpConnection.java:596)
>>     at org.eclipse.jetty.server.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:1048)
>>     at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:549)
>>     at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:211)
>>     at org.eclipse.jetty.server.HttpConnection.handle(HttpConnection.java:425)
>>     at org.eclipse.jetty.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:489)
>>     at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:436)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: org.eclipse.jetty.io.EofException
>>     at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:142)
>>     at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:86)
>>     at java.io.ByteArrayOutputStream.writeTo(ByteArrayOutputStream.java:167)
>>     at org.eclipse.jetty.server.HttpWriter.write(HttpWriter.java:258)
>>     at org.eclipse.jetty.server.HttpWriter.write(HttpWriter.java:107)
>>     at org.eclipse.jetty.io.UncheckedPrintWriter.write(UncheckedPrintWriter.java:271)
>>     ... 24 more
>> 10:47:13,623 INFO org.apache.flink.runtime.jobmanager.JobManager$$anonfun$main$1$$anon$1 - Stopped webserver.
>> 10:47:13,624 INFO org.apache.flink.runtime.jobmanager.JobManager$$anonfun$main$1$$anon$1 - Stopping job manager akka://flink/user/jobmanager.
>> 10:47:13,624 INFO org.apache.flink.runtime.jobmanager.JobManager$$anonfun$main$1$$anon$1 - Starting job manager at akka://flink/user/jobmanager.
>> 10:47:13,625 INFO org.apache.flink.runtime.blob.BlobServer - Started BLOB server on port 34038
>> 10:47:13,625 INFO org.apache.flink.runtime.blob.BlobServer - Created BLOB server storage directory /tmp/blobStore-88f5ebb0-15e2-47a6-ad56-fb2970d83ee2
>> 10:47:13,626 INFO org.apache.flink.runtime.jobmanager.web.WebInfoServer - Setting up web info server, using web-root directory jar:file: ...
>> 10:47:13,626 INFO org.apache.flink.runtime.jobmanager.JobManager$$anonfun$main$1$$anon$1 - Started job manager. Waiting for incoming messages.
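[The failure mode above can be sketched in plain Java. This is a hypothetical illustration with invented names, not Flink's actual actor code: an actor's receive loop matches only the message types it knows, anything else falls through to unhandled(), which throws, and the supervision strategy reacts by restarting the actor. That restart is what produces the postStop/preRestart frames and the "Stopping webserver" / "Starting job manager" lines in the log, all triggered by one misrouted RequestArchivedJobs message.]

```java
// Hypothetical sketch of actor-style dispatch and supervision (not Flink code).
public class UnknownMessageDemo {
    static class Actor {
        // Analogous to JobManager.receive: only known messages are matched.
        void receive(Object msg) {
            if ("RequestJobStatus".equals(msg)) {
                // handle the known message
            } else {
                unhandled(msg); // cf. JobManager.unhandled(JobManager.scala:510)
            }
        }

        void unhandled(Object msg) {
            throw new RuntimeException("Received unknown message " + msg);
        }
    }

    // Analogous to Akka's OneForOneStrategy: a failure in receive leads to
    // the actor being stopped and recreated with fresh state.
    static class Supervisor {
        Actor actor = new Actor();
        int restarts = 0;

        void deliver(Object msg) {
            try {
                actor.receive(msg);
            } catch (RuntimeException e) {
                restarts++;
                actor = new Actor(); // restart: old state (execution graphs) is gone
            }
        }
    }

    public static void main(String[] args) {
        Supervisor s = new Supervisor();
        s.deliver("RequestJobStatus");      // handled normally
        s.deliver("RequestArchivedJobs");   // wrong recipient: restarts the actor
        System.out.println(s.restarts);     // prints 1
    }
}
```

Note that after the restart the new actor instance has lost all state, which is consistent with the "Cannot find execution graph for job ID ..." errors that follow in the log.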
>> 10:47:13,626 INFO org.apache.flink.runtime.jobmanager.web.WebInfoServer - Web info server will display information about flink job-manager on ...
>> 10:47:13,627 INFO org.apache.flink.runtime.jobmanager.web.WebInfoServer - Starting web info server for JobManager on port ...
>> 10:47:13,627 INFO org.eclipse.jetty.util.log - jetty-0.9-SNAPSHOT
>> 10:47:13,738 INFO org.eclipse.jetty.util.log - Started SelectChannelConnector@0.0.0.0:8082
>> 10:47:14,032 ERROR org.apache.flink.runtime.jobmanager.JobManager$$anonfun$main$1$$anon$1 - Cannot find execution graph for job ID 1e0c0e5a1a7a741a1182791ea597ba7e.
>> 10:47:14,068 ERROR org.apache.flink.runtime.jobmanager.JobManager$$anonfun$main$1$$anon$1 - Cannot find execution graph for job ID 1e0c0e5a1a7a741a1182791ea597ba7e.
>> 10:47:14,069 ERROR org.apache.flink.runtime.jobmanager.JobManager$$anonfun$main$1$$anon$1 - Cannot find execution graph for job ID 1e0c0e5a1a7a741a1182791ea597ba7e.
>> 10:47:14,107 ERROR org.apache.flink.runtime.jobmanager.JobManager$$anonfun$main$1$$anon$1 - Cannot find execution graph for job ID 1e0c0e5a1a7a741a1182791ea597ba7e.
>>
>> On 05.02.2015 11:09, Till Rohrmann wrote:
>>
>>> I checked, and indeed the scheduleOrUpdateConsumers method can throw an
>>> IllegalStateException without such an exception being properly handled on
>>> the JobManager level.
>>>
>>> It is a design decision of Scala not to complain about unhandled
>>> exceptions which are otherwise properly annotated in Java code. We should
>>> definitely pay attention in Scala to properly handle thrown exceptions of
>>> Java code.
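[Till's point generalizes beyond Scala: IllegalStateException is unchecked, so no compiler forces any caller to handle it, and at an actor boundary an escaped exception kills the message loop. A small, hypothetical Java sketch (invented names, not Flink's actual scheduleOrUpdateConsumers) of catching the unchecked exception at the boundary and turning it into an explicit failure value:]

```java
import java.util.Optional;

public class GuardedCall {
    // Stand-in for a Java method that signals failure with an unchecked
    // IllegalStateException; nothing forces callers to handle it.
    static void scheduleOrUpdateConsumers(boolean partitionReleased) {
        if (partitionReleased) {
            throw new IllegalStateException("Buffer has already been recycled.");
        }
    }

    // The boundary (e.g. an actor's message handler) converts the unchecked
    // exception into a failure value instead of letting it propagate and
    // crash the caller.
    static Optional<String> safeSchedule(boolean partitionReleased) {
        try {
            scheduleOrUpdateConsumers(partitionReleased);
            return Optional.empty(); // empty means "no error"
        } catch (IllegalStateException e) {
            return Optional.of(e.getMessage());
        }
    }

    public static void main(String[] args) {
        System.out.println(safeSchedule(true).orElse("no error"));
    }
}
```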
>>> On Thu, Feb 5, 2015 at 10:42 AM, Stephan Ewen wrote:
>>>
>>>> I suspect that this is one of the cases where an exception in an actor
>>>> causes the actor to die (here the job manager).
>>>>
>>>> On Thu, Feb 5, 2015 at 10:40 AM, Till Rohrmann wrote:
>>>>
>>>>> It looks to me that the TaskManager does not receive a
>>>>> ConsumerNotificationResult after having sent the
>>>>> ScheduleOrUpdateConsumers message. This can either mean that something
>>>>> went wrong in the ExecutionGraph.scheduleOrUpdateConsumers method or
>>>>> that the connection was disassociated for some reason. The logs would
>>>>> indeed be very helpful to understand what happened.
>>>>>
>>>>> On Thu, Feb 5, 2015 at 10:36 AM, Ufuk Celebi wrote:
>>>>>
>>>>>> Hey Chesnay,
>>>>>>
>>>>>> I will look into it. Can you share the complete LOGs?
>>>>>>
>>>>>> – Ufuk
>>>>>>
>>>>>> On 04 Feb 2015, at 14:49, Chesnay Schepler <chesnay.schepler@fu-berlin.de> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I'm trying to run python jobs with the latest master on a cluster and
>>>>>>> get the following exception:
>>>>>>>
>>>>>>> Error: The program execution failed: JobManager not reachable anymore. Terminate waiting for job answer.
>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager not reachable anymore. Terminate waiting for job answer.
>>>>>>>     at org.apache.flink.client.program.Client.run(Client.java:345)
>>>>>>>     at org.apache.flink.client.program.Client.run(Client.java:304)
>>>>>>>     at org.apache.flink.client.program.Client.run(Client.java:298)
>>>>>>>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:55)
>>>>>>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:677)
>>>>>>>     at org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.runPlan(PythonPlanBinder.java:106)
>>>>>>>     at org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.main(PythonPlanBinder.java:79)
>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>>>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>>>>>>>     at org.apache.flink.client.program.Client.run(Client.java:250)
>>>>>>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:387)
>>>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:356)
>>>>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1066)
>>>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1090)
>>>>>>>
>>>>>>> In the jobmanager log file I find this exception:
>>>>>>>
>>>>>>> java.lang.IllegalStateException: Buffer has already been recycled.
>>>>>>>     at org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:176)
>>>>>>>     at org.apache.flink.runtime.io.network.buffer.Buffer.ensureNotRecycled(Buffer.java:131)
>>>>>>>     at org.apache.flink.runtime.io.network.buffer.Buffer.setSize(Buffer.java:95)
>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.getCurrentBuffer(SpanningRecordSerializer.java:151)
>>>>>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.clearBuffers(RecordWriter.java:158)
>>>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.clearWriters(RegularPactTask.java:1533)
>>>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:367)
>>>>>>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> The same exception is in the task manager logs, along with the
>>>>>>> following one:
>>>>>>>
>>>>>>> java.util.concurrent.TimeoutException: Futures timed out after [100 seconds]
>>>>>>>     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>>>>>     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>>>>>     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>>>>>     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>>>>>>     at scala.concurrent.Await$.result(package.scala:107)
>>>>>>>     at org.apache.flink.runtime.akka.AkkaUtils$.ask(AkkaUtils.scala:265)
>>>>>>>     at org.apache.flink.runtime.akka.AkkaUtils.ask(AkkaUtils.scala)
>>>>>>>     at org.apache.flink.runtime.io.network.partition.IntermediateResultPartition.scheduleOrUpdateConsumers(IntermediateResultPartition.java:247)
>>>>>>>     at org.apache.flink.runtime.io.network.partition.IntermediateResultPartition.maybeNotifyConsumers(IntermediateResultPartition.java:240)
>>>>>>>     at org.apache.flink.runtime.io.network.partition.IntermediateResultPartition.add(IntermediateResultPartition.java:144)
>>>>>>>     at org.apache.flink.runtime.io.network.api.writer.BufferWriter.writeBuffer(BufferWriter.java:74)
>>>>>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:91)
>>>>>>>     at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
>>>>>>>     at org.apache.flink.languagebinding.api.java.common.streaming.Receiver.collectBuffer(Receiver.java:253)
>>>>>>>     at org.apache.flink.languagebinding.api.java.common.streaming.Streamer.streamBufferWithoutGroups(Streamer.java:193)
>>>>>>>     at org.apache.flink.languagebinding.api.java.python.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54)
>>>>>>>     at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:98)
>>>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>>>>>>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
>>>>>>>     at java.lang.Thread.run(Thread.java:745)
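[The 100-second TimeoutException in the last trace is the ask pattern: the TaskManager sends a request and then blocks on the response future with a deadline, which here expires because the JobManager actor died and never answered. Reduced to plain JDK types (a sketch only, not Flink's AkkaUtils.ask implementation), the pattern looks like:]

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AskTimeoutDemo {
    // A request/response "ask" reduced to waiting on a future with a deadline,
    // much as AkkaUtils.ask blocks in Await.result with a 100-second timeout.
    static String ask(CompletableFuture<String> response, long millis) {
        try {
            return response.get(millis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // A dead or unreachable responder never completes the future,
            // so the caller sees exactly this timeout.
            return "timeout";
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // A future that is never completed: the ask runs into its deadline.
        System.out.println(ask(new CompletableFuture<String>(), 100));
    }
}
```

This is why a crashed JobManager surfaces on the TaskManager side as a TimeoutException rather than as the original error: the requester only knows that no answer arrived within the deadline.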