From: Boris Tyukin
Date: Sat, 14 Oct 2017 07:42:41 -0400
Subject: Re: Return results optionally from spark_sql_hook
To: dev@airflow.incubator.apache.org

Hi Fokko, thanks for your response, really appreciate it!

Basically, in my case I have two Spark SQL queries:

1) The first query does an INSERT OVERWRITE into a staging partition and
may take a while to run.
2) Right after it, I run a second query to get the row count of that
partition.
3) I need to pass that count back to the Airflow DAG, where the next task
uses it to decide whether the partition can be safely exchanged (using
ALTER TABLE ... EXCHANGE PARTITION) with the production table partition.

So I need some way to get that row count. My initial thought was to parse
the log and extract the count, but even with a regex it does not quite
work: Spark SQL writes the query output to stdout, which the Airflow Spark
SQL hook does not capture right now.

If you can suggest a better solution, that would be great!

Also, I initially wanted to count the rows and then do the ALTER TABLE ...
EXCHANGE PARTITION in the same PySpark job, but I found out that Spark
does not support that statement yet, so I have to use Hive.
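To make this concrete, here is a rough sketch of the downstream half of
the DAG I have in mind (task ids and table names are made up, and it
assumes the row count has already been pushed to XCom, which is exactly
the piece that is missing today):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python_operator import BranchPythonOperator
    from airflow.operators.hive_operator import HiveOperator
    from airflow.operators.dummy_operator import DummyOperator

    dag = DAG('exchange_partition_sketch',
              start_date=datetime(2017, 10, 1),
              schedule_interval='@daily')

    def decide_on_exchange(**context):
        # Pull the row count pushed (somehow) by the upstream Spark SQL
        # task; 'count_partition_rows' is a hypothetical task id.
        count = context['ti'].xcom_pull(task_ids='count_partition_rows')
        # Only swap the staging partition in if it actually has rows.
        return ('exchange_partition' if count and int(count) > 0
                else 'skip_exchange')

    check_count = BranchPythonOperator(task_id='check_count',
                                       python_callable=decide_on_exchange,
                                       provide_context=True,
                                       dag=dag)

    # EXCHANGE PARTITION has to run through Hive, since Spark SQL does
    # not support the statement yet; table names are illustrative.
    exchange = HiveOperator(task_id='exchange_partition',
                            hql="ALTER TABLE prod_table EXCHANGE "
                                "PARTITION (ds='{{ ds }}') "
                                "WITH TABLE staging_table",
                            dag=dag)

    skip = DummyOperator(task_id='skip_exchange', dag=dag)

    check_count.set_downstream(exchange)
    check_count.set_downstream(skip)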
On Sat, Oct 14, 2017 at 4:53 AM, Driesprong, Fokko wrote:

> Hi Boris,
>
> Thank you for your question, and excuse me for the late response; I am
> currently on holiday.
>
> The solution that you suggest would not be my preferred choice.
> Extracting results from a log using a regex is computationally expensive
> and error prone. My question is: what are you trying to accomplish? For
> me there are two ways of using the Spark SQL operator:
>
> 1. ETL using Spark: instead of returning the results, write them back to
> a new table, or to a new partition within the table. This data can be
> used downstream in the DAG. This also writes the data to HDFS, which is
> nice for persistence.
> 2. Write the data in a simple and widely supported format (such as CSV)
> onto HDFS. You can then get the data from HDFS to your local file system
> using `hdfs dfs -get`, or use `hdfs dfs -cat ... | application.py` to
> pipe it to your application directly.
>
> What you are trying to accomplish looks to me like something that would
> fit a spark-submit job, where you can submit PySpark applications and
> fetch the results from Spark directly:
>
>     Welcome to
>           ____              __
>          / __/__  ___ _____/ /__
>         _\ \/ _ \/ _ `/ __/  '_/
>        /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
>           /_/
>
>     Using Python version 2.7.14 (default, Oct 11 2017 10:13:33)
>     SparkSession available as 'spark'.
>     >>> spark.sql("SELECT 1 as count").first()
>     Row(count=1)
>
> Most of the time we use Spark SQL to transform the data, and then use
> Sqoop to move the data from HDFS to an RDBMS to expose it to the
> business. These examples are for Spark on HDFS, but for S3 it is much
> the same.
>
> Does this answer your question? If not, could you elaborate on the
> problem that you are facing?
>
> Ciao, Fokko
>
> 2017-10-13 15:54 GMT+02:00 Boris:
>
> > hi guys,
> >
> > I opened a JIRA on this and will be working on a PR:
> > https://issues.apache.org/jira/browse/AIRFLOW-1713
> >
> > Any objections/suggestions, conceptually?
> >
> > Fokko, I see you have been actively contributing to the Spark hooks
> > and operators, so I could use your opinion before I implement this.
> >
> > Boris
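P.S. If I go the spark-submit route you describe, Fokko, I guess the
PySpark application itself would look roughly like this (just a sketch;
table and partition names are made up, and I would simply print the count
so the calling process can capture it from stdout):

    from pyspark.sql import SparkSession

    # Hive support is needed to query the warehouse tables.
    spark = (SparkSession.builder
             .appName('count_partition')
             .enableHiveSupport()
             .getOrCreate())

    # Fetch the result directly from Spark instead of scraping logs;
    # the table and partition here are illustrative.
    count = spark.sql(
        "SELECT COUNT(*) AS cnt "
        "FROM staging_table WHERE ds = '2017-10-14'").first()['cnt']

    print(count)
    spark.stop()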