flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "chengwenfeng"<chengwenf...@unipus.cn>
Subject Queryable State 查询反序列化问题
Date Tue, 12 Nov 2019 06:46:45 GMT
大家好:
我在测试Querable State功能的时候,发现
语法
        dataStream.keyby(key).process();  这种语法下,简单的状态和复杂的POJO都可以查询
但在
       studentAnswerDataStream.connect(learningStrategyDataStream)
                .keyBy(val->val.getCourseId()+"_"+val.getTaskId()
                        , val->val.getCourseId()+"_"+val.getTaskId())
                .process()  这种语法情况下,简单的状态可以,但复杂的POJO无法反序列化回来




错误:


Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException:
Failed request 0.
 Caused by: java.lang.RuntimeException: Failed request 9.
 Caused by: java.lang.RuntimeException: Error while processing request with ID 9. Caused by:
java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in
the key/namespace serializers used by the KvState instance and this access.
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101)
at org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unexpected magic number 48.
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99)
... 10 more


at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at cn.unipus.flink.GetQueryableState2.main(GetQueryableState2.java:41)
Caused by: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Failed request 9.
 Caused by: java.lang.RuntimeException: Error while processing request with ID 9. Caused by:
java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in
the key/namespace serializers used by the KvState instance and this access.
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109)
at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101)
at org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unexpected magic number 48.
at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99)
... 10 more


at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)




使用版本1.9.1


代码如下


状态代码
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;


public class QueryableStateDemo2 {

 public static void main(String[] args) throws Exception {

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);

 DataStream<StudentAnswer> studentAnswerDataStream = env.addSource(new SourceFunction<StudentAnswer>()
{
 @Override
 public void run(SourceContext<StudentAnswer> ctx) throws Exception {
 StudentAnswer studentAnswer = new StudentAnswer();
 studentAnswer.setCourseId(100L);
 studentAnswer.setTaskId("ug01");
 studentAnswer.setUserId(1L);
 studentAnswer.setAnswer("答案");
 ctx.collect(studentAnswer);
 while (true) {
 Thread.sleep(1000 * 60);
 }
 }

 @Override
 public void cancel() {

 }
 });

 DataStream<LearningStrategy> learningStrategyDataStream = env.addSource(new SourceFunction<LearningStrategy>()
{
 @Override
 public void run(SourceContext<LearningStrategy> ctx) throws Exception {
 LearningStrategy learningStrategy = new LearningStrategy();
 learningStrategy.setCourseId(100L);
 learningStrategy.setTaskId("ug01");
 ctx.collect(learningStrategy);
 while (true) {
 Thread.sleep(1000 * 60);
 }
 }

 @Override
 public void cancel() {

 }
 });

 studentAnswerDataStream.connect(learningStrategyDataStream)
 .keyBy(val->val.getCourseId()+"_"+val.getTaskId()
 , val->val.getCourseId()+"_"+val.getTaskId())
 .process(new KeyedCoProcessFunction<String, StudentAnswer, LearningStrategy, String>()
{


 private transient ValueState<StudentAnswer> leftBuffer;
 private transient ValueState<LearningStrategy> rightBuffer;


 @Override
 public void open(Configuration conf) {
 ValueStateDescriptor<StudentAnswer> leftBufferDescriptor = new ValueStateDescriptor<>(
 "left_buffer", TypeInformation.of(StudentAnswer.class));
 leftBufferDescriptor.setQueryable("left_buffer_query");

 ValueStateDescriptor<LearningStrategy> rightBufferDescriptor = new ValueStateDescriptor<>(
 "right_buffer", TypeInformation.of(LearningStrategy.class));
 rightBufferDescriptor.setQueryable("right_buffer_query");

 leftBuffer = getRuntimeContext().getState(leftBufferDescriptor);
 rightBuffer = getRuntimeContext().getState(rightBufferDescriptor);
 }

 @Override
 public void processElement1(StudentAnswer value, Context context, Collector<String>
collector) throws Exception {
 System.out.println("processElement1:" + value);
 leftBuffer.update(value);
 String key = context.getCurrentKey();
 collector.collect(key);
 }

 @Override
 public void processElement2(LearningStrategy value, Context context, Collector<String>
collector) throws Exception {
 System.out.println("processElement2:" + value);
 rightBuffer.update(value);
 String key = context.getCurrentKey();
 collector.collect(key);
 }
 }).print("结果");


