beam-commits mailing list archives

From "Eugene Kirpichov (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-3234) PubsubIO batch size should be configurable
Date Mon, 11 Dec 2017 23:09:06 GMT

    [ https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16286710#comment-16286710 ]

Eugene Kirpichov commented on BEAM-3234:
----------------------------------------

Agreed. From looking at the code, this seems easy to fix in PubsubBoundedWriter. The unbounded
write path already seems to handle byte size limits properly. I'm not quite sure why the bounded
and unbounded writes use separate codepaths.
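
As a rough illustration of what a byte-size-aware batch limit could look like, here is a
minimal standalone sketch. It is not the actual PubsubBoundedWriter or unbounded write code:
the class ByteSizeBatcher, the method batch, and the parameters maxBatchSize and maxBatchBytes
are hypothetical names chosen for this example. It counts only raw payload bytes, so a real fix
would likely also need to budget for attributes and request encoding overhead.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Beam: groups payloads into batches that
// respect both a maximum element count and a maximum total payload size.
final class ByteSizeBatcher {

  static List<List<byte[]>> batch(List<byte[]> payloads, int maxBatchSize, long maxBatchBytes) {
    if (maxBatchSize < 1) {
      throw new IllegalArgumentException("maxBatchSize must be at least 1");
    }
    List<List<byte[]>> batches = new ArrayList<>();
    List<byte[]> current = new ArrayList<>();
    long currentBytes = 0;
    for (byte[] payload : payloads) {
      // A payload that cannot fit in any batch is rejected up front.
      if (payload.length > maxBatchBytes) {
        throw new IllegalArgumentException(
            "Payload of " + payload.length + " bytes exceeds the batch byte limit");
      }
      // Flush the current batch if adding this payload would break either limit.
      boolean countFull = current.size() >= maxBatchSize;
      boolean bytesFull = currentBytes + payload.length > maxBatchBytes;
      if (!current.isEmpty() && (countFull || bytesFull)) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(payload);
      currentBytes += payload.length;
    }
    // Emit whatever is left over as the final batch.
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }

  public static void main(String[] args) {
    List<byte[]> payloads = Arrays.asList(
        "aaaa".getBytes(StandardCharsets.UTF_8),
        "bbbb".getBytes(StandardCharsets.UTF_8),
        "cc".getBytes(StandardCharsets.UTF_8));
    // With an 8-byte limit, the first two payloads share a batch and the third starts a new one.
    List<List<byte[]>> batches = batch(payloads, 100, 8);
    System.out.println("batches: " + batches.size());
  }
}
```

The point of the sketch is that the flush decision depends on accumulated bytes as well as
element count, so a batch of a few very large messages is split before it reaches the request
size limit.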

> PubsubIO batch size should be configurable
> ------------------------------------------
>
>                 Key: BEAM-3234
>                 URL: https://issues.apache.org/jira/browse/BEAM-3234
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-gcp
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Neville Li
>            Priority: Minor
>
> Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard coded batch size that may cause this limit to be exceeded in some cases.
> https://github.com/apache/beam/blob/release-2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L885
> {code}
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.GenerateSequence;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.values.TypeDescriptor;
> public class Test {
>   public static void main(String[] args) {
>     PipelineOptions options = PipelineOptionsFactory.create();
>     Pipeline pipeline = Pipeline.create(options);
>     pipeline
>         .apply(GenerateSequence.from(0).to(100))
>         .apply(MapElements
>             .into(TypeDescriptor.of(String.class))
>             .via(x -> {
>               // Build a 10,000,000-character payload for every element.
>               StringBuilder b = new StringBuilder();
>               for (int i = 0; i < 10000000; i++) {
>                 b.append("x");
>               }
>               return b.toString();
>             }))
>         .apply(PubsubIO
>             .writeStrings()
>             .to("projects/scio-playground/topics/payload-test"));
>     pipeline.run().waitUntilFinish();
>   }
> }
> {code}
> The above code throws the following error:
> {code}
> [error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
> [error] {
> [error]   "code" : 400,
> [error]   "errors" : [ {
> [error]     "domain" : "global",
> [error]     "message" : "Request payload size exceeds the limit: 10485760 bytes.",
> [error]     "reason" : "badRequest"
> [error]   } ],
> [error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
> [error]   "status" : "INVALID_ARGUMENT"
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
