From: fokko@apache.org
To: commits@airflow.incubator.apache.org
Subject: incubator-airflow git commit: [AIRFLOW-1907] Pass max_ingestion_time to Druid hook
Date: Tue, 12 Dec 2017 09:40:52 +0000 (UTC)

Repository: incubator-airflow
Updated Branches:
  refs/heads/master c70d8f59c -> 683a27f2c


[AIRFLOW-1907] Pass max_ingestion_time to Druid hook

From the Druid operator we want to pass the max_ingestion_time to the hook,
since some jobs might take considerably more time than others.

By default we don't want to set a max ingestion time.

Closes #2866 from Fokko/AIRFLOW-1907-pass-max-ingestion-time


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/683a27f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/683a27f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/683a27f2

Branch: refs/heads/master
Commit: 683a27f2c16e036b42226cb9d96012d0616d0aa0
Parents: c70d8f5
Author: Fokko Driesprong
Authored: Tue Dec 12 10:40:46 2017 +0100
Committer: Fokko Driesprong
Committed: Tue Dec 12 10:40:46 2017 +0100

----------------------------------------------------------------------
 airflow/contrib/operators/druid_operator.py | 8 ++++++--
 airflow/hooks/druid_hook.py                 | 4 ++--
 2 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/683a27f2/airflow/contrib/operators/druid_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/druid_operator.py b/airflow/contrib/operators/druid_operator.py
index 6978cc3..965dc50 100644
--- a/airflow/contrib/operators/druid_operator.py
+++ b/airflow/contrib/operators/druid_operator.py
@@ -34,10 +34,11 @@ class DruidOperator(BaseOperator):
             self,
             json_index_file,
             druid_ingest_conn_id='druid_ingest_default',
+            max_ingestion_time=None,
             *args, **kwargs):
-
         super(DruidOperator, self).__init__(*args, **kwargs)
         self.conn_id = druid_ingest_conn_id
+        self.max_ingestion_time = max_ingestion_time
 
         with open(json_index_file) as data_file:
             index_spec = json.load(data_file)
@@ -49,6 +50,9 @@ class DruidOperator(BaseOperator):
         )
 
     def execute(self, context):
-        hook = DruidHook(druid_ingest_conn_id=self.conn_id)
+        hook = DruidHook(
+            druid_ingest_conn_id=self.conn_id,
+            max_ingestion_time=self.max_ingestion_time
+        )
         self.log.info("Sumitting %s", self.index_spec_str)
         hook.submit_indexing_job(self.index_spec_str)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/683a27f2/airflow/hooks/druid_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
index 655f666..9ce1f9a 100644
--- a/airflow/hooks/druid_hook.py
+++ b/airflow/hooks/druid_hook.py
@@ -36,7 +36,7 @@ class DruidHook(BaseHook):
             self,
             druid_ingest_conn_id='druid_ingest_default',
             timeout=1,
-            max_ingestion_time=18000):
+            max_ingestion_time=None):
 
         self.druid_ingest_conn_id = druid_ingest_conn_id
         self.timeout = timeout
@@ -72,7 +72,7 @@ class DruidHook(BaseHook):
 
             sec = sec + 1
 
-            if sec > self.max_ingestion_time:
+            if self.max_ingestion_time and sec > self.max_ingestion_time:
                 # ensure that the job gets killed if the max ingestion time is exceeded
                 requests.post("{0}/{1}/shutdown".format(url, druid_task_id))
                 raise AirflowException('Druid ingestion took more than %s seconds', self.max_ingestion_time)
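
The guarded check in the last hunk can be tried without Airflow or a Druid cluster. The following is a minimal sketch of the same polling pattern, not the hook's actual code: `wait_for_task`, `fake_task`, and `IngestionTimeout` are hypothetical names introduced for illustration, and the status source is an injected callable rather than an HTTP request. Because the guard relies on truthiness, a limit of 0 disables the check just as `None` does. The sketch also interpolates the limit into the message with `%`, whereas the diff passes it as a second positional argument, which a plain Python exception would not interpolate.

```python
import itertools


class IngestionTimeout(Exception):
    """Hypothetical stand-in for AirflowException."""


def wait_for_task(get_status, max_ingestion_time=None, sleep=lambda seconds: None):
    """Poll get_status() once per tick until it stops returning 'RUNNING'.

    Returns the number of ticks spent waiting.
    """
    sec = 0
    while get_status() == "RUNNING":
        sleep(1)  # no-op by default so the sketch runs instantly
        sec += 1
        # Same guard as the patched hook: None (or 0) disables the limit.
        if max_ingestion_time and sec > max_ingestion_time:
            raise IngestionTimeout(
                "ingestion took more than %s seconds" % max_ingestion_time
            )
    return sec


def fake_task(running_ticks=5):
    """Build a status callable that reports RUNNING a fixed number of times."""
    statuses = itertools.chain(
        ["RUNNING"] * running_ticks, itertools.repeat("SUCCESS")
    )
    return lambda: next(statuses)


# No limit: the loop waits out every RUNNING tick.
print(wait_for_task(fake_task()))

# With a limit shorter than the job, the guard raises.
try:
    wait_for_task(fake_task(), max_ingestion_time=3)
except IngestionTimeout as exc:
    print("timed out:", exc)
```

Passing `time.sleep` as the `sleep` argument would turn each tick into a real one-second wait.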