flink-user mailing list archives

From Chet Masterson <chet.master...@yandex.com>
Subject Re: Queryable State
Date Mon, 24 Apr 2017 12:27:37 GMT
Ufuk - thank you for your help. My flink-conf.yaml is now configured cluster-wide (with restart) as:

# my flink-conf.yaml, on all flink nodes:
jobmanager.rpc.address: x.x.x.x
jobmanager.rpc.port: 6123
query.server.port: 6123
query.server.enable: true

When I try to issue my query now with the above settings (this query worked on a single flink node running by itself):

2017/04/24 07:08:26.940 ERROR [OneForOneStrategy] Error while decoding incoming Akka PDU of length: 3623
akka.remote.transport.AkkaProtocolException: Error while decoding incoming Akka PDU of length: 3623
Caused by: akka.remote.transport.PduCodecException: Decoding PDU failed.
        at akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:167)
        at akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:621)
        at akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:372)
        at akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:367)

My client code. I verified the job id exists, and is running, the stateName exists, and is populated. Again, this code runs on a single standalone flink node

// server = job manager ip, which I can route to, and responds on port 6123
// port = 6123
    private static QueryableStateClient newQueryableStateClient(String server, int port) {
        Configuration configFlink = new Configuration();
        configFlink.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, server);
        configFlink.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);

        try {
            client = new QueryableStateClient(configFlink);
            return client;
        }
        catch (Exception e) {
            logger.error("Error configuring QueryableStateGateway: "+e);
            return null;
        }
    }

    public HashMap<Long, java.util.ArrayList<String>> executeQuery(Tuple2<String, String> key, String flinkJobID, String stateName) {
        JobID jobId = JobID.fromHexString(flinkJobID);
        byte[] serializedKey = getSeralizedKey(key);
        Future<byte[]> future = client.getKvState(jobId, stateName, key.hashCode(), serializedKey);
        try {
            byte[] serializedResult = Await.result(future, new FiniteDuration(maxQueryTime, TimeUnit.SECONDS));
            HashMap<Long, java.util.ArrayList<String>> results = deserializeResponseGlobalCoverage(serializedResult);
            return results;
        }
        catch (Exception e) {
            logger.error("Queryable State Error: "+key+"-"+flinkJobID+"-"+stateName+" Error: "+e.getMessage());
            return null;
        }
    }

Thank you for the help!

24.04.2017, 07:55, "Ufuk Celebi" <uce@apache.org>:
> You should be able to use queryable state w/o any changes to the
> default config. The `query.server.port` option defines the port of the
> queryable state server that serves the state on the task managers and
> it is enabled by default.
>
> The important thing is to configure the client to discover the
> JobManager and everything else should work out of the box. Can you
> please
>
> 1) Use the default config and verify in the JobManager logs that the
> JobManager listens on port 6123 (the default JM port) and that all
> expected TaskManagers connect to it?
>
> 2) Share the code for how you configure the QueryableStateClient?
>
> – Ufuk
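
The message above says the JobManager address is routable and "responds on port 6123". One way to check that from the client host is a plain TCP connect. The following is only a minimal sketch using the JDK; the class `PortCheck` and the method `isReachable` are hypothetical names for illustration and are not part of Flink or of the code in this thread.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical helper, not part of Flink: checks that a TCP port accepts connections.
class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, host not resolvable, and similar failures.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Self-check against a throwaway local listener on an ephemeral port.
        int port;
        try (ServerSocket listener = new ServerSocket(0)) {
            port = listener.getLocalPort();
            System.out.println("open port reachable: " + isReachable("127.0.0.1", port, 1000));
        }
        // The listener is closed here, so the same port should normally refuse connections.
        System.out.println("closed port reachable: " + isReachable("127.0.0.1", port, 1000));
    }
}
```

For the setup described in this thread, the call would be `PortCheck.isReachable(server, 6123, 1000)` from the machine running the client. A successful connect only shows that something is listening on that port. It does not confirm that the listener is the JobManager, so checking the JobManager logs as suggested in the quoted reply is still needed.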