airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Fokko Driesprong (JIRA)" <j...@apache.org>
Subject [jira] [Closed] (AIRFLOW-1270) redshift_to_s3_operator timestamp precision loss
Date Mon, 16 Apr 2018 08:24:00 GMT

     [ https://issues.apache.org/jira/browse/AIRFLOW-1270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Fokko Driesprong closed AIRFLOW-1270.
-------------------------------------
       Resolution: Fixed
    Fix Version/s: 2.0.0

> redshift_to_s3_operator timestamp precision loss
> ------------------------------------------------
>
>                 Key: AIRFLOW-1270
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1270
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Thomas H
>            Priority: Major
>             Fix For: 2.0.0
>
>
> This operator casts all values as text (https://github.com/apache/incubator-airflow/blob/master/airflow/operators/redshift_to_s3_operator.py#L90)
before issuing the unload command.
> The problem with this is that timestamps lose precision
> i.e. 2016-07-07 06:36:11.835 becomes 2016-07-07 06:36:11 which is a big issue for me.
> One solution would be to remove the logic to cast all values to text, though this would
mean you can't apply the UNION ALL to add headers.
> I personally don't care about the headers so I removed them. My execute method is simply:
> {code}
>     def execute(self, context):
>         self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
>         self.s3 = S3Hook(s3_conn_id=self.s3_conn_id)
>         a_key, s_key = self.s3.get_credentials()
>         unload_options = ('\n\t\t\t').join(self.unload_options)
>         unload_query = """
>                         UNLOAD ('SELECT * FROM {0}.{1}')
>                         TO 's3://{2}/{3}/{1}_'
>                         with
>                         credentials 'aws_access_key_id={4};aws_secret_access_key={5}'
>                         {6};
>                         """.format(self.schema, self.table,
>                                 self.s3_bucket, self.s3_key, a_key, s_key, unload_options)
>         logging.info('Executing UNLOAD command...')
>         self.hook.run(unload_query, self.autocommit)
>         logging.info("UNLOAD command complete...")
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message