 env.execute("State");
 }


}

客户端代码:


import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;

public class GetQueryableState2 {

 public static void main(String[] args) throws Exception
 {


 QueryableStateClient client = new QueryableStateClient("localhost", 9069);

 ExecutionConfig executionConfig = new ExecutionConfig();
 client.setExecutionConfig(executionConfig);


 ValueStateDescriptor<StudentAnswer> leftBufferDescriptor = new ValueStateDescriptor<>(
 "left_buffer", TypeInformation.of(StudentAnswer.class));


 String key = "100_ug01";

 JobID jobId = JobID.fromHexString("0b4ed273b44f0cff6065705c6e4ea17f");

 CompletableFuture<ValueState<StudentAnswer>> resultFuture =
 client.getKvState(jobId, "left_buffer_query", key
 , BasicTypeInfo.STRING_TYPE_INFO, leftBufferDescriptor);


 ValueState<StudentAnswer> leftBuffer = resultFuture.get();
 System.out.println("结果:"+leftBuffer.value());

 // now handle the returned value
// resultFuture.thenAccept(response ->
// {
// try {
// Tuple2<String, Long> res = response.value();
//
// System.out.println("Queried sum value: " + res);
//
// } catch (Exception e)
// {
// e.printStackTrace();
// }
// System.out.println("Exiting future ...");
// });
 Thread.sleep(1000L*10);
 }

}




Domain如下
public class BaseDomain implements Serializable {

 protected String bn = "2019";
 protected String version = "1.0";

 public String getBn() {
 return bn;
 }

 public void setBn(String bn) {
 this.bn = bn;
 }

 public String getVersion() {
 return version;
 }

 public void setVersion(String version) {
 this.version = version;
 }
}


public class LearningStrategy extends BaseDomain {
 private Long courseId;
 private String taskId;
 private Byte pushOrder = 1;

 public Long getCourseId() {
 return courseId;
 }

 public void setCourseId(Long courseId) {
 this.courseId = courseId;
 }

 public String getTaskId() {
 return taskId;
 }

 public void setTaskId(String taskId) {
 this.taskId = taskId;
 }

 public Byte getPushOrder() {
 return pushOrder;
 }

 public void setPushOrder(Byte pushOrder) {
 this.pushOrder = pushOrder;
 }

 @Override
 public String toString() {
 return "LearningStrategy{" +
 "bn='" + bn + '\'' +
 ", version='" + version + '\'' +
 ", courseId=" + courseId +
 ", taskId='" + taskId + '\'' +
 ", pushOrder=" + pushOrder +
 '}';
 }
}


public class StudentAnswer extends BaseDomain{
 private Long courseId;
 private String taskId;
 private Long userId;
 private String answer;

 public Long getCourseId() {
 return courseId;
 }

 public void setCourseId(Long courseId) {
 this.courseId = courseId;
 }

 public String getTaskId() {
 return taskId;
 }

 public void setTaskId(String taskId) {
 this.taskId = taskId;
 }

 public Long getUserId() {
 return userId;
 }

 public void setUserId(Long userId) {
 this.userId = userId;
 }

 public String getAnswer() {
 return answer;
 }

 public void setAnswer(String answer) {
 this.answer = answer;
 }

 @Override
 public String toString() {
 return "StudentAnswer{" +
 "bn='" + bn + '\'' +
 ", version='" + version + '\'' +
 ", courseId=" + courseId +
 ", taskId='" + taskId + '\'' +
 ", userId=" + userId +
 ", answer='" + answer + '\'' +
 '}';
 }
}
Mime
  • Unnamed multipart/alternative (inline, 8-Bit, 0 bytes)
View raw message