beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-3371) Add ability to stage directories with compiled classes to Spark
Date Wed, 19 Sep 2018 13:14:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-3371?focusedWorklogId=145651&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145651 ]

ASF GitHub Bot logged work on BEAM-3371:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Sep/18 13:13
            Start Date: 19/Sep/18 13:13
    Worklog Time Spent: 10m 
      Work Description: mxm commented on a change in pull request #6244: [BEAM-3371] Enable running integration tests on Spark
URL: https://github.com/apache/beam/pull/6244#discussion_r218792264
 
 

 ##########
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
 ##########
 @@ -95,73 +83,33 @@ public void translate(Pipeline pipeline) {
     pipeline.replaceAll(
         FlinkTransformOverrides.getDefaultOverrides(translationMode == TranslationMode.STREAMING));
 
-    // Local flink configurations work in the same JVM and have no problems with improperly
-    // formatted files on classpath (eg. directories with .class files or empty directories).
-    // prepareFilesToStage() only when using remote flink cluster.
-    List<String> filesToStage;
-    if (!options.getFlinkMaster().matches("\\[.*\\]")) {
-      filesToStage = prepareFilesToStage();
-    } else {
-      filesToStage = options.getFilesToStage();
-    }
+    prepareFilesToStageForRemoteClusterExecution();
 
     FlinkPipelineTranslator translator;
     if (translationMode == TranslationMode.STREAMING) {
       this.flinkStreamEnv =
-          FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, filesToStage);
+          FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+              options, options.getFilesToStage());
       translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options);
     } else {
       this.flinkBatchEnv =
-          FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, filesToStage);
+          FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+              options, options.getFilesToStage());
       translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
     }
 
     translator.translate(pipeline);
   }
 
-  private List<String> prepareFilesToStage() {
-    return options
-        .getFilesToStage()
-        .stream()
-        .map(File::new)
-        .filter(File::exists)
-        .map(file -> file.isDirectory() ? packageDirectoriesToStage(file) : file.getAbsolutePath())
-        .collect(Collectors.toList());
-  }
-
-  private String packageDirectoriesToStage(File directoryToStage) {
-    String hash = calculateDirectoryContentHash(directoryToStage);
-    String pathForJar = getUniqueJarPath(hash);
-    zipDirectory(directoryToStage, pathForJar);
-    return pathForJar;
-  }
-
-  private String calculateDirectoryContentHash(File directoryToStage) {
-    Hasher hasher = Hashing.md5().newHasher();
-    try (OutputStream hashStream = Funnels.asOutputStream(hasher)) {
-      ZipFiles.zipDirectory(directoryToStage, hashStream);
-      return Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private String getUniqueJarPath(String contentHash) {
-    String tempLocation = options.getTempLocation();
-
-    checkArgument(
-        !Strings.isNullOrEmpty(tempLocation),
 -        "Please provide \"tempLocation\" pipeline option. Flink runner needs it to store jars "
 -            + "made of directories that were on classpath.");
-
-    return String.format("%s%s.jar", tempLocation, contentHash);
-  }
-
-  private void zipDirectory(File directoryToStage, String uniqueDirectoryPath) {
-    try {
-      ZipFiles.zipDirectory(directoryToStage, new FileOutputStream(uniqueDirectoryPath));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
 +  /**
 +   * Local configurations work in the same JVM and have no problems with improperly formatted files
 +   * on classpath (eg. directories with .class files or empty directories). Prepare files for
 +   * staging only when using remote cluster.
 +   */
 +  private void prepareFilesToStageForRemoteClusterExecution() {
 
 Review comment:
   I would make this static and pass the options explicitly.
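For context, the staging logic the diff removes (and the static, options-passed-explicitly shape the review comment asks for) could look roughly like the sketch below. It is not the actual Beam code: the JDK's `MessageDigest`, `DigestOutputStream`, and `Base64` stand in for the Guava `Hashing`/`Funnels` and Jackson `Base64Variants` helpers used in the original, the `StagingOptions` interface is a minimal hypothetical stand-in for Beam's pipeline options, and the zip step only handles top-level files for brevity (Beam's `ZipFiles.zipDirectory` recurses).

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public class StagingSketch {

  /** Minimal stand-in for the pipeline options the static method receives explicitly. */
  interface StagingOptions {
    List<String> getFilesToStage();
    String getTempLocation();
  }

  /**
   * Static variant of the staging preparation, with options passed explicitly:
   * classpath directories are zipped into content-addressed jars under
   * tempLocation; plain files pass through unchanged, stale entries are dropped.
   */
  static List<String> prepareFilesToStage(StagingOptions options) {
    List<String> staged = new ArrayList<>();
    for (String path : options.getFilesToStage()) {
      File file = new File(path);
      if (!file.exists()) {
        continue; // skip classpath entries that no longer exist
      }
      staged.add(file.isDirectory()
          ? packageDirectoryToStage(file, options.getTempLocation())
          : file.getAbsolutePath());
    }
    return staged;
  }

  /** Zips the directory to "<tempLocation><contentHash>.jar" and returns that path. */
  static String packageDirectoryToStage(File dir, String tempLocation) {
    String jarPath = String.format("%s%s.jar", tempLocation, directoryContentHash(dir));
    try (OutputStream out = new FileOutputStream(jarPath)) {
      zipDirectory(dir, out);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return jarPath;
  }

  /** MD5 of the zipped directory content, URL-safe base64 encoded for use in a file name. */
  static String directoryContentHash(File dir) {
    try {
      MessageDigest md5 = MessageDigest.getInstance("MD5");
      // Digest the zip stream as it is written; the bytes themselves are discarded.
      zipDirectory(dir, new DigestOutputStream(OutputStream.nullOutputStream(), md5));
      return Base64.getUrlEncoder().withoutPadding().encodeToString(md5.digest());
    } catch (IOException | NoSuchAlgorithmException e) {
      throw new RuntimeException(e);
    }
  }

  /** Writes a zip of the directory's top-level files (sorted so the hash is deterministic). */
  static void zipDirectory(File dir, OutputStream out) throws IOException {
    File[] files = dir.listFiles(File::isFile);
    try (ZipOutputStream zip = new ZipOutputStream(out)) {
      if (files == null) {
        return;
      }
      Arrays.sort(files);
      for (File f : files) {
        zip.putNextEntry(new ZipEntry(f.getName()));
        zip.write(Files.readAllBytes(f.toPath()));
        zip.closeEntry();
      }
    }
  }
}
```

Keeping the method static and threading the options through as a parameter (rather than reading a field) makes the staging step testable in isolation, which seems to be the point of the comment.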

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 145651)
    Time Spent: 3h 10m  (was: 3h)

> Add ability to stage directories with compiled classes to Spark
> ---------------------------------------------------------------
>
>                 Key: BEAM-3371
>                 URL: https://issues.apache.org/jira/browse/BEAM-3371
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-spark
>            Reporter: Lukasz Gajowy
>            Assignee: Jean-Baptiste Onofré
>            Priority: Minor
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> This one is basically the same issue as
>  [this Flink one|https://issues.apache.org/jira/browse/BEAM-3370], except for two things:
> - detection of files to stage has to be provided in Spark, which is already being developed [here|https://issues.apache.org/jira/browse/BEAM-981]
> - the test execution is not interrupted by a FileNotFoundException but by *the effect* of the directory not being staged (absence of test classes on Spark's classpath, hence a ClassNotFoundException).
> Again, this could probably be resolved analogously to Flink once the BEAM-981 issue is resolved.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
