From dev-return-6682-archive-asf-public=cust-asf.ponee.io@airflow.incubator.apache.org Thu Sep 27 15:55:00 2018
Mailing-List: contact dev-help@airflow.incubator.apache.org; run by ezmlm
Reply-To: dev@airflow.incubator.apache.org
MIME-Version: 1.0
From: Michael Ghen
Date: Thu, 27 Sep 2018 09:54:34 -0400
Subject: Re: hooks & operators improvement proposal
To: dev@airflow.incubator.apache.org
Content-Type: multipart/alternative; boundary="00000000000078db260576daae31"

--00000000000078db260576daae31
Content-Type: text/plain; charset="UTF-8"

I see what you're looking for, and I think this is the purpose of XCom.
We've used XCom in some of our custom operators to get this type of
functionality. That said, we tend to avoid putting a lot of data into
XCom; I believe somewhere in the docs it talks about how that's an
anti-pattern. The suggested pattern was to lean on external systems for
exchanging data.

On Wed, Sep 26, 2018 at 4:26 PM Jeff Payne wrote:

> Ah, OK. Thanks for the clarification.
>
> ________________________________
> From: Daniel Cohen
> Sent: Wednesday, September 26, 2018 1:15:16 PM
> To: dev@airflow.incubator.apache.org
> Subject: Re: hooks & operators improvement proposal
>
> Hi Jeff,
> It seems I was a bit unclear.
> The DAG ETL spans multiple tasks and usually looks like
> `kickoff >> source_to_staging >> staging_to_warehouse >> warehouse_post_process`.
> I'm not proposing changes to operators, they are great. What I am
> proposing is to borrow the same concept for the smaller building blocks.
>
> I argue that the task anatomy (in ETL flows) is usually composed of
> 'mini' flows that look like read source > serialize > dump
> (example 1
> <https://github.com/apache/incubator-airflow/blob/7cd9a26418ce9cb120f1cacd9fdcfe43fe5c0254/airflow/operators/mysql_to_hive.py#L124>,
> example 2
> <https://github.com/apache/incubator-airflow/blob/7cd9a26418ce9cb120f1cacd9fdcfe43fe5c0254/airflow/contrib/hooks/salesforce_hook.py#L201>).
> You can see that sometimes it's written in the operator and sometimes
> in the hook; the code is not shared and handles the same cases each time.
>
> thanks,
> d
>
> On Wed, Sep 26, 2018 at 10:43 PM Jeff Payne wrote:
>
> > So, in your scenario, the ETL pipeline happens inside the single
> > operator/task?
> >
> > If so, would it not make sense for the pipeline to span multiple tasks
> > and provide a standard set of functions/decorators/etc for defining the
> > input/output to/from each task?
> > That way you would leverage the ability to
> > rerun the DAG from a particular step of the ETL pipeline in case of a
> > recoverable failure. Or am I misunderstanding...
> >
> > ________________________________
> > From: Daniel Cohen
> > Sent: Wednesday, September 26, 2018 12:27:29 PM
> > To: dev@airflow.apache.org
> > Subject: hooks & operators improvement proposal
> >
> > Some thoughts about operators / hooks:
> > Operators are composable. A typical ETL flow looks like
> > `kickoff >> source_to_staging >> staging_to_warehouse >> warehouse_post_process`,
> > where tasks use shared state (like S3) or naming conventions to continue
> > work where the upstream task left off.
> >
> > Hooks, on the other hand, are not composable, and a lot of ETL logic is
> > written ad hoc in the operator each time.
> >
> > I propose a lightweight, in-process ETL framework that:
> > - allows hook composition
> > - provides shared general utilities (compression / file management /
> >   serialization)
> > - simplifies operator building
> >
> > How it looks from the operator's side:
> >
> >     def execute(self, context):
> >         # initialize hooks
> >         self.s3 = S3Hook...
> >         self.mysql = MySqlHook...
> >
> >         # setup operator state
> >         query = 'select * from somewhere'
> >
> >         # declare your ETL process
> >         self.mysql.yield_query(query) >> \
> >             pipes.clear_keys(keys=self.scrubbed_columns) >> \
> >             pipes.ndjson_dumps() >> \
> >             pipes.batch(size=1024) >> \
> >             pipes.gzip() >> \
> >             pipes.tempfile() >> \
> >             self.s3.file_writer(s3_key=self.s3_key,
> >                                 bucket_name=self.s3_bucket,
> >                                 replace=True)
> >
> > How it looks from the hook's side:
> >
> >     @pipes.producer  # decorate
> >     def yield_query(self, query):
> >         cursor.execute(query)
> >         for row in cursor:
> >             yield row
> >
> > *pipes is a module with a set of operations that are generic and
> > potentially reused between hooks / operators.
> >
> > The idea is inspired by 'bonobo' and 'python-pipes' (lightweight ETL
> > packages), and the implementation is based on generators and decorators.
> >
> > We (cloudinary.com) are planning to open source it. Is it something
> > that would be interesting to integrate into Airflow, or to ship as a
> > third-party package? Or not at all? Any thoughts or suggestions?
> >
> > thanks,
> > d
> >
> > --
> > daniel cohen
>
> --
> daniel cohen

--00000000000078db260576daae31--
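For readers of this thread, the `>>` composition described in the quoted proposal can be sketched with plain generators and decorators. This is only an illustration under stated assumptions, not the Cloudinary implementation, which is not shown in the thread: the names `Stage`, `producer`, `pipe`, and `yield_rows` are hypothetical, and `clear_keys`, `ndjson_dumps`, and `batch` merely borrow the names used in the proposal, with behaviour guessed from those names. An in-memory list stands in for the MySQL cursor.

```python
import json
from functools import wraps


class Stage:
    """Hypothetical pipeline stage: wraps a callable mapping an iterable to an iterator."""

    def __init__(self, transform):
        self.transform = transform

    def __rshift__(self, other):
        # `a >> b` builds a new stage that feeds a's output into b.
        return Stage(lambda upstream: other.transform(self.transform(upstream)))

    def __iter__(self):
        # A pipeline that starts with a producer needs no upstream input.
        return iter(self.transform(()))


def producer(func):
    """Turn a generator function into the first stage of a pipeline."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        return Stage(lambda _upstream: func(*args, **kwargs))
    return wrapper


def pipe(func):
    """Turn func(upstream, **options) into a configurable stage factory."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        return Stage(lambda upstream: func(upstream, *args, **kwargs))
    return wrapper


@producer
def yield_rows(rows):
    # Stand-in for a hook method such as yield_query in the proposal.
    for row in rows:
        yield row


@pipe
def clear_keys(upstream, keys):
    # Drop the given keys from every row (assumed meaning of clear_keys).
    for row in upstream:
        yield {k: v for k, v in row.items() if k not in keys}


@pipe
def ndjson_dumps(upstream):
    # Serialize each row as one newline-terminated JSON document.
    for row in upstream:
        yield json.dumps(row, sort_keys=True) + "\n"


@pipe
def batch(upstream, size):
    # Group items into lists of at most `size` elements.
    chunk = []
    for item in upstream:
        chunk.append(item)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk


rows = [
    {"id": 1, "email": "a@example.com"},
    {"id": 2, "email": "b@example.com"},
    {"id": 3, "email": "c@example.com"},
]
pipeline = (
    yield_rows(rows)
    >> clear_keys(keys={"email"})
    >> ndjson_dumps()
    >> batch(size=2)
)
result = list(pipeline)
```

Nothing runs until the pipeline is iterated, so rows stream through every stage one at a time within a single process. That is the in-process counterpart to task-level `>>` in a DAG, and it does not give the per-step rerun ability that Jeff Payne raises for multi-task pipelines.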