From: Robin Edwards
Date: Mon, 13 Aug 2018 09:24:32 +0100
Subject: Re: Modeling rate limited api calls in airflow
To: dev@airflow.incubator.apache.org

Thanks Gerard, that's really helpful; it would have taken me some time to
pinpoint that race condition. I will go with your suggestion and implement a
hook and manage the logic within the operator itself.

Rob

On Sun, Aug 12, 2018 at 9:28 AM, Gerard Toonstra wrote:

> This is worth a design discussion in its own right, but here's my input.
>
> You're using a DAG with sensor operators to determine whether something
> needs to be triggered. There is a gap between the sensor "ok-ing" the
> progression, the dag being triggered and the first task being spun up.
> That interval can easily lead to a race condition where another sensor
> elsewhere still sees a non-rate-limited state and also initiates the dag.
> A rate limit is likely to result from that.
>
> Second, should a rate limit still be in effect, the operators in the DAG
> won't respect the back-off period, because you passed that check already.
>
> For that reason I'd do this slightly differently, in a more managed way.
> I don't have sufficient background on the business requirements or how
> much adwords-related work there is in total, but here are three options
> to look into:
>
> - Make the adwords hook raise a RateLimitException, for example, then let
>   the operator respond to that and manage redis. First check with redis
>   when the operator starts, then call adwords and, in case of failure,
>   update redis and probably go into the retry loop. You can set a low
>   retry interval here, because it will check with redis anyway; that way
>   you can support back-off periods of any resolution.
>
> - Just use a pool with the number of required simultaneous processes and
>   play with the variables and rates to avoid the rate limit in the first
>   place. That way you can maximize API usage without creating a stampeding
>   herd that will probably lead to failure anyway.
>
> - There's another approach where a dag "requests" use of the API by
>   inserting a record in a queue in redis and a main dag does the actual
>   triggering (so that all scheduling is centralized), but that's like
>   building a scheduler inside a scheduler, and in the end a pool would
>   give you the same functionality without all the hassle.
>
> Rgds,
>
> Gerard
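To make sure I've understood the first option, below is roughly the shape I
have in mind for the hook and operator. This is only a sketch of my own:
RateLimitException, AdwordsRateExceeded, AdwordsRateLimitHook, the
'adwords_rate_limited' key and the bare redis.StrictRedis() client are all
placeholder names, not an existing Airflow or googleads API.

    import redis  # assumption: redis-py; a real version would use an Airflow connection

    from airflow.exceptions import AirflowException
    from airflow.hooks.base_hook import BaseHook
    from airflow.models import BaseOperator
    from airflow.utils.decorators import apply_defaults


    class RateLimitException(AirflowException):
        """The back-off key is still present in redis, so don't touch the API."""


    class AdwordsRateExceeded(Exception):
        """Placeholder for whatever the real adwords client raises on a rate limit."""

        def __init__(self, retry_after_seconds):
            super(AdwordsRateExceeded, self).__init__('adwords rate limit hit')
            self.retry_after_seconds = retry_after_seconds


    class AdwordsRateLimitHook(BaseHook):
        """Hands out an adwords client only while no back-off period is recorded."""

        RATE_LIMIT_KEY = 'adwords_rate_limited'

        def __init__(self, client_factory, redis_client=None):
            self.client_factory = client_factory  # builds the real adwords client
            self.redis = redis_client or redis.StrictRedis()

        def get_conn(self):
            if self.redis.exists(self.RATE_LIMIT_KEY):
                raise RateLimitException('Adwords back-off period still in effect')
            return self.client_factory()

        def record_rate_limit(self, seconds):
            # Remember the back-off the API reported; the key expires on its own.
            self.redis.set(self.RATE_LIMIT_KEY, '1', ex=seconds)


    class AdwordsOperator(BaseOperator):
        """Runs one adwords function; normal task retries provide the back-off loop."""

        @apply_defaults
        def __init__(self, adwords_func, client_factory, *args, **kwargs):
            super(AdwordsOperator, self).__init__(*args, **kwargs)
            self.adwords_func = adwords_func
            self.client_factory = client_factory

        def execute(self, context):
            hook = AdwordsRateLimitHook(self.client_factory)
            client = hook.get_conn()  # raises RateLimitException -> task goes into retry
            try:
                return self.adwords_func(client)
            except AdwordsRateExceeded as exc:
                hook.record_rate_limit(exc.retry_after_seconds)
                raise

The operator would then just be configured with retries and a short
retry_delay, so the redis check gives the back-off whatever resolution we
need, as you suggested.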
> On Fri, Aug 10, 2018 at 12:41 PM Robin Edwards wrote:
>
> > Thanks Gerard,
> >
> > Yea, pools look really useful for limiting concurrent requests.
> >
> > Where you mention the use of a hook, would you simply raise an exception
> > from get_conn() should the adwords account be rate limited, and then
> > configure a number of retries and an appropriate delay / back-off on the
> > operator doing work with the api?
> >
> > I have come up with part of a solution using a key sensor and a
> > TriggerDagRunOperator. The idea is that when my 'adwords_func' encounters
> > a rate limit error it sets a key in redis with an expiry matching the
> > period in the rate limit response, then re-triggers the dag, which will
> > block on my sensor until the key has expired.
> >
> > The hard part is now getting this mechanism to work within a sub dag, as
> > I have multiple api operations that need limiting.
> >
> >     from airflow import DAG
> >     from airflow.operators.python_operator import PythonOperator
> >     from airflow.operators.dagrun_operator import TriggerDagRunOperator
> >     from airflow.utils.trigger_rule import TriggerRule
> >
> >     def _adwords_rate_limited_dag(dag_id, adwords_func, max_dag_retries,
> >                                   **kwargs):
> >         dag = DAG(dag_id, **kwargs)
> >
> >         # Passed to TriggerDagRunOperator: give up after max_dag_retries.
> >         def count_retries(context, obj):
> >             retries = context['dag_run'].conf.get('dag_retries', 1)
> >
> >             if retries > max_dag_retries:
> >                 raise SystemError("Max retries reached for dag")
> >
> >             obj.payload = {'dag_retries': retries + 1}
> >             return obj
> >
> >         with dag:
> >             RedisNoKeySensor(
> >                 task_id='check_for_rate_limit',
> >                 key='rate_limited',
> >                 redis_conn_id='redis_master',
> >                 poke_interval=10,
> >             ) >> PythonOperator(
> >                 task_id=adwords_func.__name__,
> >                 python_callable=adwords_func,
> >             ) >> TriggerDagRunOperator(
> >                 task_id='retry_dag_on_failure',
> >                 trigger_dag_id=dag_id,
> >                 trigger_rule=TriggerRule.ONE_FAILED,
> >                 python_callable=count_retries,
> >             )
> >
> >         return dag
> >
> > Thanks for your help,
> >
> > Rob
> >
> > On Thu, Aug 9, 2018 at 7:07 PM, Gerard Toonstra wrote:
> >
> > > Have you looked into pools? Pools allow you to specify how many tasks
> > > at any given time should use a common resource, so you could limit
> > > this to 1, 2 or 3, for example. Pools are not dynamic, however, so
> > > they only let you put an upper limit on the number of clients hitting
> > > the API at any moment, not determine how many while a rate limit is
> > > in effect (unless... you use code to reconfigure the pool on demand,
> > > but I'm not sure I should recommend reconfiguring the number of
> > > clients on the basis of hitting the rate limit). It sounds as if this
> > > logic is best introduced at the hook level, where the hook passes out
> > > an API interface only when the rate limit is not in place, and the
> > > operators specify how many retries should occur.
> > >
> > > The Adwords API does allow increasing the rate limit threshold,
> > > though, and you're probably better off negotiating with Google to up
> > > that threshold, explaining your business case etc.
> > >
> > > Gerard
> > >
> > > On Thu, Aug 9, 2018 at 10:43 AM rob@goshift.com wrote:
> > >
> > > > Hello,
> > > >
> > > > I am in the process of migrating a bespoke data pipeline built
> > > > around celery into airflow.
> > > >
> > > > We have a number of different tasks which interact with the Adwords
> > > > API, which has a rate limiting policy. The policy isn't a fixed
> > > > number of requests; it's variable.
> > > >
> > > > In our celery code we have handled this by capturing a rate limit
> > > > error response and setting a key in redis to make sure that no
> > > > tasks execute against the API until it has expired. Any task that
> > > > does get executed checks for the presence of the key and, if the
> > > > key exists, issues a retry for when the rate limit is due to expire.
> > > >
> > > > Moving over to Airflow I can't find a way to schedule a task to
> > > > retry after a specific amount of time. Doing some reading, it seems
> > > > a Sensor could work to prevent other dags from executing whilst the
> > > > rate limit is present.
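For completeness, since this is where the sensor idea comes from: the
RedisNoKeySensor used in my snippet further up is not part of Airflow, just a
small custom sensor along roughly these lines (sketch only; the bare
redis.StrictRedis() stands in for resolving the 'redis_master' connection
properly).

    import redis  # assumption: redis-py; a real version would use the Airflow connection

    from airflow.operators.sensors import BaseSensorOperator
    from airflow.utils.decorators import apply_defaults


    class RedisNoKeySensor(BaseSensorOperator):
        """Keeps poking until the given key is absent from redis."""

        @apply_defaults
        def __init__(self, key, redis_conn_id, *args, **kwargs):
            super(RedisNoKeySensor, self).__init__(*args, **kwargs)
            self.key = key
            self.redis_conn_id = redis_conn_id

        def poke(self, context):
            # Sketch: in practice, build the client from the redis_conn_id connection.
            client = redis.StrictRedis()
            return not client.exists(self.key)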
> > > >
> > > > I also can't seem to find an example of handling different
> > > > exceptions from a python task and adapting the retry logic
> > > > accordingly.
> > > >
> > > > Any pointers would be much appreciated,
> > > >
> > > > Rob
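P.S. On that last point about handling different exceptions from a python
task: in the trigger-dag-run design above, one way to adapt the behaviour per
exception type would be a small wrapper like this (sketch only;
AdwordsRateExceeded again stands in for the real client error, and the bare
redis client for the 'redis_master' connection).

    import redis


    class AdwordsRateExceeded(Exception):
        """Placeholder for the rate limit error raised by the real adwords client."""

        def __init__(self, retry_after_seconds):
            super(AdwordsRateExceeded, self).__init__('adwords rate limit hit')
            self.retry_after_seconds = retry_after_seconds


    def run_rate_limited(adwords_func):
        """Only rate-limit errors set the redis back-off key; everything else just fails."""
        try:
            return adwords_func()
        except AdwordsRateExceeded as exc:
            # Record the back-off so the RedisNoKeySensor blocks the re-triggered run,
            # then re-raise so the ONE_FAILED TriggerDagRunOperator still fires.
            redis.StrictRedis().set('rate_limited', '1', ex=exc.retry_after_seconds)
            raise


    # e.g. python_callable=functools.partial(run_rate_limited, adwords_func)
    # for each PythonOperator in the factory above.

Any other exception just propagates, so the dag still gets re-triggered by the
ONE_FAILED rule but is not held back by the sensor.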