Date: Mon, 11 Dec 2017 23:09:06 +0000 (UTC)
From: "Eugene Kirpichov (JIRA)"
To: commits@beam.apache.org
Subject: [jira] [Commented] (BEAM-3234) PubsubIO batch size should be configurable

[ https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16286710#comment-16286710 ]

Eugene Kirpichov commented on BEAM-3234:
----------------------------------------

Agreed. Based on the code, this seems easy to fix in PubsubBoundedWriter. The unbounded write path already seems to handle byte size limits properly; I'm not quite sure why the two use separate code paths.

> PubsubIO batch size should be configurable
> ------------------------------------------
>
> Key: BEAM-3234
> URL: https://issues.apache.org/jira/browse/BEAM-3234
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-gcp
> Affects Versions: 2.1.0, 2.2.0
> Reporter: Neville Li
> Priority: Minor
>
> It looks like Pubsub enforces a payload size limit on publish requests, and PubsubIO has a hard-coded batch size that can cause this limit to be exceeded in some cases.
> https://github.com/apache/beam/blob/release-2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L885
> {code}
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.GenerateSequence;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.values.TypeDescriptor;
>
> public class Test {
>   public static void main(String[] args) {
>     PipelineOptions options = PipelineOptionsFactory.create();
>     Pipeline pipeline = Pipeline.create(options);
>     pipeline
>         // Bounded source: the numbers 0..99.
>         .apply(GenerateSequence.from(0).to(100))
>         // Turn each element into a 10,000,000-character string.
>         .apply(MapElements
>             .into(TypeDescriptor.of(String.class))
>             .via(x -> {
>               StringBuilder b = new StringBuilder();
>               for (int i = 0; i < 10000000; i++) {
>                 b.append("x");
>               }
>               return b.toString();
>             }))
>         // Publish each string as a Pub/Sub message.
>         .apply(PubsubIO
>             .writeStrings()
>             .to("projects/scio-playground/topics/payload-test"));
>     pipeline.run().waitUntilFinish();
>   }
> }
> {code}
> The above code throws the following error:
> {code}
> [error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
> [error] {
> [error]   "code" : 400,
> [error]   "errors" : [ {
> [error]     "domain" : "global",
> [error]     "message" : "Request payload size exceeds the limit: 10485760 bytes.",
> [error]     "reason" : "badRequest"
> [error]   } ],
> [error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
> [error]   "status" : "INVALID_ARGUMENT"
> {code}

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
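The fix suggested in the comment, having PubsubBoundedWriter flush on accumulated bytes as well as on message count, could look roughly like the sketch below. It is a standalone illustration under stated assumptions, not Beam code: the class name `ByteAwareBatcher`, its constructor parameters, and the `Consumer` that stands in for the real publish call are all hypothetical, and whatever limits you pass in are your own choice (ideally well under the 10485760-byte request limit reported in the error).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch (not Beam's PubsubBoundedWriter): buffers message payloads
// and hands them to a publish callback once either a message-count limit or a
// byte-size limit is reached. All names and limits here are illustrative.
final class ByteAwareBatcher {
  private final int maxBatchSize;                  // assumed max messages per request
  private final long maxBatchBytes;                // assumed max payload bytes per request
  private final Consumer<List<byte[]>> publisher;  // stands in for the real publish RPC
  private final List<byte[]> buffer = new ArrayList<>();
  private long bufferedBytes = 0;

  ByteAwareBatcher(int maxBatchSize, long maxBatchBytes, Consumer<List<byte[]>> publisher) {
    if (maxBatchSize <= 0 || maxBatchBytes <= 0) {
      throw new IllegalArgumentException("limits must be positive");
    }
    this.maxBatchSize = maxBatchSize;
    this.maxBatchBytes = maxBatchBytes;
    this.publisher = publisher;
  }

  /** Adds one payload, flushing the current batch first if it would overflow the byte budget. */
  void add(byte[] payload) {
    if (!buffer.isEmpty() && bufferedBytes + payload.length > maxBatchBytes) {
      flush();
    }
    buffer.add(payload);
    bufferedBytes += payload.length;
    // Flush as soon as either limit is reached.
    if (buffer.size() >= maxBatchSize || bufferedBytes >= maxBatchBytes) {
      flush();
    }
  }

  /** Publishes whatever is buffered; call once more when the bundle finishes. */
  void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    publisher.accept(new ArrayList<>(buffer)); // copy so clearing the buffer is safe
    buffer.clear();
    bufferedBytes = 0;
  }
}
```

With a count limit alone, 100 messages of about 10 MB each all land in one request; with the byte check, each batch is cut before it crosses the byte budget. Two caveats: a single payload larger than the byte limit is still sent on its own and would still be rejected, and the budget likely needs headroom for request encoding overhead, since the JSON API carries message data base64-encoded.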