airflow-commits mailing list archives

From "Thomas H (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (AIRFLOW-1270) redshift_to_s3_operator timestamp precision loss
Date Fri, 02 Jun 2017 21:33:04 GMT

     [ https://issues.apache.org/jira/browse/AIRFLOW-1270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Thomas H updated AIRFLOW-1270:
------------------------------
    Description: 
This operator casts all values as text (https://github.com/apache/incubator-airflow/blob/master/airflow/operators/redshift_to_s3_operator.py#L90)
before issuing the unload command.

The problem with this is that timestamps lose precision:
for example, 2016-07-07 06:36:11.835 becomes 2016-07-07 06:36:11, which is a big issue for me.

One solution would be to remove the logic that casts all values to text, though this would mean
the UNION ALL used to add headers can no longer be applied.
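If headers are still wanted, another option (a sketch only, not tested against the operator) would be to keep the UNION ALL but format timestamp columns with to_char instead of a plain CAST, so the fractional seconds survive the conversion to text. The helper below and its (name, type) input are hypothetical; column types could be looked up from pg_table_def:

```python
# Hypothetical helper: build the SELECT column list for the UNLOAD query so
# that every column becomes text (allowing the UNION ALL header trick), but
# timestamp columns keep their fractional seconds via to_char.
def build_select_columns(columns):
    """columns: list of (name, type) pairs, e.g. queried from pg_table_def."""
    exprs = []
    for name, col_type in columns:
        if col_type.startswith('timestamp'):
            # 'US' keeps microseconds; a plain CAST(... AS text) drops them
            exprs.append(
                "to_char({0}, 'YYYY-MM-DD HH24:MI:SS.US') AS {0}".format(name))
        else:
            exprs.append('CAST({0} AS text) AS {0}'.format(name))
    return ', '.join(exprs)


print(build_select_columns([('id', 'integer'),
                            ('created_at', 'timestamp without time zone')]))
# -> CAST(id AS text) AS id, to_char(created_at, 'YYYY-MM-DD HH24:MI:SS.US') AS created_at
```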

I personally don't care about the headers, so I removed them. My execute method is simply:

{code}
    def execute(self, context):
        self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        self.s3 = S3Hook(s3_conn_id=self.s3_conn_id)
        a_key, s_key = self.s3.get_credentials()
        unload_options = '\n\t\t\t'.join(self.unload_options)

        # SELECT * keeps the native column types (no per-column CAST to text),
        # so timestamp precision is preserved in the unloaded files.
        unload_query = """
                        UNLOAD ('SELECT * FROM {0}.{1}')
                        TO 's3://{2}/{3}/{1}_'
                        with
                        credentials 'aws_access_key_id={4};aws_secret_access_key={5}'
                        {6};
                        """.format(self.schema, self.table,
                                   self.s3_bucket, self.s3_key, a_key, s_key, unload_options)

        logging.info('Executing UNLOAD command...')
        self.hook.run(unload_query, self.autocommit)
        logging.info('UNLOAD command complete.')
{code}

  was:
This operator casts all values as text (https://github.com/apache/incubator-airflow/blob/master/airflow/operators/redshift_to_s3_operator.py#L90)
before issuing the unload command.

The problem with this is that timestamps lose precision:
for example, 2016-07-07 06:36:11.835 becomes 2016-07-07 06:36:11, which is a big issue for me.

One solution would be to remove the logic that casts all values to text, though this would mean
the UNION ALL used to add headers can no longer be applied.

I personally don't care about the headers, so I removed them. My execute method is simply:

```
    def execute(self, context):
        self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        self.s3 = S3Hook(s3_conn_id=self.s3_conn_id)
        a_key, s_key = self.s3.get_credentials()
        unload_options = ('\n\t\t\t').join(self.unload_options)

        unload_query = """
                        UNLOAD ('SELECT * FROM {0}.{1}')
                        TO 's3://{2}/{3}/{1}_'
                        with
                        credentials 'aws_access_key_id={4};aws_secret_access_key={5}'
                        {6};
                        """.format(self.schema, self.table,
                                self.s3_bucket, self.s3_key, a_key, s_key, unload_options)

        logging.info('Executing UNLOAD command...')
        self.hook.run(unload_query, self.autocommit)
        logging.info("UNLOAD command complete...")
```


> redshift_to_s3_operator timestamp precision loss
> ------------------------------------------------
>
>                 Key: AIRFLOW-1270
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1270
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Thomas H
>
> This operator casts all values as text (https://github.com/apache/incubator-airflow/blob/master/airflow/operators/redshift_to_s3_operator.py#L90)
> before issuing the unload command.
> The problem with this is that timestamps lose precision:
> for example, 2016-07-07 06:36:11.835 becomes 2016-07-07 06:36:11, which is a big issue for me.
> One solution would be to remove the logic that casts all values to text, though this would
> mean the UNION ALL used to add headers can no longer be applied.
> I personally don't care about the headers, so I removed them. My execute method is simply:
> {code}
>     def execute(self, context):
>         self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
>         self.s3 = S3Hook(s3_conn_id=self.s3_conn_id)
>         a_key, s_key = self.s3.get_credentials()
>         unload_options = ('\n\t\t\t').join(self.unload_options)
>         unload_query = """
>                         UNLOAD ('SELECT * FROM {0}.{1}')
>                         TO 's3://{2}/{3}/{1}_'
>                         with
>                         credentials 'aws_access_key_id={4};aws_secret_access_key={5}'
>                         {6};
>                         """.format(self.schema, self.table,
>                                 self.s3_bucket, self.s3_key, a_key, s_key, unload_options)
>         logging.info('Executing UNLOAD command...')
>         self.hook.run(unload_query, self.autocommit)
>         logging.info("UNLOAD command complete...")
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
