airflow-commits mailing list archives

From "ASF subversion and git services (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-2124) Allow local mainPythonFileUri
Date Wed, 21 Mar 2018 10:47:00 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-2124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407740#comment-16407740 ]

ASF subversion and git services commented on AIRFLOW-2124:
----------------------------------------------------------

Commit ec80f944183b744ff7cab7b72f5350240a8ac4ae in incubator-airflow's branch refs/heads/master
from [~Fokko]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=ec80f94 ]

[AIRFLOW-2124] Upload Python file to a bucket for Dataproc

If the Python Dataproc file is on local storage, we want to upload
it to Google Cloud Storage before submitting it to the Dataproc
cluster.

Closes #3130 from Fokko/airflow-stash-files-on-gcs
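
Conceptually, the change stages a local driver file on GCS before the job is submitted. A minimal sketch of that idea using the contrib GoogleCloudStorageHook of that era; the helper name, bucket handling, and object naming below are illustrative assumptions, not the commit's actual code:

{code:python}
import os

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook


def stage_main_on_gcs(main, bucket):
    """Upload a local PySpark driver to GCS and return its gs:// URI.

    Paths that are already remote (gs://) are returned unchanged; only
    local files need to be staged before job submission.
    """
    if main.startswith('gs://'):
        return main
    object_name = os.path.basename(main)  # illustrative object naming
    hook = GoogleCloudStorageHook()
    hook.upload(bucket=bucket, object=object_name, filename=main)
    return 'gs://{}/{}'.format(bucket, object_name)
{code}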


> Allow local mainPythonFileUri
> -----------------------------
>
>                 Key: AIRFLOW-2124
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2124
>             Project: Apache Airflow
>          Issue Type: Wish
>            Reporter: robbert van waardhuizen
>            Assignee: Fokko Driesprong
>            Priority: Major
>
> For our workflow, we are currently transitioning from the BashOperator to the DataProcPySparkOperator. While rewriting the DAG, we came to the conclusion that it is not possible to submit a (local) path as our main Python file; a Hadoop Compatible Filesystem (HCFS) URI is required.
> Our main Python drivers are located in a Git repository. Putting our main Python files in a GCS bucket would require manually updating/overwriting these files.
> In terms of code, this works using the BashOperator:
> {code:java}
> gcloud dataproc jobs submit pyspark \
>     /usr/local/airflow/git/airflow-dags/jobs/main_python_driver.py \
>     --cluster {cluster_name}
> {code}
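> For reference, a sketch of the same command wrapped in a BashOperator; the task id, params wiring, and DAG reference are illustrative assumptions, not our actual DAG code:
> {code:python}
> from airflow.operators.bash_operator import BashOperator
>
> submit_job = BashOperator(
>     task_id='submit_pyspark_job',  # illustrative task id
>     bash_command=(
>         'gcloud dataproc jobs submit pyspark '
>         '/usr/local/airflow/git/airflow-dags/jobs/main_python_driver.py '
>         '--cluster {{ params.cluster_name }}'
>     ),
>     params={'cluster_name': cluster_name},  # cluster_name defined elsewhere in the DAG
>     dag=dag,
> )
> {code}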
>
> But this cannot be replicated using the DataProcPySparkOperator:
> {code:java}
> DataProcPySparkOperator(
>     main="/usr/local/airflow/git/airflow-dags/jobs/main_python_driver.py",
>     cluster_name=cluster_name,
> )
> {code}
> Error:
> {code:java}
> =========== Cloud Dataproc Agent Error ===========
> java.lang.NullPointerException
> at sun.nio.fs.UnixPath.normalizeAndCheck(UnixPath.java:77)
> at sun.nio.fs.UnixPath.<init>(UnixPath.java:71)
> at sun.nio.fs.UnixFileSystem.getPath(UnixFileSystem.java:281)
> at com.google.cloud.hadoop.services.agent.job.AbstractJobHandler.registerResourceForDownload(AbstractJobHandler.java:442)
> at com.google.cloud.hadoop.services.agent.job.PySparkJobHandler.buildCommand(PySparkJobHandler.java:93)
> at com.google.cloud.hadoop.services.agent.job.AbstractJobHandler$StartDriver.call(AbstractJobHandler.java:538)
> at com.google.cloud.hadoop.services.agent.job.AbstractJobHandler$StartDriver.call(AbstractJobHandler.java:532)
> at com.google.cloud.hadoop.services.repackaged.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:127)
> at com.google.cloud.hadoop.services.repackaged.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
> at com.google.cloud.hadoop.services.repackaged.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:80)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:748)
> ======== End of Cloud Dataproc Agent Error ========
> {code}
> What would be the best practice in this case?
> Is it possible to add the ability to submit local paths as the main Python file?
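> With the commit referenced above, the intent is that a local path can be passed directly and the operator stages it on GCS transparently. Assuming that behaviour, the failing call should become simply the following; the task id and DAG reference are illustrative assumptions:
> {code:python}
> from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator
>
> submit_job = DataProcPySparkOperator(
>     task_id='submit_pyspark_job',  # illustrative task id
>     main='/usr/local/airflow/git/airflow-dags/jobs/main_python_driver.py',
>     cluster_name=cluster_name,  # cluster_name defined elsewhere in the DAG
>     dag=dag,
> )
> {code}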



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
