beam-user mailing list archives

From Ismaël Mejía <ieme...@gmail.com>
Subject Re: about PCollection process
Date Fri, 06 Jul 2018 09:13:42 GMT
Hello,

If I understand correctly, you read some parameters from a file and use them
to prepare an HBase Scan. If that is the case, you cannot do this with the
current HBaseIO API, but there is ongoing work to support it transparently
with the new SDF (Splittable DoFn) API. If you want to track the progress,
this is the JIRA: https://issues.apache.org/jira/browse/BEAM-4020
Hopefully it will be ready in the coming days/weeks.

In the meantime, you can work around this by applying a ParDo after you
extract the scan parameters from the files, with a DoFn that requests the
data from HBase itself, similar to what the SDF implementation does. For
reference:
https://github.com/iemejia/beam/blob/2f9b54c6efa1c97c4b030a9b1af44b1327541e5f/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java#L37
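A minimal sketch of that workaround follows. It is not the linked HBaseReadSplittableDoFn, just a plain DoFn that opens one HBase connection per DoFn instance and runs one Scan per input line. It also sidesteps the two things that cannot work in the quoted code: PCollections cannot be nested (a PCollection cannot be an element of another PCollection), and transforms are applied when the pipeline is constructed, not from inside processElement. The class name ScanPerVinFn, its constructor parameters, and reaching the cluster through the hbase.zookeeper.quorum setting are assumptions for illustration. It uses the HBase 2.x client API (Scan.withStartRow/withStopRow) and assumes a coder for Result is available (the beam-sdks-java-io-hbase module should provide one; otherwise convert each Result to your own type, such as RecordData, inside the DoFn).

```java
import java.io.IOException;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Hypothetical workaround DoFn: for each VIN line, scans the HBase rows
 * between "vin-startTime" (inclusive) and "vin-endTime" (exclusive) and
 * emits one (vin, Result) pair per matching row.
 */
public class ScanPerVinFn extends DoFn<String, KV<String, Result>> {
  // Only serializable settings are kept as fields; the HBase Configuration
  // and Connection are created on the worker in @Setup.
  private final String zkQuorum; // assumption: cluster reached via hbase.zookeeper.quorum
  private final String tableId;
  private final String startTime;
  private final String endTime;

  private transient Connection connection;

  public ScanPerVinFn(String zkQuorum, String tableId, String startTime, String endTime) {
    this.zkQuorum = zkQuorum;
    this.tableId = tableId;
    this.startTime = startTime;
    this.endTime = endTime;
  }

  /** Builds a row key in the same "vin-time" format as the original code. */
  public static byte[] rowKey(String vin, String time) {
    return Bytes.toBytes(String.format("%s-%s", vin, time));
  }

  @Setup
  public void setup() throws IOException {
    // One connection per DoFn instance, reused across many elements.
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", zkQuorum);
    connection = ConnectionFactory.createConnection(conf);
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws IOException {
    String vin = c.element();
    // HBase 2.x API; with a 1.x client use setStartRow/setStopRow instead.
    Scan scan = new Scan()
        .withStartRow(rowKey(vin, startTime))
        .withStopRow(rowKey(vin, endTime));
    // Table and ResultScanner are lightweight and closed after each element.
    try (Table table = connection.getTable(TableName.valueOf(tableId));
        ResultScanner scanner = table.getScanner(scan)) {
      for (Result result : scanner) {
        c.output(KV.of(vin, result));
      }
    }
  }

  @Teardown
  public void teardown() throws IOException {
    if (connection != null) {
      connection.close();
    }
  }
}
```

Applied as lines.apply(ParDo.of(new ScanPerVinFn(quorum, hbaseTable, startTime, endTime))), with quorum being a hypothetical ZooKeeper quorum string, this yields one flat PCollection<KV<String, Result>> instead of a PCollection of PCollections, and the conversion to RecordData can run as an ordinary downstream ParDo. The connection is opened in @Setup rather than per element because HBase connections are expensive to create.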

Hope this helps,
Ismaël

On Thu, Jul 5, 2018 at 4:53 AM Frank Li <surpass_li@aliyun.com> wrote:

> Hello everyone,
>         I'm running a Beam pipeline that uses TextIO to read text from a
> file, and a PTransform that searches HBase for each line. The result is
> PCollection<PCollection<KV<String, RecordData>>>:
>
>
> @Override
> public PCollection<PCollection<KV<String, RecordData>>> expand(PCollection<String> lines) {
>
>     PCollection<PCollection<KV<String, RecordData>>> results = lines
>         .apply(ParDo.of(new DoFn<String, PCollection<KV<String, RecordData>>>() {
>             @ProcessElement
>             public void processElement(ProcessContext c) {
>                 String vin = c.element();
>
>                 Pipeline pipelineHbase = Pipeline.create(c.getPipelineOptions());
>
>                 HBaseIO.Read read = HBaseIO.read()
>                     .withConfiguration(conf)
>                     .withTableId(hbaseTable)
>                     .withKeyRange(
>                         Bytes.toBytes(String.format("%s-%s", vin, startTime)),
>                         Bytes.toBytes(String.format("%s-%s", vin, endTime)));
>                 PCollection<Result> results = pipelineHbase.apply(read);
>
>                 PCollection<KV<String, RecordData>> recordResults = results
>                     .apply(ParDo.of(new Result2RecordNoModifyDataFn()));
>
>                 c.output(recordResults);
>             }
>         }));
>
>     return results;
> }
>
>
> How can I process a PCollection<PCollection<KV<String, RecordData>>>?
>
>
