From: criccomini@apache.org
To: commits@airflow.incubator.apache.org
Reply-To: dev@airflow.incubator.apache.org
Date: Wed, 03 Jan 2018 17:48:20 -0000
Subject: [20/35] incubator-airflow-site git commit: 1.9.0
Mailing-List: contact commits-help@airflow.incubator.apache.org; run by ezmlm
X-Mailer: ASF-Git Admin Mailer

http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/28a3eb60/_modules/airflow/operators/sensors.html
----------------------------------------------------------------------
diff --git a/_modules/airflow/operators/sensors.html b/_modules/airflow/operators/sensors.html
index 03643fe..7414daf 100644
--- a/_modules/airflow/operators/sensors.html
+++ b/_modules/airflow/operators/sensors.html
@@ -13,6 +13,8 @@  (page <head> markup additions only)
@@ -80,7 +82,10 @@  (navigation sidebar markup only: Project, License, Quick Start entries)
@@ -178,23 +183,25 @@
 from __future__ import print_function
 from future import standard_library
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+
 standard_library.install_aliases()
 from builtins import str
 from past.builtins import basestring
 from datetime import datetime
-import logging
 from urllib.parse import urlparse
 from time import sleep
 import re
 import sys

-import airflow
-from airflow import hooks, settings
+from airflow import settings
 from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException
 from airflow.models import BaseOperator, TaskInstance
 from airflow.hooks.base_hook import BaseHook
 from airflow.hooks.hdfs_hook import HDFSHook
+from airflow.hooks.http_hook import HttpHook
 from airflow.utils.state import State
 from airflow.utils.decorators import apply_defaults
@@ -223,7 +230,7 @@
             timeout=60*60*24*7,
             soft_fail=False,
             *args, **kwargs):
-        super(BaseSensorOperator, self).__init__(*args, **kwargs)
+        super(BaseSensorOperator, self).__init__(*args, **kwargs)
         self.poke_interval = poke_interval
         self.soft_fail = soft_fail
         self.timeout = timeout
@@ -236,20 +243,20 @@
         raise AirflowException('Override me.')

     def execute(self, context):
-        started_at = datetime.now()
+        started_at = datetime.utcnow()
         while not self.poke(context):
-            if (datetime.now() - started_at).total_seconds() > self.timeout:
+            if (datetime.utcnow() - started_at).total_seconds() > self.timeout:
                 if self.soft_fail:
                     raise AirflowSkipException('Snap. Time is OUT.')
                 else:
                     raise AirflowSensorTimeout('Snap. Time is OUT.')
             sleep(self.poke_interval)
-        logging.info("Success criteria met. Exiting.")
+        self.log.info("Success criteria met. Exiting.")


 class SqlSensor(BaseSensorOperator):
     """
-    Runs a sql statement until a criteria is met. It will keep trying until
+    Runs a sql statement until a criteria is met. It will keep trying while
     sql returns no row, or if the first cell in (0, '0', '').

     :param conn_id: The connection to run the sensor against
@@ -265,12 +272,12 @@
     def __init__(self, conn_id, sql, *args, **kwargs):
         self.sql = sql
         self.conn_id = conn_id
-        super(SqlSensor, self).__init__(*args, **kwargs)
+        super(SqlSensor, self).__init__(*args, **kwargs)

     def poke(self, context):
         hook = BaseHook.get_connection(self.conn_id).get_hook()
-        logging.info('Poking: ' + self.sql)
+        self.log.info('Poking: %s', self.sql)
         records = hook.get_records(self.sql)
         if not records:
             return False
@@ -279,7 +286,6 @@
                 return False
             else:
                 return True
-        print(records[0][0])


 class MetastorePartitionSensor(SqlSensor):
@@ -321,7 +327,7 @@
         # The inheritance model needs to be reworked in order to support overriding args/
         # kwargs with arguments here, then 'conn_id' and 'sql' can be passed into the
         # constructor below and apply_defaults will no longer throw an exception.
-        super(SqlSensor, self).__init__(*args, **kwargs)
+        super(SqlSensor, self).__init__(*args, **kwargs)

     def poke(self, context):
         if self.first_poke:
@@ -360,7 +366,7 @@
         ExternalTaskSensor, but not both.
     :type execution_delta: datetime.timedelta
     :param execution_date_fn: function that receives the current execution date
-        and returns the desired execution date to query. Either execution_delta
+        and returns the desired execution dates to query. Either execution_delta
         or execution_date_fn can be passed to ExternalTaskSensor, but not both.
     :type execution_date_fn: callable
     """
@@ -375,7 +381,7 @@
             execution_delta=None,
             execution_date_fn=None,
             *args, **kwargs):
-        super(ExternalTaskSensor, self).__init__(*args, **kwargs)
+        super(ExternalTaskSensor, self).__init__(*args, **kwargs)
         self.allowed_states = allowed_states or [State.SUCCESS]
         if execution_delta is not None and execution_date_fn is not None:
             raise ValueError(
@@ -395,11 +401,15 @@
         else:
             dttm = context['execution_date']

-        logging.info(
+        dttm_filter = dttm if isinstance(dttm, list) else [dttm]
+        serialized_dttm_filter = ','.join(
+            [datetime.isoformat() for datetime in dttm_filter])
+
+        self.log.info(
             'Poking for '
             '{self.external_dag_id}.'
             '{self.external_task_id} on '
-            '{dttm} ... '.format(**locals()))
+            '{} ... '.format(serialized_dttm_filter, **locals()))
         TI = TaskInstance

         session = settings.Session()
@@ -407,11 +417,11 @@
             TI.dag_id == self.external_dag_id,
             TI.task_id == self.external_task_id,
             TI.state.in_(self.allowed_states),
-            TI.execution_date == dttm,
+            TI.execution_date.in_(dttm_filter),
         ).count()
         session.commit()
         session.close()
-        return count
+        return count == len(dttm_filter)


 class NamedHivePartitionSensor(BaseSensorOperator):
@@ -442,7 +452,7 @@
             poke_interval=60 * 3,
             *args,
             **kwargs):
-        super(NamedHivePartitionSensor, self).__init__(
+        super(NamedHivePartitionSensor, self).__init__(
             poke_interval=poke_interval, *args, **kwargs)

         if isinstance(partition_names, basestring):
@@ -462,16 +472,16 @@
             raise ValueError('Could not parse ' + partition)

     def poke(self, context):
-        if not hasattr(self, 'hook'):
-            self.hook = hooks.HiveMetastoreHook(
+        from airflow.hooks.hive_hooks import HiveMetastoreHook
+        self.hook = HiveMetastoreHook(
             metastore_conn_id=self.metastore_conn_id)

         def poke_partition(partition):
             schema, table, partition = self.parse_partition_name(partition)

-            logging.info(
+            self.log.info(
                 'Poking for {schema}.{table}/{partition}'.format(**locals())
             )
             return self.hook.check_for_named_partition(
@@ -507,7 +517,7 @@
     :type metastore_conn_id: str
     """
     template_fields = ('schema', 'table', 'partition',)
-    ui_color = '#2b2d42'
+    ui_color = '#C5CAE9'

     @apply_defaults
     def __init__(
@@ -517,7 +527,7 @@
             schema='default',
             poke_interval=60*3,
             *args, **kwargs):
-        super(HivePartitionSensor, self).__init__(
+        super(HivePartitionSensor, self).__init__(
             poke_interval=poke_interval, *args, **kwargs)
         if not partition:
             partition = "ds='{{ ds }}'"
@@ -529,11 +539,12 @@
     def poke(self, context):
         if '.' in self.table:
             self.schema, self.table = self.table.split('.')
-        logging.info(
+        self.log.info(
             'Poking for table {self.schema}.{self.table}, '
             'partition {self.partition}'.format(**locals()))
         if not hasattr(self, 'hook'):
-            self.hook = hooks.HiveMetastoreHook(
+            from airflow.hooks.hive_hooks import HiveMetastoreHook
+            self.hook = HiveMetastoreHook(
                 metastore_conn_id=self.metastore_conn_id)
         return self.hook.check_for_partition(
             self.schema, self.table, self.partition)
@@ -556,7 +567,7 @@
             file_size=None,
             hook=HDFSHook,
             *args, **kwargs):
-        super(HdfsSensor, self).__init__(*args, **kwargs)
+        super(HdfsSensor, self).__init__(*args, **kwargs)
         self.filepath = filepath
         self.hdfs_conn_id = hdfs_conn_id
         self.file_size = file_size
@@ -568,52 +579,57 @@
     def filter_for_filesize(result, size=None):
         """
         Will test the filepath result and test if its size is at least self.filesize
+
         :param result: a list of dicts returned by Snakebite ls
         :param size: the file size in MB a file should be at least to trigger True
         :return: (bool) depending on the matching criteria
         """
         if size:
-            logging.debug('Filtering for file size >= %s in files: %s', size, map(lambda x: x['path'], result))
+            log = LoggingMixin().log
+            log.debug('Filtering for file size >= %s in files: %s', size, map(lambda x: x['path'], result))
             size *= settings.MEGABYTE
             result = [x for x in result if x['length'] >= size]
-            logging.debug('HdfsSensor.poke: after size filter result is %s', result)
+            log.debug('HdfsSensor.poke: after size filter result is %s', result)
         return result

     @staticmethod
     def filter_for_ignored_ext(result, ignored_ext, ignore_copying):
         """
         Will filter if instructed to do so the result to remove matching criteria
+
         :param result: (list) of dicts returned by Snakebite ls
-        :param ignored_ext: (list) of ignored extentions
+        :param ignored_ext: (list) of ignored extensions
         :param ignore_copying: (bool) shall we ignore ?
-        :return:
+        :return: (list) of dicts which were not removed
         """
         if ignore_copying:
+            log = LoggingMixin().log
            regex_builder = "^.*\.(%s$)$" % '$|'.join(ignored_ext)
            ignored_extentions_regex = re.compile(regex_builder)
-            logging.debug('Filtering result for ignored extentions: %s in files %s', ignored_extentions_regex.pattern,
-                          map(lambda x: x['path'], result))
+            log.debug(
+                'Filtering result for ignored extensions: %s in files %s',
+                ignored_extentions_regex.pattern, map(lambda x: x['path'], result)
+            )
             result = [x for x in result if not ignored_extentions_regex.match(x['path'])]
-            logging.debug('HdfsSensor.poke: after ext filter result is %s', result)
+            log.debug('HdfsSensor.poke: after ext filter result is %s', result)
         return result

     def poke(self, context):
         sb = self.hook(self.hdfs_conn_id).get_conn()
-        logging.getLogger("snakebite").setLevel(logging.WARNING)
-        logging.info('Poking for file {self.filepath} '.format(**locals()))
+        self.log.info('Poking for file {self.filepath}'.format(**locals()))
         try:
             # IMOO it's not right here, as there no raise of any kind.
             # if the filepath is let's say '/data/mydirectory', it's correct but if it is '/data/mydirectory/*',
             # it's not correct as the directory exists and sb does not raise any error
             # here is a quick fix
             result = [f for f in sb.ls([self.filepath], include_toplevel=False)]
-            logging.debug('HdfsSensor.poke: result is %s', result)
+            self.log.debug('HdfsSensor.poke: result is %s', result)
             result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
             result = self.filter_for_filesize(result, self.file_size)
             return bool(result)
         except:
             e = sys.exc_info()
-            logging.debug("Caught an exception !: %s", str(e))
+            self.log.debug("Caught an exception !: %s", str(e))
             return False

@@ -629,14 +645,14 @@
             filepath,
             webhdfs_conn_id='webhdfs_default',
             *args, **kwargs):
-        super(WebHdfsSensor, self).__init__(*args, **kwargs)
+        super(WebHdfsSensor, self).__init__(*args, **kwargs)
         self.filepath = filepath
         self.webhdfs_conn_id = webhdfs_conn_id

     def poke(self, context):
-        c = airflow.hooks.webhdfs_hook.WebHDFSHook(self.webhdfs_conn_id)
-        logging.info(
-            'Poking for file {self.filepath} '.format(**locals()))
+        from airflow.hooks.webhdfs_hook import WebHDFSHook
+        c = WebHDFSHook(self.webhdfs_conn_id)
+        self.log.info('Poking for file {self.filepath}'.format(**locals()))
         return c.check_for_path(hdfs_path=self.filepath)

@@ -654,8 +670,8 @@
     :param wildcard_match: whether the bucket_key should be interpreted as a
         Unix wildcard pattern
     :type wildcard_match: bool
-    :param s3_conn_id: a reference to the s3 connection
-    :type s3_conn_id: str
+    :param aws_conn_id: a reference to the s3 connection
+    :type aws_conn_id: str
     """
     template_fields = ('bucket_key', 'bucket_name')
@@ -664,9 +680,9 @@
             self, bucket_key,
             bucket_name=None,
             wildcard_match=False,
-            s3_conn_id='s3_default',
+            aws_conn_id='aws_default',
             *args, **kwargs):
-        super(S3KeySensor, self).__init__(*args, **kwargs)
+        super(S3KeySensor, self).__init__(*args, **kwargs)
         # Parse
         if bucket_name is None:
             parsed_url = urlparse(bucket_key)
@@ -681,13 +697,13 @@
         self.bucket_name = bucket_name
         self.bucket_key = bucket_key
         self.wildcard_match = wildcard_match
-        self.s3_conn_id = s3_conn_id
+        self.aws_conn_id = aws_conn_id

     def poke(self, context):
-        import airflow.hooks.S3_hook
-        hook = airflow.hooks.S3_hook.S3Hook(s3_conn_id=self.s3_conn_id)
+        from airflow.hooks.S3_hook import S3Hook
+        hook = S3Hook(aws_conn_id=self.aws_conn_id)
         full_url = "s3://" + self.bucket_name + "/" + self.bucket_key
-        logging.info('Poking for key : {full_url}'.format(**locals()))
+        self.log.info('Poking for key : {full_url}'.format(**locals()))
         if self.wildcard_match:
             return hook.check_for_wildcard_key(self.bucket_key,
                                                self.bucket_name)
@@ -718,21 +734,21 @@
     def __init__(
             self, bucket_name,
             prefix, delimiter='/',
-            s3_conn_id='s3_default',
+            aws_conn_id='aws_default',
             *args, **kwargs):
-        super(S3PrefixSensor, self).__init__(*args, **kwargs)
+        super(S3PrefixSensor, self).__init__(*args, **kwargs)
         # Parse
         self.bucket_name = bucket_name
         self.prefix = prefix
         self.delimiter = delimiter
         self.full_url = "s3://" + bucket_name + '/' + prefix
-        self.s3_conn_id = s3_conn_id
+        self.aws_conn_id = aws_conn_id

     def poke(self, context):
-        logging.info('Poking for prefix : {self.prefix}\n'
-                     'in bucket s3://{self.bucket_name}'.format(**locals()))
-        import airflow.hooks.S3_hook
-        hook = airflow.hooks.S3_hook.S3Hook(s3_conn_id=self.s3_conn_id)
+        self.log.info('Poking for prefix : {self.prefix}\n'
+                      'in bucket s3://{self.bucket_name}'.format(**locals()))
+        from airflow.hooks.S3_hook import S3Hook
+        hook = S3Hook(aws_conn_id=self.aws_conn_id)
         return hook.check_for_prefix(
             prefix=self.prefix,
             delimiter=self.delimiter,
@@ -750,13 +766,12 @@
     @apply_defaults
     def __init__(self, target_time, *args, **kwargs):
-        super(TimeSensor, self).__init__(*args, **kwargs)
+        super(TimeSensor, self).__init__(*args, **kwargs)
         self.target_time = target_time

     def poke(self, context):
-        logging.info(
-            'Checking if the time ({0}) has come'.format(self.target_time))
-        return datetime.now().time() > self.target_time
+        self.log.info('Checking if the time (%s) has come', self.target_time)
+        return datetime.utcnow().time() > self.target_time


 class TimeDeltaSensor(BaseSensorOperator):
@@ -773,15 +788,15 @@
     @apply_defaults
     def __init__(self, delta, *args, **kwargs):
-        super(TimeDeltaSensor, self).__init__(*args, **kwargs)
+        super(TimeDeltaSensor, self).__init__(*args, **kwargs)
         self.delta = delta

     def poke(self, context):
         dag = context['dag']
         target_dttm = dag.following_schedule(context['execution_date'])
         target_dttm += self.delta
-        logging.info('Checking if the time ({0}) has come'.format(target_dttm))
-        return datetime.now() > target_dttm
+        self.log.info('Checking if the time (%s) has come', target_dttm)
+        return datetime.utcnow() > target_dttm


 class HttpSensor(BaseSensorOperator):
@@ -791,10 +806,12 @@
     :param http_conn_id: The connection to run the sensor against
     :type http_conn_id: string
+    :param method: The HTTP request method to use
+    :type method: string
     :param endpoint: The relative part of the full url
     :type endpoint: string
-    :param params: The parameters to be added to the GET url
-    :type params: a dictionary of string key/value pairs
+    :param request_params: The parameters to be added to the GET url
+    :type request_params: a dictionary of string key/value pairs
     :param headers: The HTTP headers to be added to the GET request
     :type headers: a dictionary of string key/value pairs
     :param response_check: A check against the 'requests' response object.
@@ -806,31 +823,34 @@
         depends on the option that's being modified.
     """

-    template_fields = ('endpoint',)
+    template_fields = ('endpoint', 'request_params')

     @apply_defaults
     def __init__(self,
                  endpoint,
                  http_conn_id='http_default',
-                 params=None,
+                 method='GET',
+                 request_params=None,
                  headers=None,
                  response_check=None,
                  extra_options=None, *args, **kwargs):
-        super(HttpSensor, self).__init__(*args, **kwargs)
+        super(HttpSensor, self).__init__(*args, **kwargs)
         self.endpoint = endpoint
         self.http_conn_id = http_conn_id
-        self.params = params or {}
+        self.request_params = request_params or {}
         self.headers = headers or {}
         self.extra_options = extra_options or {}
         self.response_check = response_check
-        self.hook = hooks.http_hook.HttpHook(method='GET', http_conn_id=http_conn_id)
+        self.hook = HttpHook(
+            method=method,
+            http_conn_id=http_conn_id)

     def poke(self, context):
-        logging.info('Poking: ' + self.endpoint)
+        self.log.info('Poking: %s', self.endpoint)
         try:
             response = self.hook.run(self.endpoint,
-                                     data=self.params,
+                                     data=self.request_params,
                                      headers=self.headers,
                                      extra_options=self.extra_options)
             if self.response_check:
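
For readers tracking the API renames in this diff (s3_conn_id becomes aws_conn_id on the S3 sensors, and HttpSensor's params becomes request_params with a configurable method), the following is a minimal usage sketch, not part of the commit; the DAG id, bucket, endpoint and connection ids are illustrative placeholders.

# Illustrative sketch only: how the renamed sensor arguments shown above
# would be used in an Airflow 1.9 DAG. Names below are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.operators.sensors import HttpSensor, S3KeySensor

dag = DAG(
    'sensor_example',                      # placeholder DAG id
    start_date=datetime(2018, 1, 1),
    schedule_interval='@daily')

wait_for_key = S3KeySensor(
    task_id='wait_for_key',
    # bucket_key is templated (template_fields includes it)
    bucket_key='s3://my-bucket/data/{{ ds }}/_SUCCESS',
    aws_conn_id='aws_default',             # previously s3_conn_id='s3_default'
    poke_interval=60,
    timeout=60 * 60,
    dag=dag)

wait_for_api = HttpSensor(
    task_id='wait_for_api',
    http_conn_id='http_default',
    method='GET',                          # new: request method is configurable
    endpoint='health',                     # placeholder endpoint
    request_params={'check': 'db'},        # previously params=
    response_check=lambda response: 'ok' in response.text,
    dag=dag)

wait_for_key >> wait_for_api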