flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chet Masterson <chet.master...@yandex.com>
Subject Re: Queryable State
Date Wed, 26 Apr 2017 16:11:32 GMT
<div>After setting the logging to DEBUG on the job manager, I learned four things:</div><div> </div><div>(On
the message formatting below, I have the Flink logs formatted into JSON so I can import them
into Kibana)</div><div> </div><div>1. The appropriate key value state
is registered in both parallelism = 1 and parallelism = 3 environments. In parallelism = 1,
I saw one registration message in the log, in the parallelism = 3, I saw two registration
messages: {"level":"DEBUG","time":"2017-04-26 15:54:55,254","class":"org.apache.flink.runtime.jobmanager.JobManager","ndc":"",
"msg":"Key value state registered for job &lt;job id&gt; under name &lt;statename&gt;"}</div><div> </div><div>2.
When I issued the query in both parallelism = 1 and parallelism = 3 environments, I saw "Lookup
key-value state for job &lt;job id&gt; with registration name &lt;statename&gt;".
In parallelism = 1, I saw 1 log message, in parallelism = 3, I saw two identical messages.</div><div> </div><div>3.
I saw no other messages in the job manager log that seemed relevant.</div><div> </div><div>4.
When issuing the query in parallelism = 3, I continued to get the error: org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation
with a message of null.</div><div> </div><div>Thanks!</div><div> </div><div> </div><div><div> </div></div><div> </div><div> </div><div>26.04.2017,
09:52, "Ufuk Celebi" &lt;uce@apache.org&gt;:</div><blockquote type="cite"><p>Thanks!
Your config looks good to me.<br /><br />Could you please set the log level org.apache.flink.runtime.jobmanager
to DEBUG?<br /><br />log4j.logger.org.apache.flink.runtime.jobmanager=DEBUG<br
/><br />Then we can check whether the JobManager logs the registration of the<br
/>state instance with the respective name in the case of parallelism &gt;<br />1?<br
/><br />Expected output is something like this: "Key value state registered<br
/>for job ${msg.getJobId} under name ${msg.getRegistrationName}."<br /><br />–
Ufuk<br /><br />On Wed, Apr 26, 2017 at 3:06 PM, Chet Masterson<br />&lt;<a
href="mailto:chet.masterson@yandex.com">chet.masterson@yandex.com</a>&gt; wrote:</p><blockquote> Ok...more
information.<br /><br /> 1. Built a fresh cluster from the ground up. Started
testing queryable state<br /> at each step.<br /> 2. When running under any
configuration of task managers and job managers<br /> were parallelism = 1, the queries
execute as expected.<br /> 3. As soon as I cross over to parallelism = 3 with 3 task
managers (1 job<br /> manager) feeding off a kafka topic partitioned three ways, queries
will<br /> always fail, returning error<br /> (org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation)
with an<br /> error message of null.<br /> 4. I do know my state is as expected
on the cluster. Liberal use of trace<br /> prints show my state managed on the jobs
is as I expect. However, I cannot<br /> query them external.<br /> 5. I am sending
the query to jobmanager.rpc.port = 6123, which I confirmed<br /> is configured by using
the job manager UI.<br /> 6. My flink-conf.yaml:<br /><br /> jobmanager.rpc.address:
flink01<br /> jobmanager.rpc.port: 6123<br /> jobmanager.heap.mb: 256<br
/><br /> taskmanager.heap.mb: 512<br /> taskmanager.data.port: 6121<br
/> taskmanager.numberOfTaskSlots: 1<br /> taskmanager.memory.preallocate: false<br
/><br /> parallelism.default: 1<br /> blob.server.port: 6130<br /><br
/> jobmanager.web.port: 8081<br /> query.server.enable: true<br /><br
/> 7. I do know my job is indeed running in parallel, from trace prints going<br /> to
the task manager logs.<br /><br /> Do I need a backend configured when running
in parallel for the queryable<br /> state? Do I need a shared temp directory on the
task managers?<br /><br /> THANKS!<br /><br /><br /> 25.04.2017,
04:24, "Ufuk Celebi" &lt;<a href="mailto:uce@apache.org">uce@apache.org</a>&gt;:<br
/><br /> It's strange that the rpc port is set to 30000 when you use a<br /> standalone
cluster and configure 6123 as the port. I'm pretty sure<br /> that the config has not
been updated.<br /><br /> But everything should work as you say when you point
it to the correct<br /> jobmanager address and port. Could you please post the complete<br
/> stacktrace you get instead of the message you log?<br /><br /><br /> On
Mon, Apr 24, 2017 at 5:31 PM, Chet Masterson<br /> &lt;<a href="mailto:chet.masterson@yandex.com">chet.masterson@yandex.com</a>&gt;
wrote:<br /><br /><br /><br />  More information:<br /><br
/>  0. I did remove the query.server.port and query.server.enabled from all<br />  flink-conf.yaml
files, and restarted the cluster.<br /><br />  1. The Akka error doesn't seem
to have anything to do with the problem. If<br /> I<br />  point my query client
at an IP address with no Flink server running at all,<br />  I get that error. It
seems to be a (side effect?) timeout for "no flink<br />  service is listening on
the port you told me to check"<br /><br />  2. I did notice (using the Flink
Web UI) even with the config file changes<br />  in 0, and no changes to the default
flink-conf.yaml the jobmanager.rpc.port<br />  (6123), on my cluster, jobmanager.rpc.port
is set to 30000.<br /><br />  3. If I do send a query using the jobmanager.rpc.address
and the<br />  jobmanager.rpc.port as displayed in the Flink Web UI, the connection
to<br /> from<br />  the client to Flink will be initiated and completed. When
I try to execute<br />  the query (code below), it will fail, and will get trapped.
When I look at<br />  the error message returned (e.getMessage() below), it is simply
'null':<br /><br />  try {<br />        byte[] serializedResult
= Await.result(future, new<br />  FiniteDuration(maxQueryTime, TimeUnit.SECONDS));<br
/>        // de-serialize, commented out for testing<br />        return
null;<br />          }<br />          catch (Exception e)
{<br />              logger.error("Queryable State Error:<br />  "+key+"-"+flinkJobID+"-"+stateName+"
Error: "+e.getMessage());<br />              return null;<br />          }<br
/><br />  Should I be sending the query to the job manager on the the job manager's<br
/>  rpc port when flink is clustered?<br /><br />  ALSO - I do know the
state name I am trying to query exists, is populated,<br />  and the job id exists.
I also know the task managers are communicating with<br />  the job managers (task
managers data port: 6121) and processed the data<br /> that<br />  resulted
in the state variable I am trying to query being populated. All<br />  this was logged.<br
/><br /><br />  24.04.2017, 10:34, "Ufuk Celebi" &lt;<a href="mailto:uce@apache.org">uce@apache.org</a>&gt;:<br
/><br />  Hey Chet! You can remove<br /><br />  query.server.port:
6123<br />  query.server.enable: true<br /><br />  That shouldn't cause
the Exception we see here though. I'm actually<br />  not sure what is causing the
PduCodecException. Could this be related<br />  to different Akka versions being used
in Flink and your client code?<br />  [1] Is it possible for you to check this?<br
/><br />  – Ufuk<br /><br />  [1] <a href="https://groups.google.com/forum/#!topic/akka-user/vr1uXsf9gW0">https://groups.google.com/forum/#!topic/akka-user/vr1uXsf9gW0</a></blockquote></blockquote>
View raw message