flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun su <sujun891...@gmail.com>
Subject Re: Blink模式下运用collect方法快速获取结果
Date Fri, 24 Apr 2020 06:48:43 GMT
hi all,

找到了源码中BatchTableEnvUtil类使用了CollectTableSink来做collect的逻辑,
但是下方代码运用了源码内部的private方法, 看起来不允许外部调用:

def collect[T](
    tEnv: TableEnvironment,
    table: Table,
    sink: CollectTableSink[T],
    jobName: Option[String]): Seq[T] = {
  val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
    .asInstanceOf[TypeInformation[T]]
    .createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl]
      .getPlanner.asInstanceOf[PlannerBase].getExecEnv.getConfig)
  val id = new AbstractID().toString
  sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id)
  val sinkName = UUID.randomUUID().toString
  tEnv.registerTableSink(sinkName, sink)
  tEnv.insertInto(table, sinkName)

  val res = tEnv.execute("test")
  val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
  SerializedListAccumulator.deserializeList(accResult, typeSerializer)
}


jun su <sujun891020@gmail.com> 于2020年4月24日周五 下午2:05写道:

> hi all,
>
> blink模式下没法将table 转为 dataset , 所以如果直接collect了, 请问有类似方法可以获取到
> 结果用于代码调试么?
>
> --
> Best,
> Jun Su
>


-- 
Best,
Jun Su
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message