From: "Eugene Kirpichov (JIRA)"
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Date: Mon, 5 Mar 2018 22:43:00 +0000 (UTC)
Subject: [jira] [Commented] (BEAM-3778) Very poor performance of side inputs when input is finely sharded
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8

[ https://issues.apache.org/jira/browse/BEAM-3778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386884#comment-16386884 ]

Eugene Kirpichov commented on BEAM-3778:
----------------------------------------

More precisely: https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java#L1083

"Each bundle maps to one file exactly" is not what we want here. Whatever is passed to View.asIterable or View.asList is unlikely to be large enough that saving a reshuffle matters, so we should simply insert a Reshuffle.viaRandomKey() before the ParDo.

> Very poor performance of side inputs when input is finely sharded
> -----------------------------------------------------------------
>
>          Key: BEAM-3778
>          URL: https://issues.apache.org/jira/browse/BEAM-3778
>      Project: Beam
>   Issue Type: Bug
>   Components: runner-dataflow
>     Reporter: Eugene Kirpichov
>     Assignee: Luke Cwik
>     Priority: Major
>
> This thread:
> https://lists.apache.org/thread.html/324a4f86e567e3e1692466e70f44a08276123b467bacb2ecbf00515f@%3Cuser.beam.apache.org%3E
>
> The user has a job that reads a few hundred thousand files and then writes them to BigQuery. This generates 1 temp file per input file.
> Then we gather the temp files into a View.asList() side input - and this side input ends up containing a few hundred thousand tiny ISM files, with 1 element per file, which performs horribly (taking hours to read the side input).
>
> I think we need to reshuffle things onto a reasonable number of shards before writing them to ISM.
>
> A side issue: this https://github.com/apache/beam/blob/v2.2.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java#L46 also triggers the coder size-estimation logic, which wrongly assumes that size estimation is cheap in this case and therefore does the work twice, as evidenced by the following stack trace:
>
> Processing lull for PT30900.015S in state process of WriteTOBigQuery/BatchLoads/ReifyResults/ParDo(Anonymous)
>   java.net.SocketInputStream.socketRead0(Native Method)
>   java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>   java.net.SocketInputStream.read(SocketInputStream.java:170)
>   java.net.SocketInputStream.read(SocketInputStream.java:141)
>   sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
>   sun.security.ssl.InputRecord.read(InputRecord.java:503)
>   sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
>   sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:940)
>   sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
>   java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
>   java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
>   java.io.BufferedInputStream.read(BufferedInputStream.java:345)
>   sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704)
>   sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
>   sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1536)
>   sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
>   java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
>   sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
>   com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
>   com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
>   com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981)
>   com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
>   com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
>   com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeMedia(AbstractGoogleClientRequest.java:380)
>   com.google.api.services.storage.Storage$Objects$Get.executeMedia(Storage.java:4784)
>   com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.openStreamAndSetMetadata(GoogleCloudStorageReadChannel.java:656)
>   com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.performLazySeek(GoogleCloudStorageReadChannel.java:560)
>   com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:289)
>   sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
>   sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
>   sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>   java.io.InputStream.read(InputStream.java:101)
>   sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:81)
>   org.apache.beam.sdk.util.VarInt.decodeLong(VarInt.java:79)
>   org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:63)
>   org.apache.beam.runners.dataflow.internal.IsmFormat$KeyPrefixCoder.decode(IsmFormat.java:694)
>   com.google.cloud.dataflow.worker.IsmReader.readKey(IsmReader.java:999)
>   com.google.cloud.dataflow.worker.IsmReader.access$2000(IsmReader.java:79)
>   com.google.cloud.dataflow.worker.IsmReader$WithinShardIsmReaderIterator.advance(IsmReader.java:952)
>   com.google.cloud.dataflow.worker.IsmReader$WithinShardIsmReaderIterator.start(IsmReader.java:942)
>   com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:580)
>   com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:569)
>   com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:554)
>   com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904)
>   com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
>   com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
>   com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
>   com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
>   com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
>   com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
>   com.google.cloud.dataflow.worker.IsmReader.fetch(IsmReader.java:605)
>   com.google.cloud.dataflow.worker.IsmReader.getBlock(IsmReader.java:770)
>   com.google.cloud.dataflow.worker.IsmReader.access$1000(IsmReader.java:79)
>   com.google.cloud.dataflow.worker.IsmReader$IsmPrefixReaderIterator.get(IsmReader.java:641)
>   com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators.getUsingLong(IsmSideInputReader.java:674)
>   com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators.access$1300(IsmSideInputReader.java:620)
>   com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators$ListIteratorOverReaderIterators.next(IsmSideInputReader.java:715)
>   java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1042)
>   org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:195)
>   org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:60)
>   org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:685)
>   org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:599)
>   com.google.cloud.dataflow.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:520)
>   com.google.cloud.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:134)
>   com.google.cloud.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:63)
>   com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
>   com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
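The fix proposed in the comment - a random-key reshuffle before writing the side-input files - can be modeled in plain Python. This is only an illustrative sketch of the rebalancing idea, not Beam's actual Reshuffle.viaRandomKey() implementation; the element list, shard count, and helper name are assumptions:

```python
import random
from collections import defaultdict

def reshuffle_via_random_key(elements, num_shards, seed=None):
    """Model of a random-key reshuffle: instead of producing one tiny
    file per input bundle, assign each element a random shard key and
    group by that key, yielding a bounded number of well-filled shards."""
    rng = random.Random(seed)
    shards = defaultdict(list)
    for element in elements:
        shards[rng.randrange(num_shards)].append(element)
    return dict(shards)

# A few hundred thousand one-element bundles collapse into <= 100 shards.
elements = [f"tempfile-{i}" for i in range(300_000)]
shards = reshuffle_via_random_key(elements, num_shards=100, seed=42)

assert len(shards) <= 100                                # bounded shard count
assert sum(len(s) for s in shards.values()) == 300_000   # nothing lost
```

In the actual pipeline the same effect would come from inserting Reshuffle.viaRandomKey() before the ParDo that writes ISM files, so the runner regroups elements across a bounded set of bundles rather than preserving the original per-input-file sharding.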
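The "does the work twice" side issue - a size-estimation pass that walks the whole lazily-read iterable before the real consumer walks it again - can be illustrated with a counting iterable. This is a hypothetical model of the behavior, not the actual IterableLikeCoder code:

```python
class CountingSource:
    """Iterable that counts how many element reads it serves, standing
    in for an ISM-backed side input where every element access is a
    (slow) remote read."""
    def __init__(self, n):
        self.n = n
        self.reads = 0

    def __iter__(self):
        for i in range(self.n):
            self.reads += 1
            yield i

def estimate_size(iterable):
    # Naive byte-size observer: walks the entire iterable, which is
    # only "cheap" when the elements are already in memory.
    return sum(1 for _ in iterable)

source = CountingSource(1000)
estimate_size(source)      # first full pass, just to estimate size
consumed = list(source)    # second full pass by the real consumer

assert source.reads == 2000  # every element was fetched twice
```

When each read is a network round trip to GCS, as in the stack trace above, this doubling is what turns an already slow side-input read into one twice as slow.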