flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jingsong Li <jingsongl...@gmail.com>
Subject Re: Blink模式下运用collect方法快速获取结果
Date Fri, 24 Apr 2020 07:02:12 GMT
1.10里面有TableUtils了,里面有collectToList


Best,
Jingsong Lee

On Fri, Apr 24, 2020 at 2:49 PM jun su <sujun891020@gmail.com> wrote:

> hi all,
>
> 找到了源码中BatchTableEnvUtil类使用了CollectTableSink来做collect的逻辑,
> 但是下方代码运用了源码内部的private方法, 看起来不允许外部调用:
>
> def collect[T](
>     tEnv: TableEnvironment,
>     table: Table,
>     sink: CollectTableSink[T],
>     jobName: Option[String]): Seq[T] = {
>   val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
>     .asInstanceOf[TypeInformation[T]]
>     .createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl]
>       .getPlanner.asInstanceOf[PlannerBase].getExecEnv.getConfig)
>   val id = new AbstractID().toString
>   sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id)
>   val sinkName = UUID.randomUUID().toString
>   tEnv.registerTableSink(sinkName, sink)
>   tEnv.insertInto(table, sinkName)
>
>   val res = tEnv.execute("test")
>   val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
>   SerializedListAccumulator.deserializeList(accResult, typeSerializer)
> }
>
>
> jun su <sujun891020@gmail.com> 于2020年4月24日周五 下午2:05写道:
>
> > hi all,
> >
> > blink模式下没法将table 转为 dataset , 所以如果直接collect了, 请问有类似方法可以获取到
> > 结果用于代码调试么?
> >
> > --
> > Best,
> > Jun Su
> >
>
>
> --
> Best,
> Jun Su
>


-- 
Best, Jingsong Lee
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message