From commits-return-93217-archive-asf-public=cust-asf.ponee.io@beam.apache.org Wed Sep 19 15:14:04 2018
Mailing-List: contact commits-help@beam.apache.org; run by ezmlm
Reply-To: dev@beam.apache.org
Delivered-To: mailing list commits@beam.apache.org
Date: Wed, 19 Sep 2018 13:14:00 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: commits@beam.apache.org
Subject: [jira] [Work logged] (BEAM-3371) Add ability to stage directories with compiled classes to Spark

[ https://issues.apache.org/jira/browse/BEAM-3371?focusedWorklogId=145651&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145651 ]

ASF GitHub Bot logged work on BEAM-3371:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Sep/18 13:13
            Start Date: 19/Sep/18 13:13
    Worklog Time Spent: 10m

Work Description: mxm commented on a change in pull request #6244: [BEAM-3371] Enable running integration tests on Spark
URL: https://github.com/apache/beam/pull/6244#discussion_r218792264

 ##########
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
 ##########

@@ -95,73 +83,33 @@ public void translate(Pipeline pipeline) {
     pipeline.replaceAll(
         FlinkTransformOverrides.getDefaultOverrides(translationMode == TranslationMode.STREAMING));

-    // Local flink configurations work in the same JVM and have no problems with improperly
-    // formatted files on classpath (eg. directories with .class files or empty directories).
-    // prepareFilesToStage() only when using remote flink cluster.
-    List<String> filesToStage;
-    if (!options.getFlinkMaster().matches("\\[.*\\]")) {
-      filesToStage = prepareFilesToStage();
-    } else {
-      filesToStage = options.getFilesToStage();
-    }
+    prepareFilesToStageForRemoteClusterExecution();

     FlinkPipelineTranslator translator;
     if (translationMode == TranslationMode.STREAMING) {
       this.flinkStreamEnv =
-          FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, filesToStage);
+          FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+              options, options.getFilesToStage());
       translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options);
     } else {
       this.flinkBatchEnv =
-          FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, filesToStage);
+          FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+              options, options.getFilesToStage());
       translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
     }

     translator.translate(pipeline);
   }

-  private List<String> prepareFilesToStage() {
-    return options
-        .getFilesToStage()
-        .stream()
-        .map(File::new)
-        .filter(File::exists)
-        .map(file -> file.isDirectory() ? packageDirectoriesToStage(file) : file.getAbsolutePath())
-        .collect(Collectors.toList());
-  }
-
-  private String packageDirectoriesToStage(File directoryToStage) {
-    String hash = calculateDirectoryContentHash(directoryToStage);
-    String pathForJar = getUniqueJarPath(hash);
-    zipDirectory(directoryToStage, pathForJar);
-    return pathForJar;
-  }
-
-  private String calculateDirectoryContentHash(File directoryToStage) {
-    Hasher hasher = Hashing.md5().newHasher();
-    try (OutputStream hashStream = Funnels.asOutputStream(hasher)) {
-      ZipFiles.zipDirectory(directoryToStage, hashStream);
-      return Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private String getUniqueJarPath(String contentHash) {
-    String tempLocation = options.getTempLocation();
-
-    checkArgument(
-        !Strings.isNullOrEmpty(tempLocation),
-        "Please provide \"tempLocation\" pipeline option. Flink runner needs it to store jars "
-            + "made of directories that were on classpath.");
-
-    return String.format("%s%s.jar", tempLocation, contentHash);
-  }
-
-  private void zipDirectory(File directoryToStage, String uniqueDirectoryPath) {
-    try {
-      ZipFiles.zipDirectory(directoryToStage, new FileOutputStream(uniqueDirectoryPath));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+  /**
+   * Local configurations work in the same JVM and have no problems with improperly formatted files
+   * on classpath (eg. directories with .class files or empty directories). Prepare files for
+   * staging only when using remote cluster.
+   */
+  private void prepareFilesToStageForRemoteClusterExecution() {

 Review comment:
   Would make this static and pass the options explicitly.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id: (was: 145651)
    Time Spent: 3h 10m (was: 3h)

> Add ability to stage directories with compiled classes to Spark
> ---------------------------------------------------------------
>
>                 Key: BEAM-3371
>                 URL: https://issues.apache.org/jira/browse/BEAM-3371
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-spark
>            Reporter: Lukasz Gajowy
>            Assignee: Jean-Baptiste Onofré
>              Priority: Minor
>            Time Spent: 3h 10m
>    Remaining Estimate: 0h
>
> This is essentially the same issue as [the Flink one|https://issues.apache.org/jira/browse/BEAM-3370], except for two things:
> - detection of files to stage has to be provided for Spark, which is already being developed [here|https://issues.apache.org/jira/browse/BEAM-981]
> - the test execution is not interrupted by a FileNotFoundException but by *the effect* of the directory not being staged (absence of test classes on the Spark classpath, hence a ClassNotFoundException).
> Again, this could probably be resolved analogously to Flink once BEAM-981 is resolved.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
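[Editor's note] The diff above removes helpers that packaged classpath directories into jars named by a hash of their zipped content. As a rough illustration of that technique, here is a minimal, hypothetical JDK-only sketch: the real Beam code used Guava's Hashing/Funnels, Beam's ZipFiles, and Jackson's Base64Variants, none of which appear below, and the class and method names here (DirectoryStager, contentHash, uniqueJarPath) are invented for this example, not Beam APIs.

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.util.Base64;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

// Hypothetical sketch of "zip a directory, hash the zipped bytes,
// derive a stable jar path from the hash" (JDK only, Java 16+).
public class DirectoryStager {

  static void zipDirectory(File dir, OutputStream out) throws IOException {
    Path root = dir.toPath();
    try (ZipOutputStream zip = new ZipOutputStream(out);
        var paths = Files.walk(root)) {
      // Sorted walk + fixed timestamps so the archive bytes (and thus
      // the hash) are deterministic across runs.
      for (Path p : paths.filter(Files::isRegularFile).sorted().toList()) {
        ZipEntry entry = new ZipEntry(root.relativize(p).toString());
        entry.setTime(0);
        zip.putNextEntry(entry);
        Files.copy(p, zip);
        zip.closeEntry();
      }
    }
  }

  static String contentHash(File dir) throws Exception {
    // Buffer the zip in memory and MD5 it; URL-safe Base64 keeps the
    // hash usable inside a file name (roughly what MODIFIED_FOR_URL did).
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    zipDirectory(dir, buf);
    byte[] digest = MessageDigest.getInstance("MD5").digest(buf.toByteArray());
    return Base64.getUrlEncoder().withoutPadding().encodeToString(digest);
  }

  static String uniqueJarPath(String tempLocation, String contentHash) {
    // Same shape as the removed getUniqueJarPath: tempLocation + hash + ".jar".
    return String.format("%s%s.jar", tempLocation, contentHash);
  }

  public static void main(String[] args) throws Exception {
    Path dir = Files.createTempDirectory("stage");
    Files.writeString(dir.resolve("Example.class"), "class-bytes");
    System.out.println(uniqueJarPath("/tmp/staging/", contentHash(dir.toFile())));
  }
}
```

Because the jar name is derived from the directory's content, re-staging an unchanged directory produces the same path, which is what makes the staged artifact cacheable.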