flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Till Rohrmann <trohrm...@apache.org>
Subject Re: Not able to query : Queryable State
Date Wed, 07 Sep 2016 08:07:53 GMT
I think the exception message is saying what’s the problem. The job simply
does not exist. You can verify that by running bin/flink list or look it up
in the web interface.

The reason is that calling env.getStreamGraph.getJobGraph will generate a
new JobGraph (not the one which is sent to the JobManager) and this
JobGraph will get a new JobID assigned. Thus, the JobGraph which you send
to the JobManager and the one you used to retrieve the JobID from are
different.

Cheers,
Till
​

On Wed, Sep 7, 2016 at 8:07 AM, pushpendra.jaiswal <
pushpendra.jaiswal90@gmail.com> wrote:

> Hi Stefan
>
> Please find below stack trace and code :
>
> java.lang.IllegalStateException: Job 81ca41b13e7be8feb99f064e5a9a4237 not
> found
>         at
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$
> jobmanager$JobManager$$handleKvStateMessage(JobManager.scala:1470)
>         at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.
> applyOrElse(JobManager.scala:684)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(
> AbstractPartialFunction.scala:33)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:33)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:25)
>         at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$
> anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(
> AbstractPartialFunction.scala:33)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:33)
>         at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:25)
>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(
> LogMessages.scala:33)
>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(
> LogMessages.scala:28)
>         at scala.PartialFunction$class.applyOrElse(PartialFunction.
> scala:118)
>         at
> org.apache.flink.runtime.LogMessages$$anon$1.
> applyOrElse(LogMessages.scala:28)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.
> scala:123)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(
> ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
>         at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> Code :
>
> class Aggregator(val stream: KeyedStream[Record, Long]) extends
> Serializable
> {
>
>   def reduceFunction = new ReduceFunction[Record] {
>     override def reduce(t: Record, t1: Record): Record = {
>       val total = t + t1
>       total
>     }
>   }
>
>   val reducingStateDesc = new ReducingStateDescriptor[Record]("record
> reducing descriptor", reduceFunction, classOf[Record])
> //  reducingStateDesc.setQueryable("queryStore")
>
>   def reduceToQueryable = {
>     stream.asQueryableState("queryStore", reducingStateDesc)
>   }
> }
>
> class FlinkQuery[T](jobID: JobID, val serializer:
> TypeSerializer[T],jobManagerIP:String, jobManagerPort:Int) extends
> Serializable with LazyLogging {
>
>   @Transient
>   private lazy val client = new QueryableStateClient(config)
>
>  val kvState = client.getKvState(jobID, queryName, keyHash, serializedKey)
>     kvState.onFailure(onFailure)
>     kvState.onSuccess(onSuccess)
>
>  def onFailure = new PartialFunction[Throwable, String] {
>     override def isDefinedAt(x: Throwable): Boolean = true
>     override def apply(v1: Throwable): String = {
>       logger.error("failed to query " + v1.getLocalizedMessage)
>     }
>   }
>
>
>   def onSuccess = new PartialFunction[Array[Byte], Array[Byte]] {
>     override def isDefinedAt(x: Array[Byte]): Boolean = x != Nil
>
>     override def apply(v1: Array[Byte]) = {
>       logger.error("got result " + v1);
>       v1
>     }
>   }
> }
>
> class Driver {
>     val jobID = env.getStreamGraph.getJobGraph.getJobID
>     val aggregatedNQueryable = driver.aggregateWithQueryable(stream)
>     val queryStoreName =  aggregatedNQueryable.getQueryableStateName
>     val serializer =    aggregatedNQueryable.getKeySerializer
>      val valueSerializer =    aggregatedNQueryable.getValueSerialize
> }
>
>
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Fwd-Not-able-to-
> query-Queryable-State-tp8808p8938.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

Mime
View raw